Kouhei Sutou
null+****@clear*****
Sun Nov 24 18:14:07 JST 2013
Kouhei Sutou 2013-11-24 18:14:07 +0900 (Sun, 24 Nov 2013) New Revision: e91405f212644651484782bf3f51fcc38e83f320 https://github.com/droonga/fluent-plugin-droonga/commit/e91405f212644651484782bf3f51fcc38e83f320 Message: Extract distribution code from adapter as distributor Added files: lib/droonga/distributor.rb lib/droonga/distributor_plugin.rb lib/droonga/plugin/distributor/crud.rb Copied files: lib/droonga/plugin/distributor/groonga.rb (from lib/droonga/plugin/adapter/groonga.rb) lib/droonga/plugin/distributor/search.rb (from lib/droonga/adapter.rb) lib/droonga/plugin/distributor/watch.rb (from lib/droonga/plugin/adapter/groonga.rb) Modified files: lib/droonga/adapter.rb lib/droonga/dispatcher.rb lib/droonga/executor.rb lib/droonga/plugin/adapter/groonga.rb test/unit/test_adapter.rb Modified: lib/droonga/adapter.rb (+1 -139) =================================================================== --- lib/droonga/adapter.rb 2013-11-24 17:40:03 +0900 (a0246bc) +++ lib/droonga/adapter.rb 2013-11-24 18:14:07 +0900 (137ba55) @@ -15,145 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/legacy_plugin" - module Droonga - class Adapter < Droonga::LegacyPlugin - def scatter_all(request, key) - message = [{ - "command"=> envelope["type"], - "dataset"=> envelope["dataset"], - "body"=> request, - "key"=> key, - "type"=> "scatter", - "replica"=> "all", - "post"=> true - }] - post(message, "dispatcher") - end - - def broadcast_all(request) - message = [{ - "command"=> envelope["type"], - "dataset"=> envelope["dataset"], - "body"=> request, - "type"=> "broadcast", - "replica"=> "all", - "post"=> true - }] - post(message, "dispatcher") - end - - def prefer_synchronous?(command) - return true - end - end - - class BasicAdapter < Adapter - Droonga::LegacyPlugin.repository.register("adapter", self) - - command :table_create - def table_create(request) - unless envelope["dataset"] - raise "dataset must be set. FIXME: This error should return client." - end - broadcast_all(request) - end - - command :column_create - def column_create(request) - broadcast_all(request) - end - - command "watch.feed" => :feed - def feed(request) - broadcast_all(request) - end - - command "watch.subscribe" => :subscribe - def subscribe(request) - broadcast_all(request) - end - - command "watch.unsubscribe" => :unsubscribe - def unsubscribe(request) - broadcast_all(request) - end - - command :add - def add(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end - - command :update - def update(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end - - command :reset - def reset(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end - - command :search - def search(request) - message = [] - input_names = [] - output_names = [] - name_mapper = {} - request["queries"].each do |input_name, query| - output = query["output"] - next unless output - input_names << input_name - output_name = input_name + "_reduced" - output_names << output_name - name_mapper[output_name] = input_name - # TODO: offset & limit must be arranged here. - elements = {} - output["elements"].each do |element| - case element - when "count" - elements[element] = ["sum"] - when "records" - # TODO: must take "sortBy" section into account. - elements[element] = ["sort", "<"] - end - end - reducer = { - "inputs"=> [input_name], - "outputs"=> [output_name], - "type"=> "reduce", - "body"=> { - input_name=> { - output_name=> elements - } - } - } - message << reducer - end - gatherer = { - "inputs"=> output_names, - "type"=> "gather", - "body"=> name_mapper, - "post"=> true - } - message << gatherer - searcher = { - "dataset"=> envelope["dataset"] || request["dataset"], - "outputs"=> input_names, - "type"=> "broadcast", - "command"=> "search", - "replica"=> "random", - "body"=> request - } - message.push(searcher) - post(message, "dispatcher") - end + class Adapter end end Modified: lib/droonga/dispatcher.rb (+1 -1) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-24 17:40:03 +0900 (33082ac) +++ lib/droonga/dispatcher.rb 2013-11-24 18:14:07 +0900 (6efac4e) @@ -33,7 +33,7 @@ module Droonga @collectors = {} @current_id = 0 @local = Regexp.new("^#{@name}") - plugins = ["collector"] + (Droonga.catalog.option("plugins")||[]) + ["adapter"] + plugins = ["collector"] + (Droonga.catalog.option("plugins")||[]) plugins.each do |plugin| @worker.add_legacy_plugin(plugin) end Added: lib/droonga/distributor.rb (+74 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/distributor.rb 2013-11-24 18:14:07 +0900 (e4eaf1a) @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/distributor_plugin" + +module Droonga + class Distributor + def initialize(executor, options={}) + @executor = executor + @plugins = [] + @options = options + # TODO: don't put the default distributions + load_plugins(options[:distributors] || ["search", "crud", "groonga", "watch"]) + end + + def shutdown + $log.trace("#{log_tag}: shutdown: start") + @plugins.each do |plugin| + plugins.shutdown + end + $log.trace("#{log_tag}: shutdown: done") + end + + def distribute(envelope) + command = envelope["type"] + plugin = find_plugin(command) + if plugin.nil? + raise "unknown distributor plugin: <#{command}>: " + + "TODO: improve error hndling" + end + plugin.process(envelope) + end + + def post(message) + @executor.post(message, "dispatcher") + end + + private + def load_plugins(plugin_names) + plugin_names.each do |plugin_name| + add_plugin(plugin_name) + end + end + + def add_plugin(name) + plugin = DistributorPlugin.repository.instantiate(name, self) + @plugins << plugin + end + + def find_plugin(command) + @plugins.find do |plugin| + plugin.processable?(command) + end + end + + def log_tag + "[#{Process.pid}] distributor" + end + end +end Added: lib/droonga/distributor_plugin.rb (+98 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/distributor_plugin.rb 2013-11-24 18:14:07 +0900 (66cfe53) @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/plugin_repository" + +module Droonga + class DistributorPlugin + @@repository = PluginRepository.new + + class << self + def inherited(sub_class) + super + sub_class.instance_variable_set(:@command_mapper, CommandMapper.new) + end + + def repository + @@repository + end + + def command(name_or_map) + @command_mapper.register(name_or_map) + end + + def method_name(command) + @command_mapper[command] + end + + def processable?(command) + not method_name(command).nil? + end + end + + def initialize(distributor) + @distributor = distributor + end + + # TODO: consider better name + def post(message) + @distributor.post(message) + end + + def shutdown + end + + def processable?(command) + self.class.processable?(command) + end + + def process(envelope, *arguments) + command = envelope["type"] + __send__(self.class.method_name(command), envelope, *arguments) + rescue => exception + Logger.error("error while processing #{command}", + envelope: envelope, + arguments: arguments, + exception: exception) + end + + def scatter_all(envelope, key) + message = [{ + "command"=> envelope["type"], + "dataset"=> envelope["dataset"], + "body"=> envelope["body"], + "key"=> key, + "type"=> "scatter", + "replica"=> "all", + "post"=> true + }] + post(message) + end + + def broadcast_all(envelope) + distirubte_message = [{ + "command"=> envelope["type"], + "dataset"=> envelope["dataset"], + "body"=> envelope["body"], + "type"=> "broadcast", + "replica"=> "all", + "post"=> true + }] + post(distirubte_message) + end + end +end Modified: lib/droonga/executor.rb (+6 -0) =================================================================== --- lib/droonga/executor.rb 2013-11-24 17:40:03 +0900 (1a27725) +++ lib/droonga/executor.rb 2013-11-24 18:14:07 +0900 (da74bd9) @@ -22,6 +22,7 @@ require "groonga" require "droonga/legacy_plugin" require "droonga/plugin_loader" require "droonga/dispatcher" +require "droonga/distributor" module Droonga class Executor @@ -42,6 +43,7 @@ module Droonga def shutdown $log.trace("#{log_tag}: shutdown: start") + @distributor.shutdown @legacy_plugins.each do |legacy_plugin| legacy_plugin.shutdown end @@ -122,6 +124,9 @@ module Droonga legacy_plugin.handle(command, body, *arguments) $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>", :plugin => legacy_plugin.class) + else + @distributor.distribute(envelope.merge("type" => command, + "body" => body)) end end add_route(route) if route @@ -198,6 +203,7 @@ module Droonga @context = Groonga::Context.new @database =****@conte*****_database(@database_name) end + @distributor = Distributor.new(self, @options) add_legacy_plugin("dispatcher_message") end Modified: lib/droonga/plugin/adapter/groonga.rb (+2 -2) =================================================================== --- lib/droonga/plugin/adapter/groonga.rb 2013-11-24 17:40:03 +0900 (635c09e) +++ lib/droonga/plugin/adapter/groonga.rb 2013-11-24 18:14:07 +0900 (ef96ac1) @@ -15,10 +15,10 @@ require "groonga" -require "droonga/adapter" +require "droonga/legacy_plugin" module Droonga - class GroongaAdapter < Droonga::Adapter + class GroongaAdapter < Droonga::LegacyPlugin # TODO: AdapterPlugin or something should be defined to avoid conflicts. Droonga::LegacyPlugin.repository.register("select", self) command :select Added: lib/droonga/plugin/distributor/crud.rb (+46 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/plugin/distributor/crud.rb 2013-11-24 18:14:07 +0900 (e89f670) @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/distributor_plugin" + +module Droonga + class CRUDDistributor < Droonga::DistributorPlugin + repository.register("crud", self) + + command :add + def add(envelope) + # TODO: update events must be serialized in the primary node of replicas. + key = envelope["body"]["key"] || rand.to_s + scatter_all(envelope, key) + end + + command :update + def update(envelope) + # TODO: update events must be serialized in the primary node of replicas. + key = envelope["body"]["key"] || rand.to_s + scatter_all(envelope, key) + end + + # TODO: What is this? + command :reset + def reset(envelope) + # TODO: update events must be serialized in the primary node of replicas. + key = envelope["body"]["key"] || rand.to_s + scatter_all(envelope, key) + end + end +end Copied: lib/droonga/plugin/distributor/groonga.rb (+15 -18) 53% =================================================================== --- lib/droonga/plugin/adapter/groonga.rb 2013-11-24 17:40:03 +0900 (635c09e) +++ lib/droonga/plugin/distributor/groonga.rb 2013-11-24 18:14:07 +0900 (54d54cb) @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Copyright (C) 2013 Droonga Project # # This library is free software; you can redistribute it and/or @@ -13,28 +15,23 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "groonga" - -require "droonga/adapter" +require "droonga/distributor_plugin" module Droonga - class GroongaAdapter < Droonga::Adapter - # TODO: AdapterPlugin or something should be defined to avoid conflicts. - Droonga::LegacyPlugin.repository.register("select", self) - command :select - def select(select_request) - command = Select.new - search_request = command.convert_request(select_request) - add_route("select_response") - post(search_request, "search") + class GroongaDistributor < Droonga::DistributorPlugin + repository.register("groonga", self) + + command :table_create + def table_create(envelope) + unless envelope["dataset"] + raise "dataset must be set. FIXME: This error should return client." + end + broadcast_all(envelope) end - command :select_response - def select_response(search_response) - command = Select.new - emit(command.convert_response(search_response)) + command :column_create + def column_create(envelope) + broadcast_all(envelope) end end end - -require "droonga/plugin/adapter/groonga/select" Copied: lib/droonga/plugin/distributor/search.rb (+6 -85) 51% =================================================================== --- lib/droonga/adapter.rb 2013-11-24 17:40:03 +0900 (a0246bc) +++ lib/droonga/plugin/distributor/search.rb 2013-11-24 18:14:07 +0900 (97d2973) @@ -15,98 +15,19 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/legacy_plugin" +require "droonga/distributor_plugin" module Droonga - class Adapter < Droonga::LegacyPlugin - def scatter_all(request, key) - message = [{ - "command"=> envelope["type"], - "dataset"=> envelope["dataset"], - "body"=> request, - "key"=> key, - "type"=> "scatter", - "replica"=> "all", - "post"=> true - }] - post(message, "dispatcher") - end - - def broadcast_all(request) - message = [{ - "command"=> envelope["type"], - "dataset"=> envelope["dataset"], - "body"=> request, - "type"=> "broadcast", - "replica"=> "all", - "post"=> true - }] - post(message, "dispatcher") - end - - def prefer_synchronous?(command) - return true - end - end - - class BasicAdapter < Adapter - Droonga::LegacyPlugin.repository.register("adapter", self) - - command :table_create - def table_create(request) - unless envelope["dataset"] - raise "dataset must be set. FIXME: This error should return client." - end - broadcast_all(request) - end - - command :column_create - def column_create(request) - broadcast_all(request) - end - - command "watch.feed" => :feed - def feed(request) - broadcast_all(request) - end - - command "watch.subscribe" => :subscribe - def subscribe(request) - broadcast_all(request) - end - - command "watch.unsubscribe" => :unsubscribe - def unsubscribe(request) - broadcast_all(request) - end - - command :add - def add(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end - - command :update - def update(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end - - command :reset - def reset(request) - # TODO: update events must be serialized in the primary node of replicas. - key = request["key"] || rand.to_s - scatter_all(request, key) - end + class SearchDistributor < Droonga::DistributorPlugin + repository.register("search", self) command :search - def search(request) + def search(envelope) message = [] input_names = [] output_names = [] name_mapper = {} + request = envelope["body"] request["queries"].each do |input_name, query| output = query["output"] next unless output @@ -153,7 +74,7 @@ module Droonga "body"=> request } message.push(searcher) - post(message, "dispatcher") + post(message) end end end Copied: lib/droonga/plugin/distributor/watch.rb (+17 -18) 53% =================================================================== --- lib/droonga/plugin/adapter/groonga.rb 2013-11-24 17:40:03 +0900 (635c09e) +++ lib/droonga/plugin/distributor/watch.rb 2013-11-24 18:14:07 +0900 (07c5571) @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Copyright (C) 2013 Droonga Project # # This library is free software; you can redistribute it and/or @@ -13,28 +15,25 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "groonga" - -require "droonga/adapter" +require "droonga/distributor_plugin" module Droonga - class GroongaAdapter < Droonga::Adapter - # TODO: AdapterPlugin or something should be defined to avoid conflicts. - Droonga::LegacyPlugin.repository.register("select", self) - command :select - def select(select_request) - command = Select.new - search_request = command.convert_request(select_request) - add_route("select_response") - post(search_request, "search") + class WatchDistributor < Droonga::DistributorPlugin + repository.register("watch", self) + + command "watch.feed" => :feed + def feed(envelope) + broadcast_all(envelope) end - command :select_response - def select_response(search_response) - command = Select.new - emit(command.convert_response(search_response)) + command "watch.subscribe" => :subscribe + def subscribe(envelope) + broadcast_all(envelope) + end + + command "watch.unsubscribe" => :unsubscribe + def unsubscribe(envelope) + broadcast_all(envelope) end end end - -require "droonga/plugin/adapter/groonga/select" Modified: test/unit/test_adapter.rb (+9 -9) =================================================================== --- test/unit/test_adapter.rb 2013-11-24 17:40:03 +0900 (29e807f) +++ test/unit/test_adapter.rb 2013-11-24 18:14:07 +0900 (4f93455) @@ -17,15 +17,15 @@ require "droonga/adapter" class AdapterTest < Test::Unit::TestCase class AdaptTest < self - class GroongaAdapter < Droonga::Adapter - command :select - def select(request) - post(:search) do |response| - # do nothing - end - :selected - end - end + # class GroongaAdapter < Droonga::Adapter + # command :select + # def select(request) + # post(:search) do |response| + # # do nothing + # end + # :selected + # end + # end def setup omit("Pending") -------------- next part -------------- HTML����������������������������...Download