[Groonga-commit] groonga/fluent-plugin-groonga at e88e0c4 [master] in: use server plugin helper

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Apr 24 17:06:20 JST 2017


Kouhei Sutou	2017-04-24 17:06:20 +0900 (Mon, 24 Apr 2017)

  New Revision: e88e0c4144f5f51980e729c4f91ba1d1728b1296
  https://github.com/groonga/fluent-plugin-groonga/commit/e88e0c4144f5f51980e729c4f91ba1d1728b1296

  Message:
    in: use server plugin helper

  Modified files:
    lib/fluent/plugin/in_groonga.rb

  Modified: lib/fluent/plugin/in_groonga.rb (+76 -88)
===================================================================
--- lib/fluent/plugin/in_groonga.rb    2017-04-24 13:53:16 +0900 (90567ed)
+++ lib/fluent/plugin/in_groonga.rb    2017-04-24 17:06:20 +0900 (74ccdd8)
@@ -22,14 +22,15 @@ require "http_parser"
 require "gqtp"
 require "groonga/command/parser"
 
-require "fluent/input"
-require "fluent/process"
+require "fluent/plugin/input"
 
 module Fluent
   module Plugin
     class GroongaInput < Input
       Plugin.register_input("groonga", self)
 
+      helpers :server
+
       def initialize
         super
       end
@@ -49,12 +50,43 @@ module Fluent
 
       def start
         super
-        @input.start
+
+        port =****@input*****
+        bind =****@input*****
+        log.info("[input][groonga][connect] listening port",
+                 :port => port, :bind => bind)
+        server_create_connection(:groonga_input,
+                                 port,
+                                 :proto => :tcp,
+                                 :bind => bind) do |connection|
+          handler = nil
+          real_host =****@input*****_host
+          real_port =****@input*****_port
+          repeater = Coolio::TCPSocket.connect(real_host, real_port)
+          repeater.on_connect_failed do
+            log.error("[input][groonga][connect][error] " +
+                      "failed to connect to Groonga:",
+                      :real_host => real_host,
+                      :real_port => real_port)
+            connection.close
+          end
+          repeater.on_read do |data|
+            handler.write_back(data)
+          end
+          repeater.on_close do
+            handler.close
+          end
+          event_loop_attach(repeater)
+
+          handler =****@input*****_handler(connection, repeater)
+          connection.data do |data|
+            handler.on_read(data)
+          end
+        end
       end
 
       def shutdown
         super
-        @input.shutdown
       end
 
       class Repeater < Coolio::TCPSocket
@@ -74,7 +106,6 @@ module Fluent
 
       class BaseInput
         include Configurable
-        include DetachMultiProcessMixin
 
         config_param :bind, :string, :default => "0.0.0.0"
         config_param :port, :integer, :default => nil
@@ -131,37 +162,6 @@ module Fluent
           @real_port ||= default_port
         end
 
-        def start
-          listen_socket = TCPServer.new(@bind, @port)
-          # detach_multi_process do
-            @loop = Coolio::Loop.new
-
-            @socket = Coolio::TCPServer.new(listen_socket, nil,
-                                            handler_class, self)
-            @loop.attach(@socket)
-
-            @shutdown_notifier = Coolio::AsyncWatcher.new
-            @loop.attach(@shutdown_notifier)
-
-            @thread = Thread.new do
-              run
-            end
-          # end
-        end
-
-        def shutdown
-          @loop.stop
-          @socket.close
-          @shutdown_notifier.signal
-          @thread.join
-        end
-
-        def create_repeater(client)
-          repeater = Repeater.connect(@real_host, @real_port, client)
-          repeater.attach(@loop)
-          repeater
-        end
-
         def emit(command, params)
           normalized_command = command.split(".")[0]
           return unless emit_command?(normalized_command)
@@ -175,14 +175,6 @@ module Fluent
         end
 
         private
-        def run
-          @loop.run
-        rescue
-          log.error("[input][groonga][error] unexpected error",
-                    :error => "#{$!.class}: #{$!}")
-          log.error_backtrace
-        end
-
         def emit_command?(command)
           return true if @emit_commands.empty?
           @emit_commands.any? do |pattern|
