[Groonga-commit] groonga/fluent-plugin-droonga:9566b4a [master] Move worker specific routines to Droonga::Worker

Back to archive index

Daijiro MORI null+****@clear*****
Wed Apr 17 20:06:20 JST 2013


Daijiro MORI	2013-04-17 20:06:20 +0900 (Wed, 17 Apr 2013)

  New Revision: 9566b4aee7bb0443fc850032d3f7287f4f51fdd5
  https://github.com/groonga/fluent-plugin-droonga/commit/9566b4aee7bb0443fc850032d3f7287f4f51fdd5

  Message:
    Move worker specific routines to Droonga::Worker

  Modified files:
    lib/droonga/worker.rb
    lib/fluent/plugin/out_droonga.rb
    test/test_output.rb
    test/test_worker.rb

  Modified: lib/droonga/worker.rb (+86 -31)
===================================================================
--- lib/droonga/worker.rb    2013-04-09 19:12:03 +0900 (b2061df)
+++ lib/droonga/worker.rb    2013-04-17 20:06:20 +0900 (a873ee9)
@@ -19,27 +19,31 @@ require "msgpack"
 require "fluent-logger"
 require "groonga"
 
+require "droonga/job_queue"
 require "droonga/handler_plugin"
+require "droonga/plugin"
 
 module Droonga
   class Worker
-    def initialize(database, queue_name)
-      @context = Groonga::Context.new
-      @database =****@conte*****_database(database)
-      @context.encoding = :none
-      @queue_name = queue_name
+    def initialize(options={})
+      @pool = []
       @handlers = []
       @outputs = {}
-      @finish = false
-      @status = :IDLE
-    end
-
-    def add_handler(name)
-      plugin = HandlerPlugin.new(name)
-      @handlers << plugin.instantiate(@context)
+      @database_name = options[:database] || "droonga/db"
+      @queue_name = options[:queue_name] || "DroongaQueue"
+      Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
+      @handler_names = options[:handlers] || ["search"]
+      load_handlers
+      pool_size = options[:pool_size] || 1
+      @pool = spawn(pool_size)
+      prepare
     end
 
     def shutdown
+      @pool.each do |pid|
+        # TODO: do it gracefully
+        Process.kill(:KILL, pid)
+      end
       @handlers.each do |handler|
         handler.shutdown
       end
@@ -51,7 +55,48 @@ module Droonga
       @database = @context = nil
     end
 
+    def dispatch(tag, time, record)
+      if****@pool*****?
+        process_message(record)
+      else
+        post_message(record)
+      end
+    end
+
+    def add_handler(name)
+      plugin = HandlerPlugin.new(name)
+      @handlers << plugin.instantiate(@context)
+    end
+
+    def process_message(envelope)
+      command = envelope["type"]
+      handler = find_handler(command)
+      return unless handler
+      result = handler.handle(command, envelope["body"])
+      output = get_output(envelope)
+      if output
+        response = {
+          inReplyTo: envelope["id"],
+          statusCode: 200,
+          type: (envelope["type"] || "") + ".result",
+          body: result
+        }
+        output.post("message", response)
+      end
+    end
+
+    private
+    def post_message(envelope)
+      message = envelope.to_msgpack
+      queue = @context[@queue_name]
+      queue.push do |record|
+        record.message = message
+      end
+    end
+
     def start
+      @finish = false
+      @status = :IDLE
       # TODO: doesn't work
       Signal.trap(:TERM) do
         @finish = true
@@ -72,31 +117,41 @@ module Droonga
       end
     end
 
-    def post_message(envelope)
-      message = envelope.to_msgpack
-      queue = @context[@queue_name]
-      queue.push do |record|
-        record.message = message
+    def spawn(pool_size)
+      pool = []
+      pool_size.times do
+        pid = Process.fork
+        if pid
+          pool << pid
+          next
+        end
+        # child process
+        begin
+          prepare
+          start
+          shutdown
+          exit! 0
+        end
       end
+      pool
     end
 
