Kouhei Sutou
null+****@clear*****
Sun Nov 24 20:22:59 JST 2013
Kouhei Sutou 2013-11-24 20:22:59 +0900 (Sun, 24 Nov 2013) New Revision: 403f66265b11258c32ad17d697e55a2dd50ea193 https://github.com/droonga/fluent-plugin-droonga/commit/403f66265b11258c32ad17d697e55a2dd50ea193 Message: Move plugin code in executor to dispatcher Modified files: lib/droonga/dispatcher.rb lib/droonga/distributor.rb lib/droonga/executor.rb Modified: lib/droonga/dispatcher.rb (+35 -25) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-24 19:40:06 +0900 (9a79017) +++ lib/droonga/dispatcher.rb 2013-11-24 20:22:59 +0900 (11fc611) @@ -33,16 +33,30 @@ module Droonga @collectors = {} @current_id = 0 @local = Regexp.new("^#{@name}") - plugins = (Droonga.catalog.option("plugins")||[]) - plugins.each do |plugin| - @worker.add_legacy_plugin(plugin) - end + Droonga::PluginLoader.load_all + load_legacy_plugins(Droonga.catalog.option("plugins")||[]) end def shutdown + @legacy_plugins.each do |legacy_plugin| + legacy_plugin.shutdown + end @farm.shutdown end + def processable?(command) + not find_legacy_plugin(command).nil? + end + + def process(command, body, *arguments) + legacy_plugin = find_legacy_plugin(command) + $log.trace("#{log_tag}: process: start: <#{command}>", + :plugin => legacy_plugin.class) + legacy_plugin.handle(command, body, *arguments) + $log.trace("#{log_tag}: process: done: <#{command}>", + :plugin => legacy_plugin.class) + end + def handle(message, arguments) case message when Array @@ -117,6 +131,23 @@ module Droonga route =~ @local end + private + def find_legacy_plugin(command) + @legacy_plugins.find do |plugin| + plugin.handlable?(command) + end + end + + def load_legacy_plugins(names) + @legacy_plugins = names.collect do |name| + LegacyPlugin.repository.instantiate(name, @worker) + end + end + + def log_tag + "[#{Process.ppid}][#{Process.pid}] dispatcher" + end + class Planner attr_reader :components class UndefinedInputError < StandardError @@ -246,25 +277,4 @@ module Droonga end end end - - class DispatcherMessageHandler < Droonga::LegacyPlugin - Droonga::LegacyPlugin.repository.register("dispatcher_message", self) - def initialize(*arguments) - super - @dispatcher = Droonga::Dispatcher.new(@worker, @worker.name) - end - - def shutdown - @dispatcher.shutdown - end - - command :dispatcher - def dispatcher(request, *arguments) - @dispatcher.handle(request, arguments) - end - - def prefer_synchronous?(command) - return true - end - end end Modified: lib/droonga/distributor.rb (+3 -3) =================================================================== --- lib/droonga/distributor.rb 2013-11-24 19:40:06 +0900 (e4eaf1a) +++ lib/droonga/distributor.rb 2013-11-24 20:22:59 +0900 (bf0d6bf) @@ -19,8 +19,8 @@ require "droonga/distributor_plugin" module Droonga class Distributor - def initialize(executor, options={}) - @executor = executor + def initialize(dispatcher, options={}) + @dispatcher = dispatcher @plugins = [] @options = options # TODO: don't put the default distributions @@ -46,7 +46,7 @@ module Droonga end def post(message) - @executor.post(message, "dispatcher") + @dispatcher.handle(message, []) end private Modified: lib/droonga/executor.rb (+6 -27) =================================================================== --- lib/droonga/executor.rb 2013-11-24 19:40:06 +0900 (da74bd9) +++ lib/droonga/executor.rb 2013-11-24 20:22:59 +0900 (f21d629) @@ -19,7 +19,6 @@ require "fluent-logger" require "fluent/logger/fluent_logger" require "groonga" -require "droonga/legacy_plugin" require "droonga/plugin_loader" require "droonga/dispatcher" require "droonga/distributor" @@ -29,24 +28,18 @@ module Droonga attr_reader :context, :envelope, :name def initialize(options={}) - @legacy_plugins = [] @outputs = {} @options = options @name = options[:name] @database_name = options[:database] @queue_name = options[:queue_name] || "DroongaQueue" @pool_size = options[:n_workers] || 0 -# load_plugins - Droonga::PluginLoader.load_all prepare end def shutdown $log.trace("#{log_tag}: shutdown: start") @distributor.shutdown - @legacy_plugins.each do |legacy_plugin| - legacy_plugin.shutdown - end @outputs.each do |dest, output| output[:logger].close if output[:logger] end @@ -58,11 +51,6 @@ module Droonga $log.trace("#{log_tag}: shutdown: done") end - def add_legacy_plugin(name) - legacy_plugin = LegacyPlugin.repository.instantiate(name, self) - @legacy_plugins << legacy_plugin - end - def add_route(route) envelope["via"].push(route) end @@ -117,13 +105,10 @@ module Droonga if receiver output(receiver, body, command, arguments) else - legacy_plugin = find_legacy_plugin(command) - if legacy_plugin - $log.trace("#{log_tag}: post_or_push: handle: start: <#{command}>", - :plugin => legacy_plugin.class) - legacy_plugin.handle(command, body, *arguments) - $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>", - :plugin => legacy_plugin.class) + if command == "dispatcher" + @dispatcher.handle(body, arguments) + elsif****@dispa*****?(command) + @dispatcher.process(command, body, *arguments) else @distributor.distribute(envelope.merge("type" => command, "body" => body)) @@ -203,14 +188,8 @@ module Droonga @context = Groonga::Context.new @database =****@conte*****_database(@database_name) end - @distributor = Distributor.new(self, @options) - add_legacy_plugin("dispatcher_message") - end - - def find_legacy_plugin(command) - @legacy_plugins.find do |legacy_plugin| - legacy_plugin.handlable?(command) - end + @dispatcher = Dispatcher.new(self, name) + @distributor = Distributor.new(@dispatcher, @options) end def get_output(host, port, params) -------------- next part -------------- HTML����������������������������...Download