Kouhei Sutou
null+****@clear*****
Mon Apr 24 17:06:20 JST 2017
Kouhei Sutou 2017-04-24 17:06:20 +0900 (Mon, 24 Apr 2017) New Revision: e88e0c4144f5f51980e729c4f91ba1d1728b1296 https://github.com/groonga/fluent-plugin-groonga/commit/e88e0c4144f5f51980e729c4f91ba1d1728b1296 Message: in: use server plugin helper Modified files: lib/fluent/plugin/in_groonga.rb Modified: lib/fluent/plugin/in_groonga.rb (+76 -88) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2017-04-24 13:53:16 +0900 (90567ed) +++ lib/fluent/plugin/in_groonga.rb 2017-04-24 17:06:20 +0900 (74ccdd8) @@ -22,14 +22,15 @@ require "http_parser" require "gqtp" require "groonga/command/parser" -require "fluent/input" -require "fluent/process" +require "fluent/plugin/input" module Fluent module Plugin class GroongaInput < Input Plugin.register_input("groonga", self) + helpers :server + def initialize super end @@ -49,12 +50,43 @@ module Fluent def start super - @input.start + + port =****@input***** + bind =****@input***** + log.info("[input][groonga][connect] listening port", + :port => port, :bind => bind) + server_create_connection(:groonga_input, + port, + :proto => :tcp, + :bind => bind) do |connection| + handler = nil + real_host =****@input*****_host + real_port =****@input*****_port + repeater = Coolio::TCPSocket.connect(real_host, real_port) + repeater.on_connect_failed do + log.error("[input][groonga][connect][error] " + + "failed to connect to Groonga:", + :real_host => real_host, + :real_port => real_port) + connection.close + end + repeater.on_read do |data| + handler.write_back(data) + end + repeater.on_close do + handler.close + end + event_loop_attach(repeater) + + handler =****@input*****_handler(connection, repeater) + connection.data do |data| + handler.on_read(data) + end + end end def shutdown super - @input.shutdown end class Repeater < Coolio::TCPSocket @@ -74,7 +106,6 @@ module Fluent class BaseInput include Configurable - include DetachMultiProcessMixin config_param :bind, :string, :default => "0.0.0.0" config_param :port, :integer, :default => nil @@ -131,37 +162,6 @@ module Fluent @real_port ||= default_port end - def start - listen_socket = TCPServer.new(@bind, @port) - # detach_multi_process do - @loop = Coolio::Loop.new - - @socket = Coolio::TCPServer.new(listen_socket, nil, - handler_class, self) - @loop.attach(@socket) - - @shutdown_notifier = Coolio::AsyncWatcher.new - @loop.attach(@shutdown_notifier) - - @thread = Thread.new do - run - end - # end - end - - def shutdown - @loop.stop - @socket.close - @shutdown_notifier.signal - @thread.join - end - - def create_repeater(client) - repeater = Repeater.connect(@real_host, @real_port, client) - repeater.attach(@loop) - repeater - end - def emit(command, params) normalized_command = command.split(".")[0] return unless emit_command?(normalized_command) @@ -175,14 +175,6 @@ module Fluent end private - def run - @loop.run - rescue - log.error("[input][groonga][error] unexpected error", - :error => "#{$!.class}: #{$!}") - log.error_backtrace - end - def emit_command?(command) return true if @emit_commands.empty? @emit_commands.any? do |pattern| @@ -192,30 +184,20 @@ module Fluent end class HTTPInput < BaseInput + def create_handler(connection, repeater) + Handler.new(self, connection, repeater) + end + private def default_port 10041 end - def handler_class - Handler - end - - class Handler < Coolio::Socket - def initialize(socket, input) - super(socket) + class Handler + def initialize(input, connection, repeater) @input = input - end - - def on_connect - @repeater =****@input*****_repeater(self) - @repeater.on_connect_failed do - @input.log.error("[input][groonga][connect][error] " + - "failed to connect to Groonga:", - :real_host => @input.real_host, - :real_port => @input.real_port) - close - end + @connection = connection + @repeater = repeater @request_handler = RequestHandler.new(@input, @repeater) @response_handler = ResponseHandler.new(self, @input) end @@ -249,7 +231,7 @@ module Fluent reply_error_response("500 Internal Server Error") return end - write(data) + @connection.write(data) end def on_response_complete(response) @@ -257,11 +239,15 @@ module Fluent @input.emit(@request_handler.command, @request_handler.params) end - on_write_complete do + @connection.on(:write_complete) do @repeater.close end end + def close + @connection.close + end + private def need_emit?(response) case @request_handler.command @@ -279,13 +265,12 @@ module Fluent end def reply_error_response(status) - write("HTTP1.1 #{status}\r\n") - write("Server: fluent-plugin-groonga\r\n") - write("Connection: close\r\n") - write("Content-Length: 0\r\n") - write("\r\n") - disable - on_write_complete do + @connection.write("HTTP1.1 #{status}\r\n") + @connection.write("Server: fluent-plugin-groonga\r\n") + @connection.write("Connection: close\r\n") + @connection.write("Content-Length: 0\r\n") + @connection.write("\r\n") + @connection.on(:write_complete) do @repeater.close end end @@ -410,37 +395,40 @@ module Fluent end class GQTPInput < BaseInput + def create_handler(connection, repeater) + Handler.new(self, connection, repeater) + end + private def default_port 10043 end - def handler_class - Handler - end - - class Handler < Coolio::Socket - def initialize(socket, input) - super(socket) + class Handler + def initialize(input, connection, repeater) @input = input - end + @connection = connection + @repeater = repeater - def on_connect - @parser = Parser.new(@input) - @repeater =****@input*****_repeater(self) + @request_parser = RequestParser.new(@input) end def on_read(data) - @parser << data + @request_parser << data @repeater.write(data) end - def on_close - @parser.close + def write_back(data) + @connection.write(data) + end + + def close + @request_parser.close + @connection.close end end - class Parser < GQTP::Parser + class RequestParser < GQTP::Parser def initialize(input) super() @input = input @@ -463,13 +451,13 @@ module Fluent def initialize_command_parser @command_parser = Groonga::Command::Parser.new @command_parser.on_command do |command| - @input.emit(command.name, command.arguments) + @input.emit(command.command_name, command.arguments) end @command_parser.on_load_value do |command, value| arguments = command.arguments.dup arguments[:columns] = command.columns.join(", ") arguments[:values] = Yajl::Encoder.encode([value]) - @input.emit(command.name, arguments) + @input.emit(command.command_name, arguments) end end end -------------- next part -------------- HTML����������������������������...Download