[Groonga-commit] droonga/droonga-engine at 2094f6b [buffered-forward] Use FluentMessageSender directly for other engine nodes

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Jan 6 19:25:15 JST 2015


YUKI Hiroshi	2015-01-06 19:25:15 +0900 (Tue, 06 Jan 2015)

  New Revision: 2094f6bb4c17b4d3819b9863821e7cc84ff729f1
  https://github.com/droonga/droonga-engine/commit/2094f6bb4c17b4d3819b9863821e7cc84ff729f1

  Message:
    Use FluentMessageSender directly for other engine nodes
    
    Conflicts:
    	lib/droonga/engine_node.rb

  Modified files:
    lib/droonga/engine_node.rb

  Modified: lib/droonga/engine_node.rb (+33 -7)
===================================================================
--- lib/droonga/engine_node.rb    2015-01-06 19:23:09 +0900 (ed62d67)
+++ lib/droonga/engine_node.rb    2015-01-06 19:25:15 +0900 (8446b0e)
@@ -14,8 +14,8 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
 require "droonga/loggable"
-require "droonga/forwarder"
 require "droonga/forward_buffer"
+require "droonga/fluent_message_sender"
 require "droonga/node_metadata"
 
 module Droonga
@@ -27,20 +27,29 @@ module Droonga
       @state = state
       @sender_role = sender_role
 
-      @forwarder = Forwarder.new(loop, :buffering => true)
       @buffer = ForwardBuffer.new(name, @forwarder)
+
+      unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/
+        raise "name format: hostname:port/tag"
+      end
+      host = $1
+      port = $2
+      tag  = $3
+      @sender = FluentMessageSender.new(loop, host, port,
+                                        :buffering => true)
+      @sender.start
     end
 
     def start
       logger.trace("start: start")
-      @forwarder.start
+      @sender.resume
       @buffer.start_forward if really_writable?
       logger.trace("start: done")
     end
 
     def shutdown
       logger.trace("shutdown: start")
-      @forwarder.shutdown
+      @sender.shutdown
       logger.trace("shutdown: done")
     end
 
@@ -48,13 +57,30 @@ module Droonga
       if not really_writable?
         @buffer.add(message, destination)
       elsif****@buffe*****?
-        @forwarder.forward(message, destination)
+        @output(message, destination)
       else
         @buffer.add(message, destination)
         @buffer.start_forward
       end
     end
 
+    def output(message, destination)
+      command = destination["type"]
+      receiver = destination["to"]
+      arguments = destination["arguments"]
+
+      override_message = {
+        "type" => command,
+      }
+      override_message["arguments"] = arguments if arguments
+      message = message.merge(override_message)
+      output_tag = "#{tag}.message"
+      log_info = "<#{receiver}>:<#{output_tag}>"
+      logger.trace("forward: start: #{log_info}")
+      @sender.send(output_tag, message)
+      logger.trace("forward: end")
+    end
+
     def live?
       @state.nil? or @state["live"]
     end
@@ -123,11 +149,11 @@ module Droonga
       end
     end
 
-    private
     def on_change
-      @forwarder.resume
+      @sender.resume
     end
 
+    private
     def log_tag
       "[#{Process.ppid}] engine-node"
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index