[Groonga-commit] groonga/fluent-plugin-droonga [master] Prefork workers

Back to archive index

Daijiro MORI null+****@clear*****
Wed Apr 3 13:49:37 JST 2013


Daijiro MORI	2013-04-03 13:49:37 +0900 (Wed, 03 Apr 2013)

  New Revision: 31290186f636b2fa1ae0568aead9710773645622
  https://github.com/groonga/fluent-plugin-droonga/commit/31290186f636b2fa1ae0568aead9710773645622

  Message:
    Prefork workers

  Modified files:
    lib/droonga/worker.rb
    lib/fluent/plugin/out_droonga.rb

  Modified: lib/droonga/worker.rb (+95 -1)
===================================================================
--- lib/droonga/worker.rb    2013-04-02 18:13:35 +0900 (fd8561a)
+++ lib/droonga/worker.rb    2013-04-03 13:49:37 +0900 (71e4318)
@@ -17,6 +17,8 @@
 
 require 'groonga'
 require "droonga/handler_plugin"
+require "fluent-logger"
+require "json"
 
 module Droonga
   class Worker
@@ -25,6 +27,9 @@ module Droonga
       @database =****@conte*****_database(database)
       @queue_name = queue_name
       @handlers = []
+      @outputs = {}
+      @finish = false
+      @status = :IDLE
     end
 
     def add_handler(name)
@@ -36,15 +41,63 @@ module Droonga
       @handlers.each do |handler|
         handler.shutdown
       end
+      @outputs.each do |dest, output|
+        output[:logger].close if output[:logger]
+      end
       @database.close
       @context.close
       @database = @context = nil
     end
 
+    def start
+      # TODO: doesn't work
+      Signal.trap(:TERM) do
+        @finish = true
+        exit! 0 if @status == :IDLE
+      end
+      queue = @context[@queue_name]
+      while !@finish
+        value = nil
+        queue.pull do |record|
+          @status = :BUSY
+          value = record["value"] if record
+        end
+        if value
+#         value.force_encoding("UTF-8")
+#         envelope = MessagePack.unpack(value)
+          envelope = JSON.parse(value)
+          process_message(envelope) if value
+        end
+        @status = :IDLE
+      end
+    end
+
+    def post_message(envelope)
+#     value = envelope.to_msgpack
+#     value.force_encoding("UTF-8")
+      value = envelope.to_json
+      queue = @context[@queue_name]
+      queue.push do |record|
+        record["value"] = value
+      end
+    end
+
     def process_message(envelope)
       command = envelope["type"]
       handler = find_handler(command)
-      handler.handle(command, envelope["body"])
+      result = handler.handle(command, envelope["body"])
+      output = get_output(envelope)
+      if output
+        response = {
+          inReplyTo: envelope["id"],
+          statusCode: 200,
+          type: (envelope["type"] || "") + ".result",
+          body: {
+            result: result
+          }
+        }
+        output.post("message", response)
+      end
     end
 
     private
@@ -53,5 +106,46 @@ module Droonga
         handler.handlable?(command)
       end
     end
+
+    def get_output(event)
+      receiver = event["replyTo"]
+      return nil unless receiver
+      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
+        raise "format: hostname:port/tag(?params)"
+      end
+      host = $1
+      port = $2
+      tag  = $3
+      params = $4
+
+      host_port = "#{host}:#{port}"
+      @outputs[host_port] ||= {}
+      output = @outputs[host_port]
+
+      has_connection_id = (not params.nil? \
+                           and params =~ /[\?&;]connection_id=([^&;]+)/)
+      if output[:logger].nil? or has_connection_id
+        connection_id = $1
+        if not has_connection_id or output[:connection_id] != connection_id
+          output[:connection_id] = connection_id
+          logger = create_logger(tag, :host => host, :port => port.to_i)
+          # output[:logger] should be closed if it exists beforehand?
+          output[:logger] = logger
+        end
+      end
+
+      has_client_session_id = (not params.nil? \
+                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
+      if has_client_session_id
+        client_session_id = $1
+        # some generic way to handle client_session_id is expected
+      end
+
+      output[:logger]
+    end
+
+    def create_logger(tag, options)
+      Fluent::Logger::FluentLogger.new(tag, options)
+    end
   end
 end

  Modified: lib/fluent/plugin/out_droonga.rb (+20 -63)
===================================================================
--- lib/fluent/plugin/out_droonga.rb    2013-04-02 18:13:35 +0900 (f5ac822)
+++ lib/fluent/plugin/out_droonga.rb    2013-04-03 13:49:37 +0900 (3485e8d)
@@ -15,7 +15,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "fluent-logger"
 require "droonga/worker"
 require "droonga/plugin"
 
@@ -23,6 +22,7 @@ module Fluent
   class DroongaOutput < Output
     Plugin.register_output("droonga", self)
 
+    config_param :n_workers, :integer, :default => 1
     config_param :database, :string, :default => "droonga.db"
     config_param :queue_name, :string, :default => "DroongaQueue"
     config_param :handlers, :default => [] do |value|
@@ -36,16 +36,27 @@ module Fluent
 
     def start
       super
-      # prefork @workers
+      @workers = []
+      @n_workers.times do
+        pid = Process.fork
+        if pid
+          @workers << pid
+          next
+        end
+        # child process
+        begin
+          create_worker.start
+          exit! 0
+        end
+      end
       @worker = create_worker
-      @outputs = {}
     end
 
     def shutdown
       super
       @worker.shutdown
-      @outputs.each do |dest, output|
-        output[:logger].close if output[:logger]
+      @workers.each do |pid|
+        Process.kill(:TERM, pid)
       end
     end
 
@@ -58,60 +69,10 @@ module Fluent
     end
 
     def dispatch(tag, time, record)
-      # Post to peers or execute it as needed
-      exec(tag, time, record)
-    end
-
-    def get_output(event)
-      receiver = event["replyTo"]
-      return nil unless receiver
-      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
-        raise "format: hostname:port/tag(?params)"
-      end
-      host = $1
-      port = $2
-      tag  = $3
-      params = $4
-
-      host_port = "#{host}:#{port}"
-      @outputs[host_port] ||= {}
-      output = @outputs[host_port]
-
-      has_connection_id = (not params.nil? \
-                           and params =~ /[\?&;]connection_id=([^&;]+)/)
-      if output[:logger].nil? or has_connection_id
-        connection_id = $1
-        if not has_connection_id or output[:connection_id] != connection_id
-          output[:connection_id] = connection_id
-          logger = create_logger(tag, :host => host, :port => port.to_i)
-          # output[:logger] should be closed if it exists beforehand?
-          output[:logger] = logger
-        end
-      end
-
-      has_client_session_id = (not params.nil? \
-                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
-      if has_client_session_id
-        client_session_id = $1
-        # some generic way to handle client_session_id is expected
-      end
-
-      output[:logger]
-    end
-
-    def exec(tag, time, record)
-      result =****@worke*****_message(record)
-      output = get_output(record)
-      if output
-        response = {
-          inReplyTo: record["id"],
-          statusCode: 200,
-          type: (record["type"] || "") + ".result",
-          body: {
-            result: result
-          }
-        }
-        output.post("message", response)
+      if****@worke*****?
+        @worker.process_message(record)
+      else
+        @worker.post_message(record)
       end
     end
 
@@ -130,9 +91,5 @@ module Fluent
       end
       worker
     end
-
-    def create_logger(tag, options)
-      Fluent::Logger::FluentLogger.new(tag, options)
-    end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index