@@ -192,30 +184,20 @@ module Fluent
       end
 
       class HTTPInput < BaseInput
+        def create_handler(connection, repeater)
+          Handler.new(self, connection, repeater)
+        end
+
         private
         def default_port
           10041
         end
 
-        def handler_class
-          Handler
-        end
-
-        class Handler < Coolio::Socket
-          def initialize(socket, input)
-            super(socket)
+        class Handler
+          def initialize(input, connection, repeater)
             @input = input
-          end
-
-          def on_connect
-            @repeater =****@input*****_repeater(self)
-            @repeater.on_connect_failed do
-              @input.log.error("[input][groonga][connect][error] " +
-                               "failed to connect to Groonga:",
-                               :real_host => @input.real_host,
-                               :real_port => @input.real_port)
-              close
-            end
+            @connection = connection
+            @repeater = repeater
             @request_handler = RequestHandler.new(@input, @repeater)
             @response_handler = ResponseHandler.new(self, @input)
           end
@@ -249,7 +231,7 @@ module Fluent
               reply_error_response("500 Internal Server Error")
               return
             end
-            write(data)
+            @connection.write(data)
           end
 
           def on_response_complete(response)
@@ -257,11 +239,15 @@ module Fluent
               @input.emit(@request_handler.command,
                           @request_handler.params)
             end
-            on_write_complete do
+            @connection.on(:write_complete) do
               @repeater.close
             end
           end
 
+          def close
+            @connection.close
+          end
+
           private
           def need_emit?(response)
             case @request_handler.command
@@ -279,13 +265,12 @@ module Fluent
           end
 
           def reply_error_response(status)
-            write("HTTP1.1 #{status}\r\n")
-            write("Server: fluent-plugin-groonga\r\n")
-            write("Connection: close\r\n")
-            write("Content-Length: 0\r\n")
-            write("\r\n")
-            disable
-            on_write_complete do
+            @connection.write("HTTP1.1 #{status}\r\n")
+            @connection.write("Server: fluent-plugin-groonga\r\n")
+            @connection.write("Connection: close\r\n")
+            @connection.write("Content-Length: 0\r\n")
+            @connection.write("\r\n")
+            @connection.on(:write_complete) do
               @repeater.close
             end
           end
@@ -410,37 +395,40 @@ module Fluent
       end
 
       class GQTPInput < BaseInput
+        def create_handler(connection, repeater)
+          Handler.new(self, connection, repeater)
+        end
+
         private
         def default_port
           10043
         end
 
-        def handler_class
-          Handler
-        end
-
-        class Handler < Coolio::Socket
-          def initialize(socket, input)
-            super(socket)
+        class Handler
+          def initialize(input, connection, repeater)
             @input = input
-          end
+            @connection = connection
+            @repeater = repeater
 
-          def on_connect
-            @parser = Parser.new(@input)
-            @repeater =****@input*****_repeater(self)
+            @request_parser = RequestParser.new(@input)
           end
 
           def on_read(data)
-            @parser << data
+            @request_parser << data
             @repeater.write(data)
           end
 
-          def on_close
-            @parser.close
+          def write_back(data)
+            @connection.write(data)
+          end
+
+          def close
+            @request_parser.close
+            @connection.close
           end
         end
 
-        class Parser < GQTP::Parser
+        class RequestParser < GQTP::Parser
           def initialize(input)
             super()
             @input = input
@@ -463,13 +451,13 @@ module Fluent
           def initialize_command_parser
             @command_parser = Groonga::Command::Parser.new
             @command_parser.on_command do |command|
-              @input.emit(command.name, command.arguments)
+              @input.emit(command.command_name, command.arguments)
             end
             @command_parser.on_load_value do |command, value|
               arguments = command.arguments.dup
               arguments[:columns] = command.columns.join(", ")
               arguments[:values] = Yajl::Encoder.encode([value])
-              @input.emit(command.name, arguments)
+              @input.emit(command.command_name, arguments)
             end
           end
         end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index