[Groonga-commit] groonga/fluent-plugin-groonga at 42e7888 [master] Stop to emit when response isn't succeeded

Back to archive index

Kouhei Sutou null+****@clear*****
Fri Sep 2 16:31:49 JST 2016


Kouhei Sutou	2016-09-02 16:31:49 +0900 (Fri, 02 Sep 2016)

  New Revision: 42e788806a0a5e2cd9c4571d04ad5383ce3f7a8b
  https://github.com/groonga/fluent-plugin-groonga/commit/42e788806a0a5e2cd9c4571d04ad5383ce3f7a8b

  Message:
    Stop to emit when response isn't succeeded

  Modified files:
    lib/fluent/plugin/in_groonga.rb
    test/test_input.rb

  Modified: lib/fluent/plugin/in_groonga.rb (+89 -5)
===================================================================
--- lib/fluent/plugin/in_groonga.rb    2016-06-05 15:54:20 +0900 (f8bf014)
+++ lib/fluent/plugin/in_groonga.rb    2016-09-02 16:31:49 +0900 (403802b)
@@ -155,8 +155,9 @@ module Fluent
       end
 
       def emit(command, params)
-        return unless emit_command?(command)
-        @input_plugin.router.emit("groonga.command.#{command}",
+        normalized_command = command.split(".")[0]
+        return unless emit_command?(normalized_command)
+        @input_plugin.router.emit("groonga.command.#{normalized_command}",
                                   Engine.now,
                                   params)
       end
@@ -194,13 +195,17 @@ module Fluent
         end
 
         def on_connect
-          @parser = HTTP::Parser.new(self)
           @repeater =****@input*****_repeater(self)
+          @repeater.on_connect_failed do
+            close
+          end
+          @request_handler = RequestHandler.new(@input, @repeater)
+          @response_handler = ResponseHandler.new(self)
         end
 
         def on_read(data)
           begin
-            @parser << data
+            @request_handler << data
           rescue HTTP::Parser::Error
             $log.error("[input][groonga][error] " +
                        "failed to parse HTTP request:",
@@ -210,8 +215,43 @@ module Fluent
           end
         end
 
+        def write(data)
+          @response_handler << data
+          super
+        end
+
+        def on_response_complete(response)
+          case response
+          when Array
+            return_code = response[0][0]
+            if return_code.zero?
+              @input.emit(@request_handler.command,
+                          @request_handler.params)
+            end
+          end
+          on_write_complete do
+            @repeater.close
+          end
+        end
+      end
+
+      class RequestHandler
+        attr_reader :command
+        attr_reader :params
+        def initialize(input, repeater)
+          @input = input
+          @repeater = repeater
+          @parser = Http::Parser.new(self)
+        end
+
+        def <<(chunk)
+          @parser << chunk
+        end
+
         def on_message_begin
           @body = ""
+          @command = nil
+          @params = nil
         end
 
         def on_headers_complete(headers)
@@ -247,8 +287,52 @@ module Fluent
             if command == "load"
               params["values"] = @body unles****@body*****?
             end
-            @input.emit(command, params)
+            @command = command
+            @params = params
+          end
+        end
+      end
+
+      class ResponseHandler
+        def initialize(handler)
+          @handler = handler
+          @parser = Http::Parser.new(self)
+        end
+
+        def <<(chunk)
+          @parser << chunk
+        end
+
+        def on_message_begin
+          @body = ""
+          @content_type = nil
+        end
+
+        def on_headers_complete(headers)
+          headers.each do |name, value|
+            case name
+            when /\AContent-Type\z/i
+              @content_type = value
+            end
+          end
+        end
+
+        def on_body(chunk)
+          @body << chunk
+        end
+
+        def on_message_complete
+          case @content_type
+          when /\Aapplication\/json\z/
+            response = JSON.parse(@body)
+          when /\Aapplication\/x-msgpack\z/
+            response = MessagePack.unpack(@body)
+          when /\Atext\/x-groonga-command-list/
+            response = @body
+          else
+            response = nil
           end
+          @handler.on_response_complete(response)
         end
       end
     end

  Modified: test/test_input.rb (+9 -2)
===================================================================
--- test/test_input.rb    2016-06-05 15:54:20 +0900 (cc8b026)
+++ test/test_input.rb    2016-09-02 16:31:49 +0900 (a6b5f89)
@@ -101,11 +101,16 @@ EOC
     end
 
     def test_target_command
+      @real_response["Content-Type"] = "application/json"
+      @real_response.body = JSON.generate([[0, 0.0, 0.0], true])
       @driver.expect_emit("groonga.command.table_create",
                           @now,
-                          {"name" => "Users"})
+                          {
+                            "name" => "Users",
+                            "flags" => "TABLE_NO_KEY",
+                          })
       @driver.run do
-        get("/d/table_create", "name" => "Users")
+        get("/d/table_create", "name" => "Users", "flags" => "TABLE_NO_KEY")
         assert_equal("200", @last_response.code)
       end
     end
@@ -119,6 +124,8 @@ EOC
     end
 
     def test_load
+      @real_response["Content-Type"] = "application/json"
+      @real_response.body = JSON.generate([[0, 0.0, 0.0], 2])
       json = <<-EOJ
 [
 {"name": "Alice"},
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index