Kouhei Sutou
null+****@clear*****
Wed Nov 28 14:20:26 JST 2012
Kouhei Sutou 2012-11-28 14:20:26 +0900 (Wed, 28 Nov 2012) New Revision: 7d9fd226c156e278f71fcfc295bf75e2747a2553 https://github.com/groonga/fluent-plugin-groonga/commit/7d9fd226c156e278f71fcfc295bf75e2747a2553 Log: Work http -> gqtp stream Yay! Modified files: doc/text/fluent.conf lib/fluent/plugin/in_groonga.rb lib/fluent/plugin/out_groonga.rb Modified: doc/text/fluent.conf (+1 -0) =================================================================== --- doc/text/fluent.conf 2012-11-26 23:22:35 +0900 (717a64e) +++ doc/text/fluent.conf 2012-11-28 14:20:26 +0900 (98e6ad8) @@ -13,4 +13,5 @@ # database /tmp/groonga/db protocol http port 30041 + flush_interval 1 </match> Modified: lib/fluent/plugin/in_groonga.rb (+11 -2) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2012-11-26 23:22:35 +0900 (7960d3b) +++ lib/fluent/plugin/in_groonga.rb 2012-11-28 14:20:26 +0900 (aa3c98d) @@ -225,12 +225,17 @@ module Fluent @parser << data @repeater.write(data) end + + def on_close + @parser.close + end end class Parser < GQTP::Parser def initialize(input) super() @input = input + initialize_command_parser end def on_body(chunk) @@ -238,18 +243,22 @@ module Fluent end def on_complete + @command_parser << "\n" + end + + def close @command_parser.finish end private - def reset - super + def initialize_command_parser @command_parser = Groonga::Command::Parser.new @command_parser.on_command do |command| @input.emit(command.name, command.arguments) end @command_parser.on_load_value do |command, value| arguments = command.arguments.dup + arguments[:columns] = command.columns.join(", ") arguments[:values] = Yajl::Encoder.encode([value]) @input.emit(command.name, arguments) end Modified: lib/fluent/plugin/out_groonga.rb (+15 -10) =================================================================== --- lib/fluent/plugin/out_groonga.rb 2012-11-26 23:22:35 +0900 (407a569) +++ lib/fluent/plugin/out_groonga.rb 2012-11-28 14:20:26 +0900 (09888e2) @@ -90,10 +90,21 @@ module Fluent def start @loop = Coolio::Loop.new + @queue = Queue.new + @thread = Thread.new do + loop do + path =****@queue***** + break if path.nil? + client = GroongaHTTPClient.connect(@host, @port) + client.request("GET", path) + @loop.attach(client) + @loop.run + end + end end def shutdown - @loop.stop + @queue.push(nil) @thread.join if @thread end @@ -105,18 +116,12 @@ module Fluent unless http_arguments.empty? path << "?#{http_arguments.join('&')}" end - client = GroongaHTTPClient.connect(@host, @port) - client.request("GET", path) - @loop.attach(client) - if****@threa*****? - @thread = Thread.new do - @loop.run - @thread = nil - end - end + @queue.push(path) end class GroongaHTTPClient < Coolio::HttpClient + def on_body_data(data) + end end end -------------- next part -------------- HTML����������������������������...Download