[Groonga-commit] droonga/fluent-plugin-droonga at e7f41b8 [master] Make use of Executor class in worker.rb.

Back to archive index

Daijiro MORI null+****@clear*****
Wed Aug 21 13:15:56 JST 2013


Daijiro MORI	2013-08-21 13:15:56 +0900 (Wed, 21 Aug 2013)

  New Revision: e7f41b8b98181502fceba807cf5317210f78f2ed
  https://github.com/droonga/fluent-plugin-droonga/commit/e7f41b8b98181502fceba807cf5317210f78f2ed

  Message:
    Make use of Executor class in worker.rb.

  Modified files:
    lib/droonga/executor.rb
    lib/droonga/worker.rb

  Modified: lib/droonga/executor.rb (+25 -4)
===================================================================
--- lib/droonga/executor.rb    2013-08-21 12:31:31 +0900 (3d2720b)
+++ lib/droonga/executor.rb    2013-08-21 13:15:56 +0900 (db7eb0b)
@@ -37,10 +37,23 @@ module Droonga
        @queue_name = options[:queue_name] || "DroongaQueue"
        Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
        @handler_names = options[:handlers] || ["proxy"]
+       @pool_size = options[:n_workers]
        load_handlers
-       @pool_size = options[:pool_size] || 1
        prepare
-     end
+    end
+
+    def shutdown
+      @handlers.each do |handler|
+        handler.shutdown
+      end
+      @outputs.each do |dest, output|
+        output[:logger].close if output[:logger]
+      end
+      @queue = nil
+      @database.close
+      @context.close
+      @database = @context = nil
+    end
 
     def add_handler(name)
       plugin = HandlerPlugin.new(name)
@@ -56,6 +69,14 @@ module Droonga
       post_or_push(message, body, "type" => type, "arguments" => arguments)
     end
 
+    def execute_one
+      message = pull_message
+      return unless message
+      body, command, arguments = parse_message(message)
+      handler = find_handler(command)
+      handler.handle(command, body, *arguments) if handler
+    end
+
     def post(body, destination=nil)
       post_or_push(nil, body, destination)
     end
@@ -151,8 +172,7 @@ module Droonga
 
     def push_message(message)
       packed_message = message.to_msgpack
-      queue = @context[@queue_name]
-      queue.push do |record|
+      @queue.push do |record|
         record.message = packed_message
       end
     end
@@ -180,6 +200,7 @@ module Droonga
       @context = Groonga::Context.new
       @database =****@conte*****_database(@database_name)
       @context.encoding = :none
+      @queue = @context[@queue_name]
       @handler_names.each do |handler_name|
         add_handler(handler_name)
       end

  Modified: lib/droonga/worker.rb (+3 -200)
===================================================================
--- lib/droonga/worker.rb    2013-08-21 12:31:31 +0900 (47a7626)
+++ lib/droonga/worker.rb    2013-08-21 13:15:56 +0900 (248072c)
@@ -30,40 +30,18 @@ module Droonga
     attr_reader :context, :envelope, :name
 
     def initialize
-      @handlers = []
-      @outputs = {}
-      @name = config[:name]
-      @database_name = config[:database] || "droonga/db"
-      @queue_name = config[:queue_name] || "DroongaQueue"
-      @handler_names = config[:handlers] || ["proxy"]
-      @pool_size = config[:n_workers]
-      load_handlers
-      prepare
+      @executor = Executor.new(config)
     end
 
     def run
       $log.trace("worker: run: start")
-      @queue = @context[@queue_name]
       @running = true
       while @running
         $log.trace("worker: run: pull_message: start")
-        message = pull_message
+        @executor.execute_one
         $log.trace("worker: run: pull_message: done")
-        next unless message
-        body, command, arguments = parse_message(message)
-        handler = find_handler(command)
-        handler.handle(command, body, *arguments) if handler
       end
-      @handlers.each do |handler|
-        handler.shutdown
-      end
-      @outputs.each do |dest, output|
-        output[:logger].close if output[:logger]
-      end
-      @queue = nil
-      @database.close
-      @context.close
-      @database = @context = nil
+      @executor.shutdown
       $log.trace("worker: run: done")
     end
 
@@ -73,19 +51,6 @@ module Droonga
       $log.trace("worker: stop: done")
     end
 
