Daijiro MORI
null+****@clear*****
Wed Apr 3 13:49:37 JST 2013
Daijiro MORI 2013-04-03 13:49:37 +0900 (Wed, 03 Apr 2013) New Revision: 31290186f636b2fa1ae0568aead9710773645622 https://github.com/groonga/fluent-plugin-droonga/commit/31290186f636b2fa1ae0568aead9710773645622 Message: Prefork workers Modified files: lib/droonga/worker.rb lib/fluent/plugin/out_droonga.rb Modified: lib/droonga/worker.rb (+95 -1) =================================================================== --- lib/droonga/worker.rb 2013-04-02 18:13:35 +0900 (fd8561a) +++ lib/droonga/worker.rb 2013-04-03 13:49:37 +0900 (71e4318) @@ -17,6 +17,8 @@ require 'groonga' require "droonga/handler_plugin" +require "fluent-logger" +require "json" module Droonga class Worker @@ -25,6 +27,9 @@ module Droonga @database =****@conte*****_database(database) @queue_name = queue_name @handlers = [] + @outputs = {} + @finish = false + @status = :IDLE end def add_handler(name) @@ -36,15 +41,63 @@ module Droonga @handlers.each do |handler| handler.shutdown end + @outputs.each do |dest, output| + output[:logger].close if output[:logger] + end @database.close @context.close @database = @context = nil end + def start + # TODO: doesn't work + Signal.trap(:TERM) do + @finish = true + exit! 0 if @status == :IDLE + end + queue = @context[@queue_name] + while !@finish + value = nil + queue.pull do |record| + @status = :BUSY + value = record["value"] if record + end + if value +# value.force_encoding("UTF-8") +# envelope = MessagePack.unpack(value) + envelope = JSON.parse(value) + process_message(envelope) if value + end + @status = :IDLE + end + end + + def post_message(envelope) +# value = envelope.to_msgpack +# value.force_encoding("UTF-8") + value = envelope.to_json + queue = @context[@queue_name] + queue.push do |record| + record["value"] = value + end + end + def process_message(envelope) command = envelope["type"] handler = find_handler(command) - handler.handle(command, envelope["body"]) + result = handler.handle(command, envelope["body"]) + output = get_output(envelope) + if output + response = { + inReplyTo: envelope["id"], + statusCode: 200, + type: (envelope["type"] || "") + ".result", + body: { + result: result + } + } + output.post("message", response) + end end private @@ -53,5 +106,46 @@ module Droonga handler.handlable?(command) end end + + def get_output(event) + receiver = event["replyTo"] + return nil unless receiver + unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ + raise "format: hostname:port/tag(?params)" + end + host = $1 + port = $2 + tag = $3 + params = $4 + + host_port = "#{host}:#{port}" + @outputs[host_port] ||= {} + output = @outputs[host_port] + + has_connection_id = (not params.nil? \ + and params =~ /[\?&;]connection_id=([^&;]+)/) + if output[:logger].nil? or has_connection_id + connection_id = $1 + if not has_connection_id or output[:connection_id] != connection_id + output[:connection_id] = connection_id + logger = create_logger(tag, :host => host, :port => port.to_i) + # output[:logger] should be closed if it exists beforehand? + output[:logger] = logger + end + end + + has_client_session_id = (not params.nil? \ + and params =~ /[\?&;]client_session_id=([^&;]+)/) + if has_client_session_id + client_session_id = $1 + # some generic way to handle client_session_id is expected + end + + output[:logger] + end + + def create_logger(tag, options) + Fluent::Logger::FluentLogger.new(tag, options) + end end end Modified: lib/fluent/plugin/out_droonga.rb (+20 -63) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-04-02 18:13:35 +0900 (f5ac822) +++ lib/fluent/plugin/out_droonga.rb 2013-04-03 13:49:37 +0900 (3485e8d) @@ -15,7 +15,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "fluent-logger" require "droonga/worker" require "droonga/plugin" @@ -23,6 +22,7 @@ module Fluent class DroongaOutput < Output Plugin.register_output("droonga", self) + config_param :n_workers, :integer, :default => 1 config_param :database, :string, :default => "droonga.db" config_param :queue_name, :string, :default => "DroongaQueue" config_param :handlers, :default => [] do |value| @@ -36,16 +36,27 @@ module Fluent def start super - # prefork @workers + @workers = [] + @n_workers.times do + pid = Process.fork + if pid + @workers << pid + next + end + # child process + begin + create_worker.start + exit! 0 + end + end @worker = create_worker - @outputs = {} end def shutdown super @worker.shutdown - @outputs.each do |dest, output| - output[:logger].close if output[:logger] + @workers.each do |pid| + Process.kill(:TERM, pid) end end @@ -58,60 +69,10 @@ module Fluent end def dispatch(tag, time, record) - # Post to peers or execute it as needed - exec(tag, time, record) - end - - def get_output(event) - receiver = event["replyTo"] - return nil unless receiver - unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ - raise "format: hostname:port/tag(?params)" - end - host = $1 - port = $2 - tag = $3 - params = $4 - - host_port = "#{host}:#{port}" - @outputs[host_port] ||= {} - output = @outputs[host_port] - - has_connection_id = (not params.nil? \ - and params =~ /[\?&;]connection_id=([^&;]+)/) - if output[:logger].nil? or has_connection_id - connection_id = $1 - if not has_connection_id or output[:connection_id] != connection_id - output[:connection_id] = connection_id - logger = create_logger(tag, :host => host, :port => port.to_i) - # output[:logger] should be closed if it exists beforehand? - output[:logger] = logger - end - end - - has_client_session_id = (not params.nil? \ - and params =~ /[\?&;]client_session_id=([^&;]+)/) - if has_client_session_id - client_session_id = $1 - # some generic way to handle client_session_id is expected - end - - output[:logger] - end - - def exec(tag, time, record) - result =****@worke*****_message(record) - output = get_output(record) - if output - response = { - inReplyTo: record["id"], - statusCode: 200, - type: (record["type"] || "") + ".result", - body: { - result: result - } - } - output.post("message", response) + if****@worke*****? + @worker.process_message(record) + else + @worker.post_message(record) end end @@ -130,9 +91,5 @@ module Fluent end worker end - - def create_logger(tag, options) - Fluent::Logger::FluentLogger.new(tag, options) - end end end -------------- next part -------------- HTML����������������������������...Download