[Groonga-commit] droonga/droonga-client-ruby at 9c3f99f [master] Support infinite receive

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Jan 16 18:31:53 JST 2014


Kouhei Sutou	2014-01-16 18:31:53 +0900 (Thu, 16 Jan 2014)

  New Revision: 9c3f99fd9f35a9f9a8df7927df657770e57e79bf
  https://github.com/droonga/droonga-client-ruby/commit/9c3f99fd9f35a9f9a8df7927df657770e57e79bf

  Message:
    Support infinite receive

  Modified files:
    lib/droonga/client/connection/droonga_protocol.rb

  Modified: lib/droonga/client/connection/droonga_protocol.rb (+65 -50)
===================================================================
--- lib/droonga/client/connection/droonga_protocol.rb    2014-01-16 15:53:40 +0900 (5664102)
+++ lib/droonga/client/connection/droonga_protocol.rb    2014-01-16 18:31:53 +0900 (6dc1f0e)
@@ -36,17 +36,15 @@ module Droonga
 
         def initialize(options={})
           default_options = {
-            :tag             => "droonga",
-            :host            => "127.0.0.1",
-            :port            => 24224,
-            :connect_timeout => 1,
-            :read_timeout    => 0.1,
+            :tag     => "droonga",
+            :host    => "127.0.0.1",
+            :port    => 24224,
+            :timeout => 1,
           }
           options = default_options.merge(options)
           @logger = Fluent::Logger::FluentLogger.new(options.delete(:tag),
                                                      options)
-          @connect_timeout = options[:connect_timeout]
-          @read_timeout = options[:read_timeout]
+          @timeout = options[:timeout]
         end
 
         # Sends a request message and receives one or more response
@@ -81,27 +79,12 @@ module Droonga
           message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
           send(message, options)
 
-          connect_timeout = options[:connect_timeout] || @connect_timeout
-          read_timeout = options[:read_timeout] || @read_timeout
-          receive_options = {
-            :connect_timeout => connect_timeout,
-            :read_timeout    => read_timeout
-          }
           sync = block.nil?
           if sync
-            begin
-              receiver.receive(receive_options)
-            ensure
-              receiver.close
-            end
+            receive(receiver, options, &block)
           else
             thread = Thread.new do
-              begin
-                response = receiver.receive(receive_options)
-                yield(response) if response
-              ensure
-                receiver.close
-              end
+              receive(receiver, options, &block)
             end
             Request.new(thread)
           end
@@ -125,6 +108,29 @@ module Droonga
           @logger.close
         end
 
+        private
+        def receive(receiver, options)
+          timeout = options[:timeout] || @timeout
+
+          receive_options = {
+            :timeout => timeout,
+          }
+          begin
+            responses = []
+            receiver.receive(receive_options) do |response|
+              responses << response
+            end
+            response = responses.first
+            if block_given?
+              yield(response)
+            else
+              response
+            end
+          ensure
+            receiver.close
+          end
+        end
+
         class Receiver
           def initialize(options={})
             default_options = {
@@ -133,10 +139,15 @@ module Droonga
             }
             options = default_options.merge(options)
             @socket = TCPServer.new(options[:host], options[:port])
+            @read_ios = [@socket]
+            @client_handlers = {}
           end
 
           def close
             @socket.close
+            @client_handlers.each_key do |client|
+              client.close
+            end
           end
 
           def host
@@ -148,37 +159,41 @@ module Droonga
           end
 
           BUFFER_SIZE = 8192
-          def receive(options={})
-            responses = []
-            select(@socket, options[:connect_timeout]) do
-              client =****@socke*****
-              unpacker = MessagePack::Unpacker.new
-              select(client, options[:read_timeout]) do
-                data = client.read_nonblock(BUFFER_SIZE)
-                unpacker.feed_each(data) do |object|
-                  responses << object
-                end
+          def receive(options={}, &block)
+            timeout = options[:timeout] || 1
+            loop do
+              start = Time.new
+              readable_ios, = IO.select(@read_ios, nil, nil, timeout)
+              break if readable_ios.nil?
+              if timeout > 0
+                timeout -= (Time.now - start)
+                timeout /= 2.0
+                timeout = 0 if timeout < 0
+              end
+              readable_ios.each do |readable_io|
+                on_readable(readable_io, &block)
               end
-              client.close
             end
-            # TODO: ENABLE ME
-            # if responses.size >= 2
-            #   responses
-            # else
-              responses.first
-            # end
           end
 
           private
-          def select(input, timeout)
-            loop do
-              start = Time.now
-              readables, = IO.select([input], nil, nil, timeout)
-              timeout -= (Time.now - start)
-              timeout /= 2.0
-              timeout = 0 if timeout < 0
-              break if readables.nil?
-              yield(timeout)
+          def on_readable(io)
+            case io
+            when @socket
+              client =****@socke*****
+              @read_ios << client
+              @client_handlers[client] = lambda do
+                unpacker = MessagePack::Unpacker.new
+                data = client.read_nonblock(BUFFER_SIZE)
+                unpacker.feed_each(data) do |object|
+                  yield(object)
+                end
+                client.close
+                @read_ios.delete(client)
+                @client_handlers.delete(client)
+              end
+            else
+              @client_handlers[io].call
             end
           end
         end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index