YUKI Hiroshi
null+****@clear*****
Tue Jan 6 19:25:15 JST 2015
YUKI Hiroshi 2015-01-06 19:25:15 +0900 (Tue, 06 Jan 2015) New Revision: 2094f6bb4c17b4d3819b9863821e7cc84ff729f1 https://github.com/droonga/droonga-engine/commit/2094f6bb4c17b4d3819b9863821e7cc84ff729f1 Message: Use FluentMessageSender directly for other engine nodes Conflicts: lib/droonga/engine_node.rb Modified files: lib/droonga/engine_node.rb Modified: lib/droonga/engine_node.rb (+33 -7) =================================================================== --- lib/droonga/engine_node.rb 2015-01-06 19:23:09 +0900 (ed62d67) +++ lib/droonga/engine_node.rb 2015-01-06 19:25:15 +0900 (8446b0e) @@ -14,8 +14,8 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA require "droonga/loggable" -require "droonga/forwarder" require "droonga/forward_buffer" +require "droonga/fluent_message_sender" require "droonga/node_metadata" module Droonga @@ -27,20 +27,29 @@ module Droonga @state = state @sender_role = sender_role - @forwarder = Forwarder.new(loop, :buffering => true) @buffer = ForwardBuffer.new(name, @forwarder) + + unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/ + raise "name format: hostname:port/tag" + end + host = $1 + port = $2 + tag = $3 + @sender = FluentMessageSender.new(loop, host, port, + :buffering => true) + @sender.start end def start logger.trace("start: start") - @forwarder.start + @sender.resume @buffer.start_forward if really_writable? logger.trace("start: done") end def shutdown logger.trace("shutdown: start") - @forwarder.shutdown + @sender.shutdown logger.trace("shutdown: done") end @@ -48,13 +57,30 @@ module Droonga if not really_writable? @buffer.add(message, destination) elsif****@buffe*****? - @forwarder.forward(message, destination) + @output(message, destination) else @buffer.add(message, destination) @buffer.start_forward end end + def output(message, destination) + command = destination["type"] + receiver = destination["to"] + arguments = destination["arguments"] + + override_message = { + "type" => command, + } + override_message["arguments"] = arguments if arguments + message = message.merge(override_message) + output_tag = "#{tag}.message" + log_info = "<#{receiver}>:<#{output_tag}>" + logger.trace("forward: start: #{log_info}") + @sender.send(output_tag, message) + logger.trace("forward: end") + end + def live? @state.nil? or @state["live"] end @@ -123,11 +149,11 @@ module Droonga end end - private def on_change - @forwarder.resume + @sender.resume end + private def log_tag "[#{Process.ppid}] engine-node" end -------------- next part -------------- HTML����������������������������...Download