[Groonga-commit] droonga/fluent-plugin-droonga at 403f662 [master] Move plugin code in executor to dispatcher

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Nov 24 20:22:59 JST 2013


Kouhei Sutou	2013-11-24 20:22:59 +0900 (Sun, 24 Nov 2013)

  New Revision: 403f66265b11258c32ad17d697e55a2dd50ea193
  https://github.com/droonga/fluent-plugin-droonga/commit/403f66265b11258c32ad17d697e55a2dd50ea193

  Message:
    Move plugin code in executor to dispatcher

  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/distributor.rb
    lib/droonga/executor.rb

  Modified: lib/droonga/dispatcher.rb (+35 -25)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-24 19:40:06 +0900 (9a79017)
+++ lib/droonga/dispatcher.rb    2013-11-24 20:22:59 +0900 (11fc611)
@@ -33,16 +33,30 @@ module Droonga
       @collectors = {}
       @current_id = 0
       @local = Regexp.new("^#{@name}")
-      plugins = (Droonga.catalog.option("plugins")||[])
-      plugins.each do |plugin|
-        @worker.add_legacy_plugin(plugin)
-      end
+      Droonga::PluginLoader.load_all
+      load_legacy_plugins(Droonga.catalog.option("plugins")||[])
     end
 
     def shutdown
+      @legacy_plugins.each do |legacy_plugin|
+        legacy_plugin.shutdown
+      end
       @farm.shutdown
     end
 
+    def processable?(command)
+      not find_legacy_plugin(command).nil?
+    end
+
+    def process(command, body, *arguments)
+      legacy_plugin = find_legacy_plugin(command)
+      $log.trace("#{log_tag}: process: start: <#{command}>",
+                 :plugin => legacy_plugin.class)
+      legacy_plugin.handle(command, body, *arguments)
+      $log.trace("#{log_tag}: process: done: <#{command}>",
+                 :plugin => legacy_plugin.class)
+    end
+
     def handle(message, arguments)
       case message
       when Array
@@ -117,6 +131,23 @@ module Droonga
       route =~ @local
     end
 
+    private
+    def find_legacy_plugin(command)
+      @legacy_plugins.find do |plugin|
+        plugin.handlable?(command)
+      end
+    end
+
+    def load_legacy_plugins(names)
+      @legacy_plugins = names.collect do |name|
+        LegacyPlugin.repository.instantiate(name, @worker)
+      end
+    end
+
+    def log_tag
+      "[#{Process.ppid}][#{Process.pid}] dispatcher"
+    end
+
     class Planner
       attr_reader :components
       class UndefinedInputError < StandardError
@@ -246,25 +277,4 @@ module Droonga
       end
     end
   end
-
-  class DispatcherMessageHandler < Droonga::LegacyPlugin
-    Droonga::LegacyPlugin.repository.register("dispatcher_message", self)
-    def initialize(*arguments)
-      super
-      @dispatcher = Droonga::Dispatcher.new(@worker, @worker.name)
-    end
-
-    def shutdown
-      @dispatcher.shutdown
-    end
-
-    command :dispatcher
-    def dispatcher(request, *arguments)
-      @dispatcher.handle(request, arguments)
-    end
-
-    def prefer_synchronous?(command)
-      return true
-    end
-  end
 end

  Modified: lib/droonga/distributor.rb (+3 -3)
===================================================================
--- lib/droonga/distributor.rb    2013-11-24 19:40:06 +0900 (e4eaf1a)
+++ lib/droonga/distributor.rb    2013-11-24 20:22:59 +0900 (bf0d6bf)
@@ -19,8 +19,8 @@ require "droonga/distributor_plugin"
 
 module Droonga
   class Distributor
-    def initialize(executor, options={})
-      @executor = executor
+    def initialize(dispatcher, options={})
+      @dispatcher = dispatcher
       @plugins = []
       @options = options
       # TODO: don't put the default distributions
@@ -46,7 +46,7 @@ module Droonga
     end
 
     def post(message)
-      @executor.post(message, "dispatcher")
+      @dispatcher.handle(message, [])
     end
 
     private

  Modified: lib/droonga/executor.rb (+6 -27)
===================================================================
--- lib/droonga/executor.rb    2013-11-24 19:40:06 +0900 (da74bd9)
+++ lib/droonga/executor.rb    2013-11-24 20:22:59 +0900 (f21d629)
@@ -19,7 +19,6 @@ require "fluent-logger"
 require "fluent/logger/fluent_logger"
 require "groonga"
 
-require "droonga/legacy_plugin"
 require "droonga/plugin_loader"
 require "droonga/dispatcher"
 require "droonga/distributor"
@@ -29,24 +28,18 @@ module Droonga
     attr_reader :context, :envelope, :name
 
     def initialize(options={})
-      @legacy_plugins = []
       @outputs = {}
       @options = options
       @name = options[:name]
       @database_name = options[:database]
       @queue_name = options[:queue_name] || "DroongaQueue"
       @pool_size = options[:n_workers] || 0
-#     load_plugins
-      Droonga::PluginLoader.load_all
       prepare
     end
 
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
       @distributor.shutdown
-      @legacy_plugins.each do |legacy_plugin|
-        legacy_plugin.shutdown
-      end
       @outputs.each do |dest, output|
         output[:logger].close if output[:logger]
       end
@@ -58,11 +51,6 @@ module Droonga
       $log.trace("#{log_tag}: shutdown: done")
     end
 
-    def add_legacy_plugin(name)
-      legacy_plugin = LegacyPlugin.repository.instantiate(name, self)
-      @legacy_plugins << legacy_plugin
-    end
-
     def add_route(route)
       envelope["via"].push(route)
     end
@@ -117,13 +105,10 @@ module Droonga
       if receiver
         output(receiver, body, command, arguments)
       else
-        legacy_plugin = find_legacy_plugin(command)
-        if legacy_plugin
-          $log.trace("#{log_tag}: post_or_push: handle: start: <#{command}>",
-                     :plugin => legacy_plugin.class)
-          legacy_plugin.handle(command, body, *arguments)
-          $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>",
-                     :plugin => legacy_plugin.class)
+        if command == "dispatcher"
+          @dispatcher.handle(body, arguments)
+        elsif****@dispa*****?(command)
+          @dispatcher.process(command, body, *arguments)
         else
           @distributor.distribute(envelope.merge("type" => command,
                                                  "body" => body))
@@ -203,14 +188,8 @@ module Droonga
         @context = Groonga::Context.new
         @database =****@conte*****_database(@database_name)
       end
-      @distributor = Distributor.new(self, @options)
-      add_legacy_plugin("dispatcher_message")
-    end
-
-    def find_legacy_plugin(command)
-      @legacy_plugins.find do |legacy_plugin|
-        legacy_plugin.handlable?(command)
-      end
+      @dispatcher = Dispatcher.new(self, name)
+      @distributor = Distributor.new(@dispatcher, @options)
     end
 
     def get_output(host, port, params)
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index