Kouhei Sutou
null+****@clear*****
Fri Sep 2 16:31:49 JST 2016
Kouhei Sutou 2016-09-02 16:31:49 +0900 (Fri, 02 Sep 2016) New Revision: 42e788806a0a5e2cd9c4571d04ad5383ce3f7a8b https://github.com/groonga/fluent-plugin-groonga/commit/42e788806a0a5e2cd9c4571d04ad5383ce3f7a8b Message: Stop to emit when response isn't succeeded Modified files: lib/fluent/plugin/in_groonga.rb test/test_input.rb Modified: lib/fluent/plugin/in_groonga.rb (+89 -5) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2016-06-05 15:54:20 +0900 (f8bf014) +++ lib/fluent/plugin/in_groonga.rb 2016-09-02 16:31:49 +0900 (403802b) @@ -155,8 +155,9 @@ module Fluent end def emit(command, params) - return unless emit_command?(command) - @input_plugin.router.emit("groonga.command.#{command}", + normalized_command = command.split(".")[0] + return unless emit_command?(normalized_command) + @input_plugin.router.emit("groonga.command.#{normalized_command}", Engine.now, params) end @@ -194,13 +195,17 @@ module Fluent end def on_connect - @parser = HTTP::Parser.new(self) @repeater =****@input*****_repeater(self) + @repeater.on_connect_failed do + close + end + @request_handler = RequestHandler.new(@input, @repeater) + @response_handler = ResponseHandler.new(self) end def on_read(data) begin - @parser << data + @request_handler << data rescue HTTP::Parser::Error $log.error("[input][groonga][error] " + "failed to parse HTTP request:", @@ -210,8 +215,43 @@ module Fluent end end + def write(data) + @response_handler << data + super + end + + def on_response_complete(response) + case response + when Array + return_code = response[0][0] + if return_code.zero? + @input.emit(@request_handler.command, + @request_handler.params) + end + end + on_write_complete do + @repeater.close + end + end + end + + class RequestHandler + attr_reader :command + attr_reader :params + def initialize(input, repeater) + @input = input + @repeater = repeater + @parser = Http::Parser.new(self) + end + + def <<(chunk) + @parser << chunk + end + def on_message_begin @body = "" + @command = nil + @params = nil end def on_headers_complete(headers) @@ -247,8 +287,52 @@ module Fluent if command == "load" params["values"] = @body unles****@body*****? end - @input.emit(command, params) + @command = command + @params = params + end + end + end + + class ResponseHandler + def initialize(handler) + @handler = handler + @parser = Http::Parser.new(self) + end + + def <<(chunk) + @parser << chunk + end + + def on_message_begin + @body = "" + @content_type = nil + end + + def on_headers_complete(headers) + headers.each do |name, value| + case name + when /\AContent-Type\z/i + @content_type = value + end + end + end + + def on_body(chunk) + @body << chunk + end + + def on_message_complete + case @content_type + when /\Aapplication\/json\z/ + response = JSON.parse(@body) + when /\Aapplication\/x-msgpack\z/ + response = MessagePack.unpack(@body) + when /\Atext\/x-groonga-command-list/ + response = @body + else + response = nil end + @handler.on_response_complete(response) end end end Modified: test/test_input.rb (+9 -2) =================================================================== --- test/test_input.rb 2016-06-05 15:54:20 +0900 (cc8b026) +++ test/test_input.rb 2016-09-02 16:31:49 +0900 (a6b5f89) @@ -101,11 +101,16 @@ EOC end def test_target_command + @real_response["Content-Type"] = "application/json" + @real_response.body = JSON.generate([[0, 0.0, 0.0], true]) @driver.expect_emit("groonga.command.table_create", @now, - {"name" => "Users"}) + { + "name" => "Users", + "flags" => "TABLE_NO_KEY", + }) @driver.run do - get("/d/table_create", "name" => "Users") + get("/d/table_create", "name" => "Users", "flags" => "TABLE_NO_KEY") assert_equal("200", @last_response.code) end end @@ -119,6 +124,8 @@ EOC end def test_load + @real_response["Content-Type"] = "application/json" + @real_response.body = JSON.generate([[0, 0.0, 0.0], 2]) json = <<-EOJ [ {"name": "Alice"}, -------------- next part -------------- HTML����������������������������... Download