[Groonga-commit] groonga/fluent-plugin-groonga at 2436e84 [master] out: use remove needless async code

Back to archive index

Kouhei Sutou null+****@clear*****
Wed Oct 15 18:21:20 JST 2014


Kouhei Sutou	2014-10-15 18:21:20 +0900 (Wed, 15 Oct 2014)

  New Revision: 2436e84e1f2a7be7c2c6a53510d9a6dd0efe3d40
  https://github.com/groonga/fluent-plugin-groonga/commit/2436e84e1f2a7be7c2c6a53510d9a6dd0efe3d40

  Message:
    out: use remove needless async code

  Modified files:
    fluent-plugin-groonga.gemspec
    lib/fluent/plugin/out_groonga.rb
    test/test_output.rb

  Modified: fluent-plugin-groonga.gemspec (+2 -2)
===================================================================
--- fluent-plugin-groonga.gemspec    2014-10-15 18:20:50 +0900 (bb1de8e)
+++ fluent-plugin-groonga.gemspec    2014-10-15 18:21:20 +0900 (acfeabd)
@@ -1,6 +1,6 @@
 # -*- mode: ruby; coding: utf-8 -*-
 #
-# Copyright (C) 2012-2013  Kouhei Sutou <kou �� clear-code.com>
+# Copyright (C) 2012-2014  Kouhei Sutou <kou �� clear-code.com>
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -35,7 +35,7 @@ Gem::Specification.new do |spec|
   spec.require_paths = ["lib"]
 
   spec.add_runtime_dependency("fluentd")
-  spec.add_runtime_dependency("gqtp", ">= 1.0.3")
+  spec.add_runtime_dependency("groonga-client")
   spec.add_runtime_dependency("groonga-command-parser")
 
   spec.add_development_dependency("rake")

  Modified: lib/fluent/plugin/out_groonga.rb (+80 -86)
===================================================================
--- lib/fluent/plugin/out_groonga.rb    2014-10-15 18:20:50 +0900 (f4f6c83)
+++ lib/fluent/plugin/out_groonga.rb    2014-10-15 18:21:20 +0900 (cebdf0a)
@@ -17,15 +17,24 @@
 
 require "fileutils"
 
+require "groonga/client"
+
 module Fluent
-  class GroongaOutput < ObjectBufferedOutput
+  class GroongaOutput < BufferedOutput
     Plugin.register_output("groonga", self)
 
     def initialize
       super
     end
 
-    config_param :protocol, :string, :default => "http"
+    config_param :protocol, :default => :http do |value|
+      case value
+      when "http", "gqtp", "command"
+        value.to_sym
+      else
+        raise ConfigError, "must be http, gqtp or command: <#{value}>"
+      end
+    end
     config_param :table, :string, :default => nil
 
     def configure(conf)
@@ -46,31 +55,27 @@ module Fluent
       @client.shutdown
     end
 
-    def write_objects(tag, messages)
-      messages.each do |time, record|
+    def format(tag, time, record)
+      [tag, time, record].to_msgpack
+    end
+
+    def write(chunk)
+      chunk.msgpack_each do |message|
+        tag, _, record = message
         @emitter.emit(tag, record)
       end
     end
 
+    private
     def create_client(protocol)
       case protocol
-      when "http"
-        HTTPClient.new
-      when "gqtp"
-        GQTPClient.new
-      when "command"
+      when :http, :gqtp
+        NetworkClient.new(protocol)
+      when :command
         CommandClient.new
       end
     end
 
-    def create_output(buffer_type, emitter)
-      if buffer_type == "none"
-        RawGroongaOutput.new(emitter)
-      else
-        BufferedGroongaOutput.new(emitter)
-      end
-    end
-
     class Emitter
       def initialize(client, table)
         @client = client
@@ -105,60 +110,32 @@ module Fluent
       end
     end
 
-    class HTTPClient
+    class NetworkClient
       include Configurable
 
       config_param :host, :string, :default => nil
       config_param :port, :integer, :default => nil
 
-      def start
-        @loop = Coolio::Loop.new
-      end
-
-      def shutdown
-      end
-
-      def send(command)
-        client = GroongaHTTPClient.connect(@host, @port)
-        client.request("GET", command.to_uri_format)
-        @loop.attach(client)
-        @loop.run
+      def initialize(protocol)
+        super()
+        @protocol = protocol
       end
 
-      class GroongaHTTPClient < Coolio::HttpClient
-        def on_body_data(data)
-        end
-      end
-    end
-
-    class GQTPClient
-      include Configurable
-
-      config_param :host, :string, :default => nil
-      config_param :port, :integer, :default => nil
-
       def start
-        @loop = Coolio::Loop.new
         @client = nil
       end
 
       def shutdown
         return if****@clien*****?
-        @client.close do
-          @loop.stop
-        end
-        @loop.run
+        @client.close
       end
 
       def send(command)
-        @client ||= GQTP::Client.new(:address => @host,
-                                     :port => @port,
-                                     :connection => :coolio,
-                                     :loop => @loop)
-        @client.send(command.to_command_format) do |header, body|
-          @loop.stop
-        end
-        @loop.run
+        @client ||= Groonga::Client.new(:protocol => @protocol,
+                                        :host     => @host,
+                                        :port     => @port,
+                                        :backend  => :synchronous)
+        @client.execute(command)
       end
     end
 
