Daijiro MORI
null+****@clear*****
Wed Aug 21 13:15:56 JST 2013
Daijiro MORI 2013-08-21 13:15:56 +0900 (Wed, 21 Aug 2013) New Revision: e7f41b8b98181502fceba807cf5317210f78f2ed https://github.com/droonga/fluent-plugin-droonga/commit/e7f41b8b98181502fceba807cf5317210f78f2ed Message: Make use of Executor class in worker.rb. Modified files: lib/droonga/executor.rb lib/droonga/worker.rb Modified: lib/droonga/executor.rb (+25 -4) =================================================================== --- lib/droonga/executor.rb 2013-08-21 12:31:31 +0900 (3d2720b) +++ lib/droonga/executor.rb 2013-08-21 13:15:56 +0900 (db7eb0b) @@ -37,10 +37,23 @@ module Droonga @queue_name = options[:queue_name] || "DroongaQueue" Droonga::JobQueue.ensure_schema(@database_name, @queue_name) @handler_names = options[:handlers] || ["proxy"] + @pool_size = options[:n_workers] load_handlers - @pool_size = options[:pool_size] || 1 prepare - end + end + + def shutdown + @handlers.each do |handler| + handler.shutdown + end + @outputs.each do |dest, output| + output[:logger].close if output[:logger] + end + @queue = nil + @database.close + @context.close + @database = @context = nil + end def add_handler(name) plugin = HandlerPlugin.new(name) @@ -56,6 +69,14 @@ module Droonga post_or_push(message, body, "type" => type, "arguments" => arguments) end + def execute_one + message = pull_message + return unless message + body, command, arguments = parse_message(message) + handler = find_handler(command) + handler.handle(command, body, *arguments) if handler + end + def post(body, destination=nil) post_or_push(nil, body, destination) end @@ -151,8 +172,7 @@ module Droonga def push_message(message) packed_message = message.to_msgpack - queue = @context[@queue_name] - queue.push do |record| + @queue.push do |record| record.message = packed_message end end @@ -180,6 +200,7 @@ module Droonga @context = Groonga::Context.new @database =****@conte*****_database(@database_name) @context.encoding = :none + @queue = @context[@queue_name] @handler_names.each do |handler_name| add_handler(handler_name) end Modified: lib/droonga/worker.rb (+3 -200) =================================================================== --- lib/droonga/worker.rb 2013-08-21 12:31:31 +0900 (47a7626) +++ lib/droonga/worker.rb 2013-08-21 13:15:56 +0900 (248072c) @@ -30,40 +30,18 @@ module Droonga attr_reader :context, :envelope, :name def initialize - @handlers = [] - @outputs = {} - @name = config[:name] - @database_name = config[:database] || "droonga/db" - @queue_name = config[:queue_name] || "DroongaQueue" - @handler_names = config[:handlers] || ["proxy"] - @pool_size = config[:n_workers] - load_handlers - prepare + @executor = Executor.new(config) end def run $log.trace("worker: run: start") - @queue = @context[@queue_name] @running = true while @running $log.trace("worker: run: pull_message: start") - message = pull_message + @executor.execute_one $log.trace("worker: run: pull_message: done") - next unless message - body, command, arguments = parse_message(message) - handler = find_handler(command) - handler.handle(command, body, *arguments) if handler end - @handlers.each do |handler| - handler.shutdown - end - @outputs.each do |dest, output| - output[:logger].close if output[:logger] - end - @queue = nil - @database.close - @context.close - @database = @context = nil + @executor.shutdown $log.trace("worker: run: done") end @@ -73,19 +51,6 @@ module Droonga $log.trace("worker: stop: done") end - def add_handler(name) - plugin = HandlerPlugin.new(name) - @handlers << plugin.instantiate(self) - end - - def add_route(route) - envelope["via"].push(route) - end - - def post(body, destination=nil) - post_or_push(nil, body, destination) - end - private def shutdown_workers @pool.each do |pid| @@ -104,167 +69,5 @@ module Droonga Process.kill(:KILL, pid) end end - - def post_or_push(message, body, destination) - route = nil - unless destination - route = envelope["via"].pop - destination = route - end - command = nil - receiver = nil - arguments = nil - synchronous = nil - case destination - when String - command = destination - when Hash - command = destination["type"] - receiver = destination["to"] - arguments = destination["arguments"] - synchronous = destination["synchronous"] - else - receiver = envelope["replyTo"] - end - if receiver - output(receiver, body, command, arguments) - else - handler = find_handler(command) - if handler - if synchronous.nil? - synchronous = handler.prefer_synchronous?(command) - end - if route || @pool_size.zero? || synchronous - handler.handle(command, body, *arguments) - else - unless message - envelope["body"] = body - envelope["type"] = command - envelope["arguments"] = arguments - message = ['', Time.now.to_f, envelope] - end - push_message(message) - end - end - end - add_route(route) if route - end - - def output(receiver, body, command, arguments) - return nil unless receiver - unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ - raise "format: hostname:port/tag(?params)" - end - host = $1 - port = $2 - tag = $3 - params = $4 - output = get_output(host, port, params) - return unless output - if command - message = envelope - message["body"] = body - message["type"] = command - message["arguments"] = arguments - else - message = { - inReplyTo: envelope["id"], - statusCode: 200, - type: (envelope["type"] || "") + ".result", - body: body - } - end - output.post(tag + ".message", message) - end - - def parse_message(message) - tag, time, record = message - prefix, type, *arguments = tag.split(/\./) - if type.nil? || type.empty? || type == 'message' - @envelope = record - else - @envelope = { - "type" => type, - "arguments" => arguments, - "body" => record - } - end - envelope["via"] ||= [] - [envelope["body"], envelope["type"], envelope["arguments"]] - end - - def push_message(message) - packed_message = message.to_msgpack - queue = @context[@queue_name] - queue.push do |record| - record.message = packed_message - end - end - - def pull_message - packed_message = nil - @queue.pull do |record| - if record - packed_message = record.message - record.delete - end - end - return nil unless packed_message - MessagePack.unpack(packed_message) - end - - def load_handlers - @handler_names.each do |handler_name| - plugin = Droonga::Plugin.new("handler", handler_name) - plugin.load - end - end - - def prepare - @context = Groonga::Context.new - @database =****@conte*****_database(@database_name) - @context.encoding = :none - @handler_names.each do |handler_name| - add_handler(handler_name) - end - add_handler("proxy_message") - end - - def find_handler(command) - @handlers.find do |handler| - handler.handlable?(command) - end - end - - def get_output(host, port, params) - host_port = "#{host}:#{port}" - @outputs[host_port] ||= {} - output = @outputs[host_port] - - has_connection_id = (not params.nil? \ - and params =~ /[\?&;]connection_id=([^&;]+)/) - if output[:logger].nil? or has_connection_id - connection_id = $1 - if not has_connection_id or output[:connection_id] != connection_id - output[:connection_id] = connection_id - logger = create_logger(:host => host, :port => port.to_i) - # output[:logger] should be closed if it exists beforehand? - output[:logger] = logger - end - end - - has_client_session_id = (not params.nil? \ - and params =~ /[\?&;]client_session_id=([^&;]+)/) - if has_client_session_id - client_session_id = $1 - # some generic way to handle client_session_id is expected - end - - output[:logger] - end - - def create_logger(options) - Fluent::Logger::FluentLogger.new(nil, options) - end end end -------------- next part -------------- HTML����������������������������...Download