[Groonga-commit] droonga/fluent-plugin-droonga at 2a5d076 [master] Extract code that posts message to other process

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Nov 24 23:48:17 JST 2013


Kouhei Sutou	2013-11-24 23:48:17 +0900 (Sun, 24 Nov 2013)

  New Revision: 2a5d0768c4fd2a826e60b2467f6d2d2d2cfd836b
  https://github.com/droonga/fluent-plugin-droonga/commit/2a5d0768c4fd2a826e60b2467f6d2d2d2cfd836b

  Message:
    Extract code that posts message to other process

  Added files:
    lib/droonga/forwarder.rb
  Modified files:
    lib/droonga/handler.rb

  Added: lib/droonga/forwarder.rb (+126 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/forwarder.rb    2013-11-24 23:48:17 +0900 (5ba0d2f)
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "fluent-logger"
+require "fluent/logger/fluent_logger"
+
+module Droonga
+  class Forwarder
+    def initialize
+      @outputs = {}
+    end
+
+    def shutdown
+      $log.trace("#{log_tag}: shutdown: start")
+      @outputs.each do |dest, output|
+        output[:logger].close if output[:logger]
+      end
+      $log.trace("#{log_tag}: shutdown: done")
+    end
+
+    def forward(envelope, body, destination)
+      $log.trace("#{log_tag}: post: start")
+      command = destination["type"]
+      receiver = destination["to"]
+      arguments = destination["arguments"]
+      synchronous = destination["synchronous"]
+      output(receiver, envelope, body, command, arguments)
+      $log.trace("#{log_tag}: post: done")
+    end
+
+    private
+    def output(receiver, envelope, body, command, arguments)
+      $log.trace("#{log_tag}: output: start")
+      unless receiver.is_a?(String) && command.is_a?(String)
+        $log.trace("#{log_tag}: output: abort: invalid argument",
+                   :receiver => receiver,
+                   :command  => command)
+        return
+      end
+      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
+        raise "format: hostname:port/tag(?params)"
+      end
+      host = $1
+      port = $2
+      tag  = $3
+      params = $4
+      output = get_output(host, port, params)
+      unless output
+        $log.trace("#{log_tag}: output: abort: no output",
+                   :host   => host,
+                   :port   => port,
+                   :params => params)
+        return
+      end
+      if command =~ /\.result$/
+        message = {
+          inReplyTo: envelope["id"],
+          statusCode: 200,
+          type: command,
+          body: body
+        }
+      else
+        message = envelope.merge(
+          body: body,
+          type: command,
+          arguments: arguments
+        )
+      end
+      output_tag = "#{tag}.message"
+      log_info = "<#{receiver}>:<#{output_tag}>"
+      $log.trace("#{log_tag}: output: post: start: #{log_info}")
+      output.post(output_tag, message)
+      $log.trace("#{log_tag}: output: post: done: #{log_info}")
+      $log.trace("#{log_tag}: output: done")
+    end
+
+    def get_output(host, port, params)
+      host_port = "#{host}:#{port}"
+      @outputs[host_port] ||= {}
+      output = @outputs[host_port]
+
+      has_connection_id = (not params.nil? \
+                           and params =~ /[\?&;]connection_id=([^&;]+)/)
+      if output[:logger].nil? or has_connection_id
+        connection_id = $1
+        if not has_connection_id or output[:connection_id] != connection_id
+          output[:connection_id] = connection_id
+          logger = create_logger(:host => host, :port => port.to_i)
+          # output[:logger] should be closed if it exists beforehand?
+          output[:logger] = logger
+        end
+      end
+
+      has_client_session_id = (not params.nil? \
+                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
+      if has_client_session_id
+        client_session_id = $1
+        # some generic way to handle client_session_id is expected
+      end
+
+      output[:logger]
+    end
+
+    def create_logger(options)
+      Fluent::Logger::FluentLogger.new(nil, options)
+    end
+
+    def log_tag
+      "[#{Process.ppid}][#{Process.pid}] forwarder"
+    end
+  end
+end

  Modified: lib/droonga/handler.rb (+5 -90)
===================================================================
--- lib/droonga/handler.rb    2013-11-24 23:35:12 +0900 (0267071)
+++ lib/droonga/handler.rb    2013-11-24 23:48:17 +0900 (56f9155)
@@ -15,11 +15,10 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "fluent-logger"
-require "fluent/logger/fluent_logger"
 require "groonga"
 
 require "droonga/job_queue"
+require "droonga/forwarder"
 require "droonga/pluggable"
 require "droonga/handler_plugin"
 
@@ -30,7 +29,6 @@ module Droonga
     attr_reader :context, :envelope, :name
 
     def initialize(options={})
-      @outputs = {}
       @options = options
       @name = options[:name]
       @database_name = options[:database]
@@ -41,9 +39,7 @@ module Droonga
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
       super
-      @outputs.each do |dest, output|
-        output[:logger].close if output[:logger]
-      end
+      @forwarder.shutdown
       if @database
         @database.close
         @context.close
@@ -96,14 +92,8 @@ module Droonga
       @output_values[name] = value
     end
 
-    def post(body, destination)
-      $log.trace("#{log_tag}: post: start")
-      command = destination["type"]
-      receiver = destination["to"]
-      arguments = destination["arguments"]
-      synchronous = destination["synchronous"]
-      output(receiver, body, command, arguments)
-      $log.trace("#{log_tag}: post: done")
+    def post(message, destination)
+      @forwarder.forward(envelope, message, destination)
     end
 
     def handle(command, body, synchronous=nil)
@@ -124,51 +114,6 @@ module Droonga
     end
 
     private
-    def output(receiver, body, command, arguments)
-      $log.trace("#{log_tag}: output: start")
-      unless receiver.is_a?(String) && command.is_a?(String)
-        $log.trace("#{log_tag}: output: abort: invalid argument",
-                   :receiver => receiver,
-                   :command  => command)
-        return
-      end
-      unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
-        raise "format: hostname:port/tag(?params)"
-      end
-      host = $1
-      port = $2
-      tag  = $3
-      params = $4
-      output = get_output(host, port, params)
-      unless output
-        $log.trace("#{log_tag}: output: abort: no output",
-                   :host   => host,
-                   :port   => port,
-                   :params => params)
-        return
-      end
-      if command =~ /\.result$/
-        message = {
-          inReplyTo: envelope["id"],
-          statusCode: 200,
-          type: command,
-          body: body
-        }
-      else
-        message = envelope.merge(
-          body: body,
-          type: command,
-          arguments: arguments
-        )
-      end
-      output_tag = "#{tag}.message"
-      log_info = "<#{receiver}>:<#{output_tag}>"
-      $log.trace("#{log_tag}: output: post: start: #{log_info}")
-      output.post(output_tag, message)
-      $log.trace("#{log_tag}: output: post: done: #{log_info}")
-      $log.trace("#{log_tag}: output: done")
-    end
-
     def parse_envelope(envelope)
       @envelope = envelope
       envelope["via"] ||= []
@@ -182,39 +127,13 @@ module Droonga
         @job_queue = JobQueue.open(@database_name, @queue_name)
       end
       load_plugins(@options[:handlers] || [])
+      @forwarder = Forwarder.new
     end
 
     def instantiate_plugin(name)
       HandlerPlugin.repository.instantiate(name, self)
     end
 
-    def get_output(host, port, params)
-      host_port = "#{host}:#{port}"
-      @outputs[host_port] ||= {}
-      output = @outputs[host_port]
-
-      has_connection_id = (not params.nil? \
-                           and params =~ /[\?&;]connection_id=([^&;]+)/)
-      if output[:logger].nil? or has_connection_id
-        connection_id = $1
-        if not has_connection_id or output[:connection_id] != connection_id
-          output[:connection_id] = connection_id
-          logger = create_logger(:host => host, :port => port.to_i)
-          # output[:logger] should be closed if it exists beforehand?
-          output[:logger] = logger
-        end
-      end
-
-      has_client_session_id = (not params.nil? \
-                               and params =~ /[\?&;]client_session_id=([^&;]+)/)
-      if has_client_session_id
-        client_session_id = $1
-        # some generic way to handle client_session_id is expected
-      end
-
-      output[:logger]
-    end
-
     def process_command(plugin, command, request, arguments)
       return false unless request.is_a? Hash
 
@@ -258,10 +177,6 @@ module Droonga
       end
     end
 
-    def create_logger(options)
-      Fluent::Logger::FluentLogger.new(nil, options)
-    end
-
     def log_tag
       "[#{Process.ppid}][#{Process.pid}] handler"
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index