YUKI Hiroshi
null+****@clear*****
Wed Oct 23 18:28:55 JST 2013
YUKI Hiroshi 2013-10-23 18:28:55 +0900 (Wed, 23 Oct 2013) New Revision: be217d615a1c9d3262ddcc30dff67206e0ad379b https://github.com/droonga/express-droonga/commit/be217d615a1c9d3262ddcc30dff67206e0ad379b Message: Allow to define custom pub-sub commands Modified files: lib/adapter/api/socket.io.js lib/adapter/command.js lib/adapter/socket.io.js Modified: lib/adapter/api/socket.io.js (+4 -0) =================================================================== --- lib/adapter/api/socket.io.js 2013-10-23 17:36:14 +0900 (bead7a7) +++ lib/adapter/api/socket.io.js 2013-10-23 18:28:55 +0900 (f2101b9) @@ -14,6 +14,10 @@ module.exports = { data.route = data.route || connection.routeToSelf; data.subscriber = data.subscriber || command.sha1sum(data.route); connection.emit('watch.unsubscribe', data); + }, + notification: 'watch.notification', + onNotify: function(data, socket) { + socket.emit('watch.notification', data); } })//, Modified: lib/adapter/command.js (+3 -0) =================================================================== --- lib/adapter/command.js 2013-10-23 17:36:14 +0900 (90270d7) +++ lib/adapter/command.js 2013-10-23 18:28:55 +0900 (61c53cd) @@ -57,6 +57,9 @@ Object.defineProperty(PublishSubscribe.prototype, 'onUnsubscribe', { Object.defineProperty(PublishSubscribe.prototype, 'onUnsubscribed', { get: function() { return this._options.onUnsubscribed; } }); +Object.defineProperty(PublishSubscribe.prototype, 'notification', { + get: function() { return this._options.notification; } +}); Object.defineProperty(PublishSubscribe.prototype, 'onNotify', { get: function() { return this._options.onNotify; } }); Modified: lib/adapter/socket.io.js (+49 -35) =================================================================== --- lib/adapter/socket.io.js 2013-10-23 17:36:14 +0900 (97eae82) +++ lib/adapter/socket.io.js 2013-10-23 18:28:55 +0900 (4bbdcd6) @@ -109,6 +109,37 @@ exports.register = function(application, server, params) { }); } + var allSubscriberSockets = {}; + function createNotificationHandler(notificationEvent, commandDefinition) { + return (function(envelope) { +console.log(envelope); + var subscriberIds = envelope.to; + if (!Array.isArray(subscriberIds)) + subscriberIds = [subscriberIds]; + + var subscriberSockets = allSubscriberSockets[notificationEvent] || {}; +console.log('subscriberSockets = '+Object.keys(subscriberSockets).length); + subscriberIds.forEach(function(subscriberId) { + subscriberSocket = subscriberSockets[subscriberId]; + if (!subscriberSocket) + return; + try { + if (commandDefinition.onNotify) { + try { + commandDefinition.onNotify(envelope.body, subscriberSocket); + } catch(error) { + subscriberSocket.emit('error', error.message || error); + } + } else { + subscriberSocket.emit(notificationEvent, envelope.body); + } + } catch(exception) { + console.log(exception + '\n' + exception.stack); + } + }); + }); + } + var commandSets = api.normalize(params.plugins); var unifiedCommandSet = {}; @@ -128,35 +159,18 @@ exports.register = function(application, server, params) { return; registeredCommands.push({ name: commandName, definition: definition }); + if (definition.notification) + connection.on(definition.notification, + createNotificationHandler(definition.notification, + definition)); }); -// =========================WORKAROUND========================== - // "watch" command defined in api/socket.io.js - var watchSubscribers = {}; - var watchNotificationHandler = function(envelope) { - var subscribers = envelope.to; - if (!Array.isArray(subscribers)) - subscribers = [subscribers]; - subscribers.forEach(function(subscriber) { - subscriber = watchSubscribers[subscriber]; - if (!subscriber) - return; - try { - subscriber.emit('watch.notification', envelope.body); - } catch(exception) { - console.log(exception + '\n' + exception.stack); - } - }); - }; - connection.on('watch.notification', watchNotificationHandler); -// /========================/WORKAROUND========================== - var io = socketIo.listen(server); io.sockets.on('connection', function(socket) { application.emit('connection', socket); - var watchSubscriber = connection.getRouteToSelf({ sessionId: socket.id }); - watchSubscriber = command.sha1sum(watchSubscriber); + var subscriberId = connection.getRouteToSelf({ sessionId: socket.id }); + subscriberId = command.sha1sum(subscriberId); var subscribeHandlers = {}; var unsubscribeHandlers = {}; @@ -169,16 +183,18 @@ exports.register = function(application, server, params) { unifiedCommandSet[subscribeEvent] = unifiedCommandSet[unsubscribeEvent] = registeredCommand.definition; + var notificationEvent = registeredCommand.definition.notification; + var subscriberSockets = allSubscriberSockets[notificationEvent] || {}; + allSubscriberSockets[notificationEvent] = subscriberSockets; + var subscribeHandler = createClientMessageHandler( subscribeEvent, registeredCommand.definition, socket, { onResponse: function() { -// =========================WORKAROUND========================== - if (!watchSubscribers[watchSubscriber]) - watchSubscribers[watchSubscriber] = socket; -// ========================/WORKAROUND========================== + if (!subscriberSockets[subscriberId]) + subscriberSockets[subscriberId] = socket; } } ); @@ -192,10 +208,8 @@ exports.register = function(application, server, params) { socket, { onResponse: function() { -// =========================WORKAROUND========================== - if (watchSubscribers[watchSubscriber]) - delete watchSubscribers[watchSubscriber]; -// ========================/WORKAROUND========================== + if (subscriberSockets[subscriberId]) + delete subscriberSockets[subscriberId]; } } ); @@ -214,14 +228,14 @@ exports.register = function(application, server, params) { connection.on('error', errorHandler); socket.on('disconnect', function() { - unsubscribeHandlers.forEach(function(event) { +/* + Object.keys(unsubscribeHandlers).forEach(function(event) { unsubscribeHandlers[event].forEach(function(handler) { handler(data); }); }); -// =========================WORKAROUND========================== - delete watchSubscribers[watchSubscriber]; -// ========================/WORKAROUND========================== +*/ + delete subscriberSockets[subscriberId]; connection.removeListener('error', errorHandler); socket.removeAllListeners(); }); -------------- next part -------------- HTML����������������������������...Download