@@ -181,13 +158,13 @@ module Fluent
 
       def start
         run_groonga
-        wrap_io
       end
 
       def shutdown
-        @groonga_input.close
-        @groonga_output.close
-        @groonga_error.close
+        @input.close
+        read_output("shutdown")
+        @output.close
+        @error.close
         Process.waitpid(@pid)
       end
 
@@ -196,27 +173,29 @@ module Fluent
         if command.name == "load"
           body = command.arguments.delete(:values)
         end
-        @groonga_input.write("#{command.to_uri_format}\n")
+        uri = command.to_uri_format
+        @input.write("#{uri}\n")
         if body
           body.each_line do |line|
-            @groonga_input.write("#{line}\n")
+            @input.write("#{line}\n")
           end
         end
-        @loop.run
+        @input.flush
+        read_output(uri)
       end
 
       private
       def run_groonga
         env = {}
-        @input = IO.pipe("ASCII-8BIT")
-        @output = IO.pipe("ASCII-8BIT")
-        @error = IO.pipe("ASCII-8BIT")
-        input_fd = @input[0].to_i
-        output_fd = @output[1].to_i
+        input = IO.pipe("ASCII-8BIT")
+        output = IO.pipe("ASCII-8BIT")
+        error = IO.pipe("ASCII-8BIT")
+        input_fd = input[0].to_i
+        output_fd = output[1].to_i
         options = {
           input_fd => input_fd,
           output_fd => output_fd,
-          :err => @error[1],
+          :err => error[1],
         }
         arguments = @arguments
         arguments += [
@@ -229,27 +208,42 @@ module Fluent
         end
         arguments << @database
         @pid = spawn(env, @groonga, *arguments, options)
-        @input[0].close
-        @output[1].close
-        @error[1].close
-      end
-
-      def wrap_io
-        @loop = Coolio::Loop.new
+        input[0].close
+        @input = input[1]
+        output[1].close
+        @output = output[0]
+        error[1].close
+        @error = error[0]
+      end
+
+      def read_output(context)
+        output_message = ""
+        error_message = ""
+
+        loop do
+          readables = IO.select([@output, @error], nil, nil, 0)
+          break if readables.nil?
+
+          readables.each do |readable|
+            case readable
+            when @output
+              output_message << @output.gets
+            when @error
+              error_message << @error.gets
+            end
+          end
+        end
 
-        @groonga_input = Coolio::IO.new(@input[1])
-        on_write_complete = lambda do
-          @loop.stop
+        unless output_message.empty?
+          Engine.log.debug("[output][groonga][output]",
+                           :context => context,
+                           :message => output_message)
         end
-        @groonga_input.on_write_complete do
-          on_write_complete.call
+        unless error_message.empty?
+          Engine.log.error("[output][groonga][error]",
+                           :context => context,
+                           :message => error_message)
         end
-        @groonga_output = Coolio::IO.new(@output[0])
-        @groonga_error = Coolio::IO.new(@error[0])
-
-        @loop.attach(@groonga_input)
-        @loop.attach(@groonga_output)
-        @loop.attach(@groonga_error)
       end
     end
   end

  Modified: test/test_output.rb (+14 -15)
===================================================================
--- test/test_output.rb    2014-10-15 18:20:50 +0900 (6293830)
+++ test/test_output.rb    2014-10-15 18:21:20 +0900 (202d37b)
@@ -47,10 +47,13 @@ EOC
   class HTTPTest < self
     setup :before => :append
     def setup_real_server
+      @request_parser = HTTP::Parser.new
+      @request_body = ""
+      @response_body = nil
+
       @real_host = "127.0.0.1"
       @real_port = 29292
-      @real_server_pid = fork do
-        exit
+      @real_server_thread = Thread.new do
         real_server = TCPServer.new(@real_host, @real_port)
         response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log)
         real_response = WEBrick::HTTPResponse.new(response_config)
@@ -58,14 +61,11 @@ EOC
         request_body = ""
         client = real_server.accept
         real_server.close
-        parser = HTTP::Parser.new
-        parser.on_headers_complete = lambda do |headers|
-          request_headers = headers
-        end
-        parser.on_body = lambda do |chunk|
-          request_body << chunk
+        @request_parser.on_body = lambda do |chunk|
+          @request_body << chunk
         end
-        parser.on_message_complete = lambda do
+        @request_parser.on_message_complete = lambda do
+          real_response.body = @response_body
           real_response.send_response(client)
           client.close
         end
@@ -74,16 +74,14 @@ EOC
           break if client.closed?
           data = client.readpartial(4096)
           break if data.nil?
-          parser << data
+          @request_parser << data
         end
       end
     end
 
     teardown
     def teardown_real_server
-      Process.kill(:INT, @real_server_pid)
-      Process.kill(:KILL, @real_server_pid)
-      Process.waitpid(@real_server_pid)
+      @real_server_thread.join
     end
 
     def configuration
@@ -96,13 +94,14 @@ EOC
 
     class CommandTest < self
       def test_basic_command
+        @response_body = JSON.generate([[0, 0.0, 0.0], true])
         driver = create_driver("groonga.command.table_create")
         time = Time.parse("2012-10-26T08:45:42Z").to_i
         driver.run do
           driver.emit({"name" => "Users"}, time)
         end
-        # p @request_headers
-        # p @request_body
+        assert_equal("/d/table_create?name=Users",
+                     @request_parser.request_url)
       end
     end
   end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index