Kouhei Sutou
null+****@clear*****
Sun Nov 24 23:48:17 JST 2013
Kouhei Sutou 2013-11-24 23:48:17 +0900 (Sun, 24 Nov 2013) New Revision: 2a5d0768c4fd2a826e60b2467f6d2d2d2cfd836b https://github.com/droonga/fluent-plugin-droonga/commit/2a5d0768c4fd2a826e60b2467f6d2d2d2cfd836b Message: Extract code that posts message to other process Added files: lib/droonga/forwarder.rb Modified files: lib/droonga/handler.rb Added: lib/droonga/forwarder.rb (+126 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/forwarder.rb 2013-11-24 23:48:17 +0900 (5ba0d2f) @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "fluent-logger" +require "fluent/logger/fluent_logger" + +module Droonga + class Forwarder + def initialize + @outputs = {} + end + + def shutdown + $log.trace("#{log_tag}: shutdown: start") + @outputs.each do |dest, output| + output[:logger].close if output[:logger] + end + $log.trace("#{log_tag}: shutdown: done") + end + + def forward(envelope, body, destination) + $log.trace("#{log_tag}: post: start") + command = destination["type"] + receiver = destination["to"] + arguments = destination["arguments"] + synchronous = destination["synchronous"] + output(receiver, envelope, body, command, arguments) + $log.trace("#{log_tag}: post: done") + end + + private + def output(receiver, envelope, body, command, arguments) + $log.trace("#{log_tag}: output: start") + unless receiver.is_a?(String) && command.is_a?(String) + $log.trace("#{log_tag}: output: abort: invalid argument", + :receiver => receiver, + :command => command) + return + end + unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ + raise "format: hostname:port/tag(?params)" + end + host = $1 + port = $2 + tag = $3 + params = $4 + output = get_output(host, port, params) + unless output + $log.trace("#{log_tag}: output: abort: no output", + :host => host, + :port => port, + :params => params) + return + end + if command =~ /\.result$/ + message = { + inReplyTo: envelope["id"], + statusCode: 200, + type: command, + body: body + } + else + message = envelope.merge( + body: body, + type: command, + arguments: arguments + ) + end + output_tag = "#{tag}.message" + log_info = "<#{receiver}>:<#{output_tag}>" + $log.trace("#{log_tag}: output: post: start: #{log_info}") + output.post(output_tag, message) + $log.trace("#{log_tag}: output: post: done: #{log_info}") + $log.trace("#{log_tag}: output: done") + end + + def get_output(host, port, params) + host_port = "#{host}:#{port}" + @outputs[host_port] ||= {} + output = @outputs[host_port] + + has_connection_id = (not params.nil? \ + and params =~ /[\?&;]connection_id=([^&;]+)/) + if output[:logger].nil? or has_connection_id + connection_id = $1 + if not has_connection_id or output[:connection_id] != connection_id + output[:connection_id] = connection_id + logger = create_logger(:host => host, :port => port.to_i) + # output[:logger] should be closed if it exists beforehand? + output[:logger] = logger + end + end + + has_client_session_id = (not params.nil? \ + and params =~ /[\?&;]client_session_id=([^&;]+)/) + if has_client_session_id + client_session_id = $1 + # some generic way to handle client_session_id is expected + end + + output[:logger] + end + + def create_logger(options) + Fluent::Logger::FluentLogger.new(nil, options) + end + + def log_tag + "[#{Process.ppid}][#{Process.pid}] forwarder" + end + end +end Modified: lib/droonga/handler.rb (+5 -90) =================================================================== --- lib/droonga/handler.rb 2013-11-24 23:35:12 +0900 (0267071) +++ lib/droonga/handler.rb 2013-11-24 23:48:17 +0900 (56f9155) @@ -15,11 +15,10 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "fluent-logger" -require "fluent/logger/fluent_logger" require "groonga" require "droonga/job_queue" +require "droonga/forwarder" require "droonga/pluggable" require "droonga/handler_plugin" @@ -30,7 +29,6 @@ module Droonga attr_reader :context, :envelope, :name def initialize(options={}) - @outputs = {} @options = options @name = options[:name] @database_name = options[:database] @@ -41,9 +39,7 @@ module Droonga def shutdown $log.trace("#{log_tag}: shutdown: start") super - @outputs.each do |dest, output| - output[:logger].close if output[:logger] - end + @forwarder.shutdown if @database @database.close @context.close @@ -96,14 +92,8 @@ module Droonga @output_values[name] = value end - def post(body, destination) - $log.trace("#{log_tag}: post: start") - command = destination["type"] - receiver = destination["to"] - arguments = destination["arguments"] - synchronous = destination["synchronous"] - output(receiver, body, command, arguments) - $log.trace("#{log_tag}: post: done") + def post(message, destination) + @forwarder.forward(envelope, message, destination) end def handle(command, body, synchronous=nil) @@ -124,51 +114,6 @@ module Droonga end private - def output(receiver, body, command, arguments) - $log.trace("#{log_tag}: output: start") - unless receiver.is_a?(String) && command.is_a?(String) - $log.trace("#{log_tag}: output: abort: invalid argument", - :receiver => receiver, - :command => command) - return - end - unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ - raise "format: hostname:port/tag(?params)" - end - host = $1 - port = $2 - tag = $3 - params = $4 - output = get_output(host, port, params) - unless output - $log.trace("#{log_tag}: output: abort: no output", - :host => host, - :port => port, - :params => params) - return - end - if command =~ /\.result$/ - message = { - inReplyTo: envelope["id"], - statusCode: 200, - type: command, - body: body - } - else - message = envelope.merge( - body: body, - type: command, - arguments: arguments - ) - end - output_tag = "#{tag}.message" - log_info = "<#{receiver}>:<#{output_tag}>" - $log.trace("#{log_tag}: output: post: start: #{log_info}") - output.post(output_tag, message) - $log.trace("#{log_tag}: output: post: done: #{log_info}") - $log.trace("#{log_tag}: output: done") - end - def parse_envelope(envelope) @envelope = envelope envelope["via"] ||= [] @@ -182,39 +127,13 @@ module Droonga @job_queue = JobQueue.open(@database_name, @queue_name) end load_plugins(@options[:handlers] || []) + @forwarder = Forwarder.new end def instantiate_plugin(name) HandlerPlugin.repository.instantiate(name, self) end - def get_output(host, port, params) - host_port = "#{host}:#{port}" - @outputs[host_port] ||= {} - output = @outputs[host_port] - - has_connection_id = (not params.nil? \ - and params =~ /[\?&;]connection_id=([^&;]+)/) - if output[:logger].nil? or has_connection_id - connection_id = $1 - if not has_connection_id or output[:connection_id] != connection_id - output[:connection_id] = connection_id - logger = create_logger(:host => host, :port => port.to_i) - # output[:logger] should be closed if it exists beforehand? - output[:logger] = logger - end - end - - has_client_session_id = (not params.nil? \ - and params =~ /[\?&;]client_session_id=([^&;]+)/) - if has_client_session_id - client_session_id = $1 - # some generic way to handle client_session_id is expected - end - - output[:logger] - end - def process_command(plugin, command, request, arguments) return false unless request.is_a? Hash @@ -258,10 +177,6 @@ module Droonga end end - def create_logger(options) - Fluent::Logger::FluentLogger.new(nil, options) - end - def log_tag "[#{Process.ppid}][#{Process.pid}] handler" end -------------- next part -------------- HTML����������������������������... Download