Kouhei Sutou
null+****@clear*****
Thu Jan 16 18:31:53 JST 2014
Kouhei Sutou 2014-01-16 18:31:53 +0900 (Thu, 16 Jan 2014) New Revision: 9c3f99fd9f35a9f9a8df7927df657770e57e79bf https://github.com/droonga/droonga-client-ruby/commit/9c3f99fd9f35a9f9a8df7927df657770e57e79bf Message: Support infinite receive Modified files: lib/droonga/client/connection/droonga_protocol.rb Modified: lib/droonga/client/connection/droonga_protocol.rb (+65 -50) =================================================================== --- lib/droonga/client/connection/droonga_protocol.rb 2014-01-16 15:53:40 +0900 (5664102) +++ lib/droonga/client/connection/droonga_protocol.rb 2014-01-16 18:31:53 +0900 (6dc1f0e) @@ -36,17 +36,15 @@ module Droonga def initialize(options={}) default_options = { - :tag => "droonga", - :host => "127.0.0.1", - :port => 24224, - :connect_timeout => 1, - :read_timeout => 0.1, + :tag => "droonga", + :host => "127.0.0.1", + :port => 24224, + :timeout => 1, } options = default_options.merge(options) @logger = Fluent::Logger::FluentLogger.new(options.delete(:tag), options) - @connect_timeout = options[:connect_timeout] - @read_timeout = options[:read_timeout] + @timeout = options[:timeout] end # Sends a request message and receives one or more response @@ -81,27 +79,12 @@ module Droonga message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga" send(message, options) - connect_timeout = options[:connect_timeout] || @connect_timeout - read_timeout = options[:read_timeout] || @read_timeout - receive_options = { - :connect_timeout => connect_timeout, - :read_timeout => read_timeout - } sync = block.nil? if sync - begin - receiver.receive(receive_options) - ensure - receiver.close - end + receive(receiver, options, &block) else thread = Thread.new do - begin - response = receiver.receive(receive_options) - yield(response) if response - ensure - receiver.close - end + receive(receiver, options, &block) end Request.new(thread) end @@ -125,6 +108,29 @@ module Droonga @logger.close end + private + def receive(receiver, options) + timeout = options[:timeout] || @timeout + + receive_options = { + :timeout => timeout, + } + begin + responses = [] + receiver.receive(receive_options) do |response| + responses << response + end + response = responses.first + if block_given? + yield(response) + else + response + end + ensure + receiver.close + end + end + class Receiver def initialize(options={}) default_options = { @@ -133,10 +139,15 @@ module Droonga } options = default_options.merge(options) @socket = TCPServer.new(options[:host], options[:port]) + @read_ios = [@socket] + @client_handlers = {} end def close @socket.close + @client_handlers.each_key do |client| + client.close + end end def host @@ -148,37 +159,41 @@ module Droonga end BUFFER_SIZE = 8192 - def receive(options={}) - responses = [] - select(@socket, options[:connect_timeout]) do - client =****@socke***** - unpacker = MessagePack::Unpacker.new - select(client, options[:read_timeout]) do - data = client.read_nonblock(BUFFER_SIZE) - unpacker.feed_each(data) do |object| - responses << object - end + def receive(options={}, &block) + timeout = options[:timeout] || 1 + loop do + start = Time.new + readable_ios, = IO.select(@read_ios, nil, nil, timeout) + break if readable_ios.nil? + if timeout > 0 + timeout -= (Time.now - start) + timeout /= 2.0 + timeout = 0 if timeout < 0 + end + readable_ios.each do |readable_io| + on_readable(readable_io, &block) end - client.close end - # TODO: ENABLE ME - # if responses.size >= 2 - # responses - # else - responses.first - # end end private - def select(input, timeout) - loop do - start = Time.now - readables, = IO.select([input], nil, nil, timeout) - timeout -= (Time.now - start) - timeout /= 2.0 - timeout = 0 if timeout < 0 - break if readables.nil? - yield(timeout) + def on_readable(io) + case io + when @socket + client =****@socke***** + @read_ios << client + @client_handlers[client] = lambda do + unpacker = MessagePack::Unpacker.new + data = client.read_nonblock(BUFFER_SIZE) + unpacker.feed_each(data) do |object| + yield(object) + end + client.close + @read_ios.delete(client) + @client_handlers.delete(client) + end + else + @client_handlers[io].call end end end -------------- next part -------------- HTML����������������������������...Download