Kouhei Sutou
null+****@clear*****
Wed Oct 15 18:21:20 JST 2014
Kouhei Sutou 2014-10-15 18:21:20 +0900 (Wed, 15 Oct 2014) New Revision: 2436e84e1f2a7be7c2c6a53510d9a6dd0efe3d40 https://github.com/groonga/fluent-plugin-groonga/commit/2436e84e1f2a7be7c2c6a53510d9a6dd0efe3d40 Message: out: use remove needless async code Modified files: fluent-plugin-groonga.gemspec lib/fluent/plugin/out_groonga.rb test/test_output.rb Modified: fluent-plugin-groonga.gemspec (+2 -2) =================================================================== --- fluent-plugin-groonga.gemspec 2014-10-15 18:20:50 +0900 (bb1de8e) +++ fluent-plugin-groonga.gemspec 2014-10-15 18:21:20 +0900 (acfeabd) @@ -1,6 +1,6 @@ # -*- mode: ruby; coding: utf-8 -*- # -# Copyright (C) 2012-2013 Kouhei Sutou <kou �� clear-code.com> +# Copyright (C) 2012-2014 Kouhei Sutou <kou �� clear-code.com> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -35,7 +35,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_runtime_dependency("fluentd") - spec.add_runtime_dependency("gqtp", ">= 1.0.3") + spec.add_runtime_dependency("groonga-client") spec.add_runtime_dependency("groonga-command-parser") spec.add_development_dependency("rake") Modified: lib/fluent/plugin/out_groonga.rb (+80 -86) =================================================================== --- lib/fluent/plugin/out_groonga.rb 2014-10-15 18:20:50 +0900 (f4f6c83) +++ lib/fluent/plugin/out_groonga.rb 2014-10-15 18:21:20 +0900 (cebdf0a) @@ -17,15 +17,24 @@ require "fileutils" +require "groonga/client" + module Fluent - class GroongaOutput < ObjectBufferedOutput + class GroongaOutput < BufferedOutput Plugin.register_output("groonga", self) def initialize super end - config_param :protocol, :string, :default => "http" + config_param :protocol, :default => :http do |value| + case value + when "http", "gqtp", "command" + value.to_sym + else + raise ConfigError, "must be http, gqtp or command: <#{value}>" + end + end config_param :table, :string, :default => nil def configure(conf) @@ -46,31 +55,27 @@ module Fluent @client.shutdown end - def write_objects(tag, messages) - messages.each do |time, record| + def format(tag, time, record) + [tag, time, record].to_msgpack + end + + def write(chunk) + chunk.msgpack_each do |message| + tag, _, record = message @emitter.emit(tag, record) end end + private def create_client(protocol) case protocol - when "http" - HTTPClient.new - when "gqtp" - GQTPClient.new - when "command" + when :http, :gqtp + NetworkClient.new(protocol) + when :command CommandClient.new end end - def create_output(buffer_type, emitter) - if buffer_type == "none" - RawGroongaOutput.new(emitter) - else - BufferedGroongaOutput.new(emitter) - end - end - class Emitter def initialize(client, table) @client = client @@ -105,60 +110,32 @@ module Fluent end end - class HTTPClient + class NetworkClient include Configurable config_param :host, :string, :default => nil config_param :port, :integer, :default => nil - def start - @loop = Coolio::Loop.new - end - - def shutdown - end - - def send(command) - client = GroongaHTTPClient.connect(@host, @port) - client.request("GET", command.to_uri_format) - @loop.attach(client) - @loop.run + def initialize(protocol) + super() + @protocol = protocol end - class GroongaHTTPClient < Coolio::HttpClient - def on_body_data(data) - end - end - end - - class GQTPClient - include Configurable - - config_param :host, :string, :default => nil - config_param :port, :integer, :default => nil - def start - @loop = Coolio::Loop.new @client = nil end def shutdown return if****@clien*****? - @client.close do - @loop.stop - end - @loop.run + @client.close end def send(command) - @client ||= GQTP::Client.new(:address => @host, - :port => @port, - :connection => :coolio, - :loop => @loop) - @client.send(command.to_command_format) do |header, body| - @loop.stop - end - @loop.run + @client ||= Groonga::Client.new(:protocol => @protocol, + :host => @host, + :port => @port, + :backend => :synchronous) + @client.execute(command) end end @@ -181,13 +158,13 @@ module Fluent def start run_groonga - wrap_io end def shutdown - @groonga_input.close - @groonga_output.close - @groonga_error.close + @input.close + read_output("shutdown") + @output.close + @error.close Process.waitpid(@pid) end @@ -196,27 +173,29 @@ module Fluent if command.name == "load" body = command.arguments.delete(:values) end - @groonga_input.write("#{command.to_uri_format}\n") + uri = command.to_uri_format + @input.write("#{uri}\n") if body body.each_line do |line| - @groonga_input.write("#{line}\n") + @input.write("#{line}\n") end end - @loop.run + @input.flush + read_output(uri) end private def run_groonga env = {} - @input = IO.pipe("ASCII-8BIT") - @output = IO.pipe("ASCII-8BIT") - @error = IO.pipe("ASCII-8BIT") - input_fd = @input[0].to_i - output_fd = @output[1].to_i + input = IO.pipe("ASCII-8BIT") + output = IO.pipe("ASCII-8BIT") + error = IO.pipe("ASCII-8BIT") + input_fd = input[0].to_i + output_fd = output[1].to_i options = { input_fd => input_fd, output_fd => output_fd, - :err => @error[1], + :err => error[1], } arguments = @arguments arguments += [ @@ -229,27 +208,42 @@ module Fluent end arguments << @database @pid = spawn(env, @groonga, *arguments, options) - @input[0].close - @output[1].close - @error[1].close - end - - def wrap_io - @loop = Coolio::Loop.new + input[0].close + @input = input[1] + output[1].close + @output = output[0] + error[1].close + @error = error[0] + end + + def read_output(context) + output_message = "" + error_message = "" + + loop do + readables = IO.select([@output, @error], nil, nil, 0) + break if readables.nil? + + readables.each do |readable| + case readable + when @output + output_message << @output.gets + when @error + error_message << @error.gets + end + end + end - @groonga_input = Coolio::IO.new(@input[1]) - on_write_complete = lambda do - @loop.stop + unless output_message.empty? + Engine.log.debug("[output][groonga][output]", + :context => context, + :message => output_message) end - @groonga_input.on_write_complete do - on_write_complete.call + unless error_message.empty? + Engine.log.error("[output][groonga][error]", + :context => context, + :message => error_message) end - @groonga_output = Coolio::IO.new(@output[0]) - @groonga_error = Coolio::IO.new(@error[0]) - - @loop.attach(@groonga_input) - @loop.attach(@groonga_output) - @loop.attach(@groonga_error) end end end Modified: test/test_output.rb (+14 -15) =================================================================== --- test/test_output.rb 2014-10-15 18:20:50 +0900 (6293830) +++ test/test_output.rb 2014-10-15 18:21:20 +0900 (202d37b) @@ -47,10 +47,13 @@ EOC class HTTPTest < self setup :before => :append def setup_real_server + @request_parser = HTTP::Parser.new + @request_body = "" + @response_body = nil + @real_host = "127.0.0.1" @real_port = 29292 - @real_server_pid = fork do - exit + @real_server_thread = Thread.new do real_server = TCPServer.new(@real_host, @real_port) response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log) real_response = WEBrick::HTTPResponse.new(response_config) @@ -58,14 +61,11 @@ EOC request_body = "" client = real_server.accept real_server.close - parser = HTTP::Parser.new - parser.on_headers_complete = lambda do |headers| - request_headers = headers - end - parser.on_body = lambda do |chunk| - request_body << chunk + @request_parser.on_body = lambda do |chunk| + @request_body << chunk end - parser.on_message_complete = lambda do + @request_parser.on_message_complete = lambda do + real_response.body = @response_body real_response.send_response(client) client.close end @@ -74,16 +74,14 @@ EOC break if client.closed? data = client.readpartial(4096) break if data.nil? - parser << data + @request_parser << data end end end teardown def teardown_real_server - Process.kill(:INT, @real_server_pid) - Process.kill(:KILL, @real_server_pid) - Process.waitpid(@real_server_pid) + @real_server_thread.join end def configuration @@ -96,13 +94,14 @@ EOC class CommandTest < self def test_basic_command + @response_body = JSON.generate([[0, 0.0, 0.0], true]) driver = create_driver("groonga.command.table_create") time = Time.parse("2012-10-26T08:45:42Z").to_i driver.run do driver.emit({"name" => "Users"}, time) end - # p @request_headers - # p @request_body + assert_equal("/d/table_create?name=Users", + @request_parser.request_url) end end end -------------- next part -------------- HTML����������������������������... Download