YUKI Hiroshi
null+****@clear*****
Thu Oct 17 17:45:04 JST 2013
YUKI Hiroshi 2013-10-17 17:45:04 +0900 (Thu, 17 Oct 2013) New Revision: 9cc6ad726c286f0bac17b7c65a967a3fcdd0875b https://github.com/droonga/express-droonga/commit/9cc6ad726c286f0bac17b7c65a967a3fcdd0875b Message: Process "watch.notification" messages by the framework (workaround) Modified files: lib/adapter/socket.io.js lib/droonga-protocol/connection.js Modified: lib/adapter/socket.io.js (+52 -44) =================================================================== --- lib/adapter/socket.io.js 2013-10-17 16:47:26 +0900 (7e4a60f) +++ lib/adapter/socket.io.js 2013-10-17 17:45:04 +0900 (ef91e95) @@ -33,10 +33,8 @@ exports.register = function(application, server, params) { if (!server) throw new Error('A server instance is required!'); - function createClientMessageHandler(commandName, socket, onResponseCallback) { - var commandDefinition = unifiedCommandSet[commandName]; - if (!commandDefinition) - return; + function createClientMessageHandler(commandName, commandDefinition, socket, handlerOptions) { + handlerOptions = handlerOptions || {}; var defaultParameters = {}; @@ -75,8 +73,8 @@ exports.register = function(application, server, params) { options.event = responseEvent; } - if (onResponseCallback && typeof onResponseCallback == 'function') - onResponseCallback(); + if (handlerOptions.onResponse && typeof handlerOptions.onResponse == 'function') + handlerOptions.onResponse(); var wrappedSocket = new wrapper.SocketIOClientSocketWrapper(socket, options); if (commandDefinition[responseHandler]) { @@ -105,10 +103,8 @@ exports.register = function(application, server, params) { }); } - function createNotificationHandler(commandName, socket) { + function createNotificationHandler(commandName, commandDefinition, socket) { return (function(body) { - var commandDefinition = unifiedCommandSet[commandName]; - var event = commandName; var data = body; @@ -153,46 +149,64 @@ exports.register = function(application, server, params) { definition: definition }); }); + // "watch" command defined in api/socket.io.js + var watchSubscribers = {}; + var watchNotificationHandler = function(envelope) { + var subscribers = envelope.to; + if (!Array.isArray(subscribers)) + subscribers = [subscribers]; + subscribers.forEach(function(subscriber) { + subscriber.socket.emit('watch.notification', envelope.body); + }); + }; + connection.on('watch.notification', watchNotificationHandler); + var io = socketIo.listen(server); io.sockets.on('connection', function(socket) { application.emit('connection', socket); - var events = []; - var handlers = {}; registeredCommands.forEach(function(registeredCommand) { if (command.PublishSubscribe.isInstance(registeredCommand.definition)) { var subscribeEvent = registeredCommand.name + '.subscribe'; var unsubscribeEvent = registeredCommand.name + '.unsubscribe'; - var notificationEvent = registeredCommand.name + '.notification'; unifiedCommandSet[subscribeEvent] = - unifiedCommandSet[unsubscribeEvent] = - unifiedCommandSet[notificationEvent] = registeredCommand.definition; - - var subscriveHandler = createClientMessageHandler(subscribeEvent, socket, function() { - if (handlers[notificationEvent]) - return; - var handler = createNotificationHandler(notificationEvent, socket); - connection.on(notificationEvent, handler); - handlers[notificationEvent] = handler; - }); - if (subscriveHandler) - socket.on(subscribeEvent, subscriveHandler); - - var unsubscribeHandler = createClientMessageHandler(unsubscribeEvent, socket, function() { - if (!handlers[notificationEvent]) - return; - connection.removeListener(notificationEvent, handlers[notificationEvent]); - delete handlers[notificationEvent]; - }); - if (unsubscribeHandler) - socket.on(unsubscribeEvent, unsubscribeHandler); - - events.push(notificationEvent); + unifiedCommandSet[unsubscribeEvent] = registeredCommand.definition; + + socket.on( + subscribeEvent, + createClientMessageHandler( + subscribeEvent, + registeredCommand.definition, + socket, + { + onResponse: function() { + if (!watchSubscribers[subscriber]) + watchSubscribers[subscriber] = socket; + } + } + ) + ); + + socket.on( + unsubscribeEvent, + createClientMessageHandler( + unsubscribeEvent, + registeredCommand.definition, + socket, + { + onResponse: function() { + if (watchSubscribers[subscriber]) + delete watchSubscribers[subscriber]; + } + } + ) + ); } else { - var handler = createClientMessageHandler(registeredCommand.name, socket); - if (handler) - socket.on(registeredCommand.name, handler); + socket.on(registeredCommand.name, + createClientMessageHandler(registeredCommand.name, + registeredCommand.definition, + socket)); } }); @@ -200,12 +214,6 @@ exports.register = function(application, server, params) { connection.on('error', errorHandler); socket.on('disconnect', function() { - events.forEach(function(event) { - if (handlers[event]) - connection.removeListener(event, handlers[event]); - }); - events = undefined; - handlers = undefined; connection.removeListener('error', errorHandler); socket.removeAllListeners(); }); Modified: lib/droonga-protocol/connection.js (+1 -1) =================================================================== --- lib/droonga-protocol/connection.js 2013-10-17 16:47:26 +0900 (aa1a7fc) +++ lib/droonga-protocol/connection.js 2013-10-17 17:45:04 +0900 (f944467) @@ -92,7 +92,7 @@ Connection.prototype._handleMessage = function(envelope) { this.emit('reply:' + inReplyTo, errorCode, envelope); } else { debug('Connection._handleMessage.message %d:', this._id, envelope.type); - this.emit(envelope.type, envelope.body); + this.emit(envelope.type, envelope); } }; -------------- next part -------------- HTML����������������������������...Download