Daijiro MORI
null+****@clear*****
Wed Apr 17 20:06:20 JST 2013
Daijiro MORI 2013-04-17 20:06:20 +0900 (Wed, 17 Apr 2013) New Revision: 9566b4aee7bb0443fc850032d3f7287f4f51fdd5 https://github.com/groonga/fluent-plugin-droonga/commit/9566b4aee7bb0443fc850032d3f7287f4f51fdd5 Message: Move worker specific routines to Droonga::Worker Modified files: lib/droonga/worker.rb lib/fluent/plugin/out_droonga.rb test/test_output.rb test/test_worker.rb Modified: lib/droonga/worker.rb (+86 -31) =================================================================== --- lib/droonga/worker.rb 2013-04-09 19:12:03 +0900 (b2061df) +++ lib/droonga/worker.rb 2013-04-17 20:06:20 +0900 (a873ee9) @@ -19,27 +19,31 @@ require "msgpack" require "fluent-logger" require "groonga" +require "droonga/job_queue" require "droonga/handler_plugin" +require "droonga/plugin" module Droonga class Worker - def initialize(database, queue_name) - @context = Groonga::Context.new - @database =****@conte*****_database(database) - @context.encoding = :none - @queue_name = queue_name + def initialize(options={}) + @pool = [] @handlers = [] @outputs = {} - @finish = false - @status = :IDLE - end - - def add_handler(name) - plugin = HandlerPlugin.new(name) - @handlers << plugin.instantiate(@context) + @database_name = options[:database] || "droonga/db" + @queue_name = options[:queue_name] || "DroongaQueue" + Droonga::JobQueue.ensure_schema(@database_name, @queue_name) + @handler_names = options[:handlers] || ["search"] + load_handlers + pool_size = options[:pool_size] || 1 + @pool = spawn(pool_size) + prepare end def shutdown + @pool.each do |pid| + # TODO: do it gracefully + Process.kill(:KILL, pid) + end @handlers.each do |handler| handler.shutdown end @@ -51,7 +55,48 @@ module Droonga @database = @context = nil end + def dispatch(tag, time, record) + if****@pool*****? + process_message(record) + else + post_message(record) + end + end + + def add_handler(name) + plugin = HandlerPlugin.new(name) + @handlers << plugin.instantiate(@context) + end + + def process_message(envelope) + command = envelope["type"] + handler = find_handler(command) + return unless handler + result = handler.handle(command, envelope["body"]) + output = get_output(envelope) + if output + response = { + inReplyTo: envelope["id"], + statusCode: 200, + type: (envelope["type"] || "") + ".result", + body: result + } + output.post("message", response) + end + end + + private + def post_message(envelope) + message = envelope.to_msgpack + queue = @context[@queue_name] + queue.push do |record| + record.message = message + end + end + def start + @finish = false + @status = :IDLE # TODO: doesn't work Signal.trap(:TERM) do @finish = true @@ -72,31 +117,41 @@ module Droonga end end - def post_message(envelope) - message = envelope.to_msgpack - queue = @context[@queue_name] - queue.push do |record| - record.message = message + def spawn(pool_size) + pool = [] + pool_size.times do + pid = Process.fork + if pid + pool << pid + next + end + # child process + begin + prepare + start + shutdown + exit! 0 + end end + pool end - def process_message(envelope) - command = envelope["type"] - handler = find_handler(command) - result = handler.handle(command, envelope["body"]) - output = get_output(envelope) - if output - response = { - inReplyTo: envelope["id"], - statusCode: 200, - type: (envelope["type"] || "") + ".result", - body: result - } - output.post("message", response) + def load_handlers + @handler_names.each do |handler_name| + plugin = Droonga::Plugin.new("handler", handler_name) + plugin.load + end + end + + def prepare + @context = Groonga::Context.new + @database =****@conte*****_database(@database_name) + @context.encoding = :none + @handler_names.each do |handler_name| + add_handler(handler_name) end end - private def find_handler(command) @handlers.find do |handler| handler.handlable?(command) Modified: lib/fluent/plugin/out_droonga.rb (+5 -51) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-04-09 19:12:03 +0900 (85c1561) +++ lib/fluent/plugin/out_droonga.rb 2013-04-17 20:06:20 +0900 (89bb95a) @@ -15,9 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/job_queue" require "droonga/worker" -require "droonga/plugin" module Fluent class DroongaOutput < Output @@ -30,68 +28,24 @@ module Fluent value.split(/\s*,\s*/) end - def configure(conf) - super - Droonga::JobQueue.ensure_schema(@database, @queue_name) - load_handlers - end - def start super - @workers = [] - @n_workers.times do - pid = Process.fork - if pid - @workers << pid - next - end - # child process - begin - create_worker.start - exit! 0 - end - end - @worker = create_worker + @worker = Droonga::Worker.new(:database => @database, + :queue_name => @queue_name, + :pool_size => @n_workers, + :handlers => @handlers) end def shutdown super @worker.shutdown - @workers.each do |pid| - Process.kill(:KILL, pid) - end end def emit(tag, es, chain) es.each do |time, record| - # Merge it if needed - dispatch(tag, time, record) + @worker.dispatch(tag, time, record) end chain.next end - - def dispatch(tag, time, record) - if****@worke*****? - @worker.process_message(record) - else - @worker.post_message(record) - end - end - - private - def load_handlers - @handlers.each do |handler_name| - plugin = Droonga::Plugin.new("handler", handler_name) - plugin.load - end - end - - def create_worker - worker = Droonga::Worker.new(@database, @queue_name) - @handlers.each do |handler_name| - worker.add_handler(handler_name) - end - worker - end end end Modified: test/test_output.rb (+3 -3) =================================================================== --- test/test_output.rb 2013-04-09 19:12:03 +0900 (38d2552) +++ test/test_output.rb 2013-04-17 20:06:20 +0900 (96ddab9) @@ -23,7 +23,7 @@ module OutputStub @processed_record = nil end - def process_message(record) + def dispatch(tag, time, record) @processed_record = record @response end @@ -56,8 +56,8 @@ module OutputStub super() end - def create_worker - Worker.new(@response) + def start + @worker = Worker.new(@response) end def create_logger(tag, options) Modified: test/test_worker.rb (+2 -1) =================================================================== --- test/test_worker.rb 2013-04-09 19:12:03 +0900 (f47426c) +++ test/test_worker.rb 2013-04-17 20:06:20 +0900 (e214969) @@ -43,7 +43,8 @@ class WorkerTest < Test::Unit::TestCase end def setup_worker - @worker = Droonga::Worker.new(@database_path.to_s, "DroongaQueue") + @worker = Droonga::Worker.new(:database => @database_path.to_s, + :queue_name => "DroongaQueue") @worker.add_handler("search") end -------------- next part -------------- HTML����������������������������...Download