[Groonga-commit] droonga/express-droonga at be217d6 [master] Allow to define custom pub-sub commands

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Oct 23 18:28:55 JST 2013


YUKI Hiroshi	2013-10-23 18:28:55 +0900 (Wed, 23 Oct 2013)

  New Revision: be217d615a1c9d3262ddcc30dff67206e0ad379b
  https://github.com/droonga/express-droonga/commit/be217d615a1c9d3262ddcc30dff67206e0ad379b

  Message:
    Allow to define custom pub-sub commands

  Modified files:
    lib/adapter/api/socket.io.js
    lib/adapter/command.js
    lib/adapter/socket.io.js

  Modified: lib/adapter/api/socket.io.js (+4 -0)
===================================================================
--- lib/adapter/api/socket.io.js    2013-10-23 17:36:14 +0900 (bead7a7)
+++ lib/adapter/api/socket.io.js    2013-10-23 18:28:55 +0900 (f2101b9)
@@ -14,6 +14,10 @@ module.exports = {
       data.route = data.route || connection.routeToSelf;
       data.subscriber = data.subscriber || command.sha1sum(data.route);
       connection.emit('watch.unsubscribe', data);
+    },
+    notification: 'watch.notification', 
+    onNotify: function(data, socket) {
+      socket.emit('watch.notification', data);
     }
   })//,    
 

  Modified: lib/adapter/command.js (+3 -0)
===================================================================
--- lib/adapter/command.js    2013-10-23 17:36:14 +0900 (90270d7)
+++ lib/adapter/command.js    2013-10-23 18:28:55 +0900 (61c53cd)
@@ -57,6 +57,9 @@ Object.defineProperty(PublishSubscribe.prototype, 'onUnsubscribe', {
 Object.defineProperty(PublishSubscribe.prototype, 'onUnsubscribed', {
   get: function() { return this._options.onUnsubscribed; }
 });
+Object.defineProperty(PublishSubscribe.prototype, 'notification', {
+  get: function() { return this._options.notification; }
+});
 Object.defineProperty(PublishSubscribe.prototype, 'onNotify', {
   get: function() { return this._options.onNotify; }
 });

  Modified: lib/adapter/socket.io.js (+49 -35)
===================================================================
--- lib/adapter/socket.io.js    2013-10-23 17:36:14 +0900 (97eae82)
+++ lib/adapter/socket.io.js    2013-10-23 18:28:55 +0900 (4bbdcd6)
@@ -109,6 +109,37 @@ exports.register = function(application, server, params) {
     });
   }
 
+  var allSubscriberSockets = {};
+  function createNotificationHandler(notificationEvent, commandDefinition) {
+    return (function(envelope) {
+console.log(envelope);
+      var subscriberIds = envelope.to;
+      if (!Array.isArray(subscriberIds))
+        subscriberIds = [subscriberIds];
+
+      var subscriberSockets = allSubscriberSockets[notificationEvent] || {};
+console.log('subscriberSockets = '+Object.keys(subscriberSockets).length);
+      subscriberIds.forEach(function(subscriberId) {
+        subscriberSocket = subscriberSockets[subscriberId];
+        if (!subscriberSocket)
+          return;
+        try {
+          if (commandDefinition.onNotify) {
+            try {
+              commandDefinition.onNotify(envelope.body, subscriberSocket);
+            } catch(error) {
+              subscriberSocket.emit('error', error.message || error);
+            }
+          } else {
+            subscriberSocket.emit(notificationEvent, envelope.body);
+          }
+        } catch(exception) {
+          console.log(exception + '\n' + exception.stack);
+        }
+      });
+    });
+  }
+
   var commandSets = api.normalize(params.plugins);
   
   var unifiedCommandSet = {};
@@ -128,35 +159,18 @@ exports.register = function(application, server, params) {
       return;
     registeredCommands.push({ name:       commandName,
                               definition: definition });
+    if (definition.notification)
+      connection.on(definition.notification,
+                    createNotificationHandler(definition.notification,
+                                              definition));
   });
 
-// =========================WORKAROUND==========================
-  // "watch" command defined in api/socket.io.js
-  var watchSubscribers = {};
-  var watchNotificationHandler = function(envelope) {
-    var subscribers = envelope.to;
-    if (!Array.isArray(subscribers))
-      subscribers = [subscribers];
-    subscribers.forEach(function(subscriber) {
-      subscriber = watchSubscribers[subscriber];
-      if (!subscriber)
-        return;
-      try {
-        subscriber.emit('watch.notification', envelope.body);
-      } catch(exception) {
-        console.log(exception + '\n' + exception.stack);
-      }
-    });
-  };
-  connection.on('watch.notification', watchNotificationHandler);
-// /========================/WORKAROUND==========================
-
   var io = socketIo.listen(server);
   io.sockets.on('connection', function(socket) {
     application.emit('connection', socket);
 
-    var watchSubscriber = connection.getRouteToSelf({ sessionId: socket.id });
-    watchSubscriber = command.sha1sum(watchSubscriber);
+    var subscriberId = connection.getRouteToSelf({ sessionId: socket.id });
+    subscriberId = command.sha1sum(subscriberId);
 
     var subscribeHandlers = {};
     var unsubscribeHandlers = {};
@@ -169,16 +183,18 @@ exports.register = function(application, server, params) {
         unifiedCommandSet[subscribeEvent] =
           unifiedCommandSet[unsubscribeEvent] = registeredCommand.definition;
 
+        var notificationEvent = registeredCommand.definition.notification;
+        var subscriberSockets = allSubscriberSockets[notificationEvent] || {};
+        allSubscriberSockets[notificationEvent] = subscriberSockets;
+
         var subscribeHandler = createClientMessageHandler(
             subscribeEvent,
             registeredCommand.definition,
             socket,
             {
               onResponse: function() {
-// =========================WORKAROUND==========================
-                if (!watchSubscribers[watchSubscriber])
-                  watchSubscribers[watchSubscriber] = socket;
-// ========================/WORKAROUND==========================
+                if (!subscriberSockets[subscriberId])
+                  subscriberSockets[subscriberId] = socket;
               }
             }
         );
@@ -192,10 +208,8 @@ exports.register = function(application, server, params) {
             socket,
             {
               onResponse: function() {
-// =========================WORKAROUND==========================
-                if (watchSubscribers[watchSubscriber])
-                  delete watchSubscribers[watchSubscriber];
-// ========================/WORKAROUND==========================
+                if (subscriberSockets[subscriberId])
+                  delete subscriberSockets[subscriberId];
               }
             }
         );
@@ -214,14 +228,14 @@ exports.register = function(application, server, params) {
     connection.on('error', errorHandler);
 
     socket.on('disconnect', function() {
-      unsubscribeHandlers.forEach(function(event) {
+/*
+      Object.keys(unsubscribeHandlers).forEach(function(event) {
         unsubscribeHandlers[event].forEach(function(handler) {
           handler(data);
         });
       });
-// =========================WORKAROUND==========================
-      delete watchSubscribers[watchSubscriber];
-// ========================/WORKAROUND==========================
+*/
+      delete subscriberSockets[subscriberId];
       connection.removeListener('error', errorHandler);
       socket.removeAllListeners();
     });
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index