-    def process_message(envelope)
-      command = envelope["type"]
-      handler = find_handler(command)
-      result = handler.handle(command, envelope["body"])
-      output = get_output(envelope)
-      if output
-        response = {
-          inReplyTo: envelope["id"],
-          statusCode: 200,
-          type: (envelope["type"] || "") + ".result",
-          body: result
-        }
-        output.post("message", response)
+    def load_handlers
+      @handler_names.each do |handler_name|
+        plugin = Droonga::Plugin.new("handler", handler_name)
+        plugin.load
+      end
+    end
+
+    def prepare
+      @context = Groonga::Context.new
+      @database =****@conte*****_database(@database_name)
+      @context.encoding = :none
+      @handler_names.each do |handler_name|
+        add_handler(handler_name)
       end
     end
 
-    private
     def find_handler(command)
       @handlers.find do |handler|
         handler.handlable?(command)

  Modified: lib/fluent/plugin/out_droonga.rb (+5 -51)
===================================================================
--- lib/fluent/plugin/out_droonga.rb    2013-04-09 19:12:03 +0900 (85c1561)
+++ lib/fluent/plugin/out_droonga.rb    2013-04-17 20:06:20 +0900 (89bb95a)
@@ -15,9 +15,7 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/job_queue"
 require "droonga/worker"
-require "droonga/plugin"
 
 module Fluent
   class DroongaOutput < Output
@@ -30,68 +28,24 @@ module Fluent
       value.split(/\s*,\s*/)
     end
 
-    def configure(conf)
-      super
-      Droonga::JobQueue.ensure_schema(@database, @queue_name)
-      load_handlers
-    end
-
     def start
       super
-      @workers = []
-      @n_workers.times do
-        pid = Process.fork
-        if pid
-          @workers << pid
-          next
-        end
-        # child process
-        begin
-          create_worker.start
-          exit! 0
-        end
-      end
-      @worker = create_worker
+      @worker = Droonga::Worker.new(:database => @database,
+                                    :queue_name => @queue_name,
+                                    :pool_size => @n_workers,
+                                    :handlers => @handlers)
     end
 
     def shutdown
       super
       @worker.shutdown
-      @workers.each do |pid|
-        Process.kill(:KILL, pid)
-      end
     end
 
     def emit(tag, es, chain)
       es.each do |time, record|
-        # Merge it if needed
-        dispatch(tag, time, record)
+        @worker.dispatch(tag, time, record)
       end
       chain.next
     end
-
-    def dispatch(tag, time, record)
-      if****@worke*****?
-        @worker.process_message(record)
-      else
-        @worker.post_message(record)
-      end
-    end
-
-    private
-    def load_handlers
-      @handlers.each do |handler_name|
-        plugin = Droonga::Plugin.new("handler", handler_name)
-        plugin.load
-      end
-    end
-
-    def create_worker
-      worker = Droonga::Worker.new(@database, @queue_name)
-      @handlers.each do |handler_name|
-        worker.add_handler(handler_name)
-      end
-      worker
-    end
   end
 end

  Modified: test/test_output.rb (+3 -3)
===================================================================
--- test/test_output.rb    2013-04-09 19:12:03 +0900 (38d2552)
+++ test/test_output.rb    2013-04-17 20:06:20 +0900 (96ddab9)
@@ -23,7 +23,7 @@ module OutputStub
       @processed_record = nil
     end
 
-    def process_message(record)
+    def dispatch(tag, time, record)
       @processed_record = record
       @response
     end
@@ -56,8 +56,8 @@ module OutputStub
       super()
     end
 
-    def create_worker
-      Worker.new(@response)
+    def start
+      @worker = Worker.new(@response)
     end
 
     def create_logger(tag, options)

  Modified: test/test_worker.rb (+2 -1)
===================================================================
--- test/test_worker.rb    2013-04-09 19:12:03 +0900 (f47426c)
+++ test/test_worker.rb    2013-04-17 20:06:20 +0900 (e214969)
@@ -43,7 +43,8 @@ class WorkerTest < Test::Unit::TestCase
   end
 
   def setup_worker
-    @worker = Droonga::Worker.new(@database_path.to_s, "DroongaQueue")
+    @worker = Droonga::Worker.new(:database => @database_path.to_s,
+                                  :queue_name => "DroongaQueue")
     @worker.add_handler("search")
   end
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index