-    def add_handler(name)
-      plugin = HandlerPlugin.new(name)
-      @handlers << plugin.instantiate(self)
-    end
-
-    def add_route(route)
-      envelope["via"].push(route)
-    end
-
-    def post(body, destination=nil)
-      post_or_push(nil, body, destination)
-    end
-
     private
     def shutdown_workers
       @pool.each do |pid|
@@ -104,167 +69,5 @@ module Droonga
         Process.kill(:KILL, pid)
       end
     end
-
-    def post_or_push(message, body, destination)
-      route = nil
-      unless destination
-        route = envelope["via"].pop
-        destination = route
-      end
-      command = nil
-      receiver = nil
-      arguments = nil
-      synchronous = nil
-      case destination
-      when String
-        command = destination
-      when Hash
-        command = destination["type"]
-        receiver = destination["to"]
-        arguments = destination["arguments"]
-        synchronous = destination["synchronous"]
-      else
-        receiver = envelope["replyTo"]
-      end
-      if receiver
-        output(receiver, body, command, arguments)
-      else
-        handler = find_handler(command)
-        if handler
-          if synchronous.nil?
-            synchronous = handler.prefer_synchronous?(command)
-          end
-          if route || @pool_size.zero? || synchronous
-            handler.handle(command, body, *arguments)
-          else
-            unless message
-              envelope["body"] = body
-              envelope["type"] = command
-              envelope["arguments"] = arguments
-              message = ['', Time.now.to_f, envelope]
-            end
-            push_message(message)
-          end
-        end
-      end
-      add_route(route) if route
-    end
-
-    def output(receiver, body, command, arguments)
-      return nil unless receiver
-      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
-        raise "format: hostname:port/tag(?params)"
-      end
-      host = $1
-      port = $2
-      tag  = $3
-      params = $4
-      output = get_output(host, port, params)
-      return unless output
-      if command
-        message = envelope
-        message["body"] = body
-        message["type"] = command
-        message["arguments"] = arguments
-      else
-        message = {
-          inReplyTo: envelope["id"],
-          statusCode: 200,
-          type: (envelope["type"] || "") + ".result",
-          body: body
-        }
-      end
-      output.post(tag + ".message", message)
-    end
-
-    def parse_message(message)
-      tag, time, record = message
-      prefix, type, *arguments = tag.split(/\./)
-      if type.nil? || type.empty? || type == 'message'
-        @envelope = record
-      else
-        @envelope = {
-          "type" => type,
-          "arguments" => arguments,
-          "body" => record
-        }
-      end
-      envelope["via"] ||= []
-      [envelope["body"], envelope["type"], envelope["arguments"]]
-    end
-
-    def push_message(message)
-      packed_message = message.to_msgpack
-      queue = @context[@queue_name]
-      queue.push do |record|
-        record.message = packed_message
-      end
-    end
-
-    def pull_message
-      packed_message = nil
-      @queue.pull do |record|
-        if record
-          packed_message = record.message
-          record.delete
-        end
-      end
-      return nil unless packed_message
-      MessagePack.unpack(packed_message)
-    end
-
-    def load_handlers
-      @handler_names.each do |handler_name|
-        plugin = Droonga::Plugin.new("handler", handler_name)
-        plugin.load
-      end
-    end
-
-    def prepare
-      @context = Groonga::Context.new
-      @database =****@conte*****_database(@database_name)
-      @context.encoding = :none
-      @handler_names.each do |handler_name|
-        add_handler(handler_name)
-      end
-      add_handler("proxy_message")
-    end
-
-    def find_handler(command)
-      @handlers.find do |handler|
-        handler.handlable?(command)
-      end
-    end
-
-    def get_output(host, port, params)
-      host_port = "#{host}:#{port}"
-      @outputs[host_port] ||= {}
-      output = @outputs[host_port]
-
-      has_connection_id = (not params.nil? \
-                           and params =~ /[\?&;]connection_id=([^&;]+)/)
-      if output[:logger].nil? or has_connection_id
-        connection_id = $1
-        if not has_connection_id or output[:connection_id] != connection_id
-          output[:connection_id] = connection_id
-          logger = create_logger(:host => host, :port => port.to_i)
-          # output[:logger] should be closed if it exists beforehand?
-          output[:logger] = logger
-        end
-      end
-
-      has_client_session_id = (not params.nil? \
-                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
-      if has_client_session_id
-        client_session_id = $1
-        # some generic way to handle client_session_id is expected
-      end
-
-      output[:logger]
-    end
-
-    def create_logger(options)
-      Fluent::Logger::FluentLogger.new(nil, options)
-    end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index