[Groonga-commit] groonga/fluent-plugin-droonga [master] Follow the newest replyTo field specification

Back to archive index

Daijiro MORI null+****@clear*****
Tue Mar 26 18:59:40 JST 2013


Daijiro MORI	2013-03-26 18:59:40 +0900 (Tue, 26 Mar 2013)

  New Revision: 1cc96e1b64f3bda3648793387c1cc15b576e63f1
  https://github.com/groonga/fluent-plugin-droonga/commit/1cc96e1b64f3bda3648793387c1cc15b576e63f1

  Message:
    Follow the newest replyTo field specification

  Modified files:
    lib/fluent/plugin/out_droonga.rb

  Modified: lib/fluent/plugin/out_droonga.rb (+51 -20)
===================================================================
--- lib/fluent/plugin/out_droonga.rb    2013-03-01 14:06:00 +0900 (c3a0e56)
+++ lib/fluent/plugin/out_droonga.rb    2013-03-26 18:59:40 +0900 (b6f6cf1)
@@ -15,8 +15,7 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "socket"
-require "msgpack"
+require "fluent-logger"
 require "droonga/worker"
 
 module Fluent
@@ -36,8 +35,8 @@ module Fluent
     def shutdown
       super
       @worker.shutdown
-      @outputs.each do |dest, socket|
-        socket.close
+      @outputs.each do |dest, output|
+        output[:logger].close if output[:logger]
       end
     end
 
@@ -54,26 +53,58 @@ module Fluent
       exec(tag, time, record)
     end
 
-    def exec(tag, time, record)
-      result =****@worke*****_message(record)
-      if record["replyTo"]
-        post(record["replyTo"], tag, {
-               inReplyTo: record["id"],
-               type: (record["type"] || "") + ".result",
-               body: result
-             })
+    def get_output(event)
+      receiver = event["replyTo"]
+      return nil unless receiver
+      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
+        raise "format: hostname:port/tag(?params)"
+      end
+      host = $1
+      port = $2
+      tag  = $3
+      params = $4
+
+      host_port = "#{host}:#{port}"
+      @outputs[host_port] ||= {}
+      output = @outputs[host_port]
+
+      has_connection_id = (not params.nil? \
+                           and params =~ /[\?&;]connection_id=([^&;]+)/)
+      if output[:logger].nil? or has_connection_id
+        connection_id = $1
+        if not has_connection_id or output[:connection_id] != connection_id
+          output[:connection_id] = connection_id
+          logger = Fluent::Logger::FluentLogger.new(tag, :host => host,
+                                                         :port=> port.to_i)
+          # output[:logger] should be closed if it exists beforehand?
+          output[:logger] = logger
+        end
       end
+
+      has_client_session_id = (not params.nil? \
+                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
+      if has_client_session_id
+        client_session_id = $1
+        # some generic way to handle client_session_id is expected
+      end
+
+      output[:logger]
     end
 
-    def post(dest, tag, result)
-      unless @outputs[dest]
-        host, port = dest.split(/:/, 2)
-        port = Integer(port)
-        socket = TCPSocket.new(host, port)
-        @outputs[dest] = socket
+    def exec(tag, time, record)
+      result =****@worke*****_message(record)
+      output = get_output(record)
+      if output
+        response = {
+          inReplyTo: record["id"],
+          statusCode: 200,
+          type: (record["type"] || "") + ".result",
+          body: {
+            result: result
+          }
+        }
+        output.post("message", response)
       end
-      data = {"tag" => tag, "data" => result}.to_msgpack
-      @outputs[dest].write(data)
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index