[Groonga-commit] droonga/express-droonga at 9cc6ad7 [master] Process "watch.notification" messages by the framework (workaround)

Back to archive index

YUKI Hiroshi null+****@clear*****
Thu Oct 17 17:45:04 JST 2013


YUKI Hiroshi	2013-10-17 17:45:04 +0900 (Thu, 17 Oct 2013)

  New Revision: 9cc6ad726c286f0bac17b7c65a967a3fcdd0875b
  https://github.com/droonga/express-droonga/commit/9cc6ad726c286f0bac17b7c65a967a3fcdd0875b

  Message:
    Process "watch.notification" messages by the framework (workaround)

  Modified files:
    lib/adapter/socket.io.js
    lib/droonga-protocol/connection.js

  Modified: lib/adapter/socket.io.js (+52 -44)
===================================================================
--- lib/adapter/socket.io.js    2013-10-17 16:47:26 +0900 (7e4a60f)
+++ lib/adapter/socket.io.js    2013-10-17 17:45:04 +0900 (ef91e95)
@@ -33,10 +33,8 @@ exports.register = function(application, server, params) {
   if (!server)
     throw new Error('A server instance is required!');
 
-  function createClientMessageHandler(commandName, socket, onResponseCallback) {
-    var commandDefinition = unifiedCommandSet[commandName];
-    if (!commandDefinition)
-      return;
+  function createClientMessageHandler(commandName, commandDefinition, socket, handlerOptions) {
+    handlerOptions = handlerOptions || {};
 
     var defaultParameters = {};
 
@@ -75,8 +73,8 @@ exports.register = function(application, server, params) {
           options.event = responseEvent;
         }
 
-        if (onResponseCallback && typeof onResponseCallback == 'function')
-          onResponseCallback();
+        if (handlerOptions.onResponse && typeof handlerOptions.onResponse == 'function')
+          handlerOptions.onResponse();
 
         var wrappedSocket = new wrapper.SocketIOClientSocketWrapper(socket, options);
         if (commandDefinition[responseHandler]) {
@@ -105,10 +103,8 @@ exports.register = function(application, server, params) {
     });
   }
 
-  function createNotificationHandler(commandName, socket) {
+  function createNotificationHandler(commandName, commandDefinition, socket) {
     return (function(body) {
-      var commandDefinition = unifiedCommandSet[commandName];
-
       var event = commandName;
       var data = body;
 
@@ -153,46 +149,64 @@ exports.register = function(application, server, params) {
                               definition: definition });
   });
 
+  // "watch" command defined in api/socket.io.js
+  var watchSubscribers = {};
+  var watchNotificationHandler = function(envelope) {
+    var subscribers = envelope.to;
+    if (!Array.isArray(subscribers))
+      subscribers = [subscribers];
+    subscribers.forEach(function(subscriber) {
+      subscriber.socket.emit('watch.notification', envelope.body);
+    });
+  };
+  connection.on('watch.notification', watchNotificationHandler);
+
   var io = socketIo.listen(server);
   io.sockets.on('connection', function(socket) {
     application.emit('connection', socket);
 
-    var events = [];
-    var handlers = {};
     registeredCommands.forEach(function(registeredCommand) {
       if (command.PublishSubscribe.isInstance(registeredCommand.definition)) {
         var subscribeEvent = registeredCommand.name + '.subscribe';
         var unsubscribeEvent = registeredCommand.name + '.unsubscribe';
-        var notificationEvent = registeredCommand.name + '.notification';
 
         unifiedCommandSet[subscribeEvent] =
-          unifiedCommandSet[unsubscribeEvent] =
-          unifiedCommandSet[notificationEvent] = registeredCommand.definition;
-
-        var subscriveHandler = createClientMessageHandler(subscribeEvent, socket, function() {
-          if (handlers[notificationEvent])
-            return;
-          var handler = createNotificationHandler(notificationEvent, socket);
-          connection.on(notificationEvent, handler);
-          handlers[notificationEvent] = handler;
-        });
-        if (subscriveHandler)
-          socket.on(subscribeEvent, subscriveHandler);
-
-        var unsubscribeHandler = createClientMessageHandler(unsubscribeEvent, socket, function() {
-          if (!handlers[notificationEvent])
-            return;
-          connection.removeListener(notificationEvent, handlers[notificationEvent]);
-          delete handlers[notificationEvent];
-        });
-        if (unsubscribeHandler)
-          socket.on(unsubscribeEvent, unsubscribeHandler);
-
-        events.push(notificationEvent);
+          unifiedCommandSet[unsubscribeEvent] = registeredCommand.definition;
+
+        socket.on(
+          subscribeEvent,
+          createClientMessageHandler(
+            subscribeEvent,
+            registeredCommand.definition,
+            socket,
+            {
+              onResponse: function() {
+                if (!watchSubscribers[subscriber])
+                  watchSubscribers[subscriber] = socket;
+              }
+            }
+          )
+        );
+
+        socket.on(
+          unsubscribeEvent,
+          createClientMessageHandler(
+            unsubscribeEvent,
+            registeredCommand.definition,
+            socket,
+            {
+              onResponse: function() {
+                if (watchSubscribers[subscriber])
+                  delete watchSubscribers[subscriber];
+              }
+            }
+          )
+        );
       } else {
-        var handler = createClientMessageHandler(registeredCommand.name, socket);
-        if (handler)
-          socket.on(registeredCommand.name, handler);
+        socket.on(registeredCommand.name,
+                  createClientMessageHandler(registeredCommand.name,
+                                             registeredCommand.definition,
+                                             socket));
       }
     });
 
@@ -200,12 +214,6 @@ exports.register = function(application, server, params) {
     connection.on('error', errorHandler);
 
     socket.on('disconnect', function() {
-      events.forEach(function(event) {
-        if (handlers[event])
-          connection.removeListener(event, handlers[event]);
-      });
-      events = undefined;
-      handlers = undefined;
       connection.removeListener('error', errorHandler);
       socket.removeAllListeners();
     });

  Modified: lib/droonga-protocol/connection.js (+1 -1)
===================================================================
--- lib/droonga-protocol/connection.js    2013-10-17 16:47:26 +0900 (aa1a7fc)
+++ lib/droonga-protocol/connection.js    2013-10-17 17:45:04 +0900 (f944467)
@@ -92,7 +92,7 @@ Connection.prototype._handleMessage = function(envelope) {
     this.emit('reply:' + inReplyTo, errorCode, envelope);
   } else {
     debug('Connection._handleMessage.message %d:', this._id, envelope.type);
-    this.emit(envelope.type, envelope.body);
+    this.emit(envelope.type, envelope);
   }
 };
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index