[Groonga-commit] groonga/fluent-plugin-groonga [master] Work http -> gqtp stream

Back to archive index

Kouhei Sutou null+****@clear*****
Wed Nov 28 14:20:26 JST 2012


Kouhei Sutou	2012-11-28 14:20:26 +0900 (Wed, 28 Nov 2012)

  New Revision: 7d9fd226c156e278f71fcfc295bf75e2747a2553
  https://github.com/groonga/fluent-plugin-groonga/commit/7d9fd226c156e278f71fcfc295bf75e2747a2553

  Log:
    Work http -> gqtp stream
    
    Yay!

  Modified files:
    doc/text/fluent.conf
    lib/fluent/plugin/in_groonga.rb
    lib/fluent/plugin/out_groonga.rb

  Modified: doc/text/fluent.conf (+1 -0)
===================================================================
--- doc/text/fluent.conf    2012-11-26 23:22:35 +0900 (717a64e)
+++ doc/text/fluent.conf    2012-11-28 14:20:26 +0900 (98e6ad8)
@@ -13,4 +13,5 @@
   # database /tmp/groonga/db
   protocol http
   port 30041
+  flush_interval 1
 </match>

  Modified: lib/fluent/plugin/in_groonga.rb (+11 -2)
===================================================================
--- lib/fluent/plugin/in_groonga.rb    2012-11-26 23:22:35 +0900 (7960d3b)
+++ lib/fluent/plugin/in_groonga.rb    2012-11-28 14:20:26 +0900 (aa3c98d)
@@ -225,12 +225,17 @@ module Fluent
           @parser << data
           @repeater.write(data)
         end
+
+        def on_close
+          @parser.close
+        end
       end
 
       class Parser < GQTP::Parser
         def initialize(input)
           super()
           @input = input
+          initialize_command_parser
         end
 
         def on_body(chunk)
@@ -238,18 +243,22 @@ module Fluent
         end
 
         def on_complete
+          @command_parser << "\n"
+        end
+
+        def close
           @command_parser.finish
         end
 
         private
-        def reset
-          super
+        def initialize_command_parser
           @command_parser = Groonga::Command::Parser.new
           @command_parser.on_command do |command|
             @input.emit(command.name, command.arguments)
           end
           @command_parser.on_load_value do |command, value|
             arguments = command.arguments.dup
+            arguments[:columns] = command.columns.join(", ")
             arguments[:values] = Yajl::Encoder.encode([value])
             @input.emit(command.name, arguments)
           end

  Modified: lib/fluent/plugin/out_groonga.rb (+15 -10)
===================================================================
--- lib/fluent/plugin/out_groonga.rb    2012-11-26 23:22:35 +0900 (407a569)
+++ lib/fluent/plugin/out_groonga.rb    2012-11-28 14:20:26 +0900 (09888e2)
@@ -90,10 +90,21 @@ module Fluent
 
       def start
         @loop = Coolio::Loop.new
+        @queue = Queue.new
+        @thread = Thread.new do
+          loop do
+            path =****@queue*****
+            break if path.nil?
+            client = GroongaHTTPClient.connect(@host, @port)
+            client.request("GET", path)
+            @loop.attach(client)
+            @loop.run
+          end
+        end
       end
 
       def shutdown
-        @loop.stop
+        @queue.push(nil)
         @thread.join if @thread
       end
 
@@ -105,18 +116,12 @@ module Fluent
         unless http_arguments.empty?
           path << "?#{http_arguments.join('&')}"
         end
-        client = GroongaHTTPClient.connect(@host, @port)
-        client.request("GET", path)
-        @loop.attach(client)
-        if****@threa*****?
-          @thread = Thread.new do
-            @loop.run
-            @thread = nil
-          end
-        end
+        @queue.push(path)
       end
 
       class GroongaHTTPClient < Coolio::HttpClient
+        def on_body_data(data)
+        end
       end
     end
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index