[Groonga-commit] droonga/droonga-client-ruby at 40a0201 [master] Implement Cool.io backend for Droonga protocol

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Apr 10 17:09:13 JST 2014


Kouhei Sutou	2014-04-10 17:09:13 +0900 (Thu, 10 Apr 2014)

  New Revision: 40a0201338f6e7cfe0d3ea004e7b6d48261a3e46
  https://github.com/droonga/droonga-client-ruby/commit/40a0201338f6e7cfe0d3ea004e7b6d48261a3e46

  Message:
    Implement Cool.io backend for Droonga protocol

  Added files:
    lib/droonga/client/connection/droonga-protocol/coolio.rb
  Modified files:
    droonga-client.gemspec

  Modified: droonga-client.gemspec (+1 -0)
===================================================================
--- droonga-client.gemspec    2014-04-10 15:01:11 +0900 (6220797)
+++ droonga-client.gemspec    2014-04-10 17:09:13 +0900 (be5f1d2)
@@ -39,6 +39,7 @@ Gem::Specification.new do |spec|
   spec.add_runtime_dependency "fluent-logger"
   spec.add_runtime_dependency "rack"
   spec.add_runtime_dependency "yajl-ruby"
+  spec.add_runtime_dependency "droonga-message-pack-packer"
 
   spec.add_development_dependency "bundler", "~> 1.3"
   spec.add_development_dependency "rake"

  Added: lib/droonga/client/connection/droonga-protocol/coolio.rb (+232 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/client/connection/droonga-protocol/coolio.rb    2014-04-10 17:09:13 +0900 (9e4893b)
@@ -0,0 +1,232 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "coolio"
+require "droonga/message-pack-packer"
+
+module Droonga
+  class Client
+    module Connection
+      class DroongaProtocol
+        class Coolio
+          class Request
+            def initialize(receiver, id, loop)
+              @receiver = receiver
+              @id = id
+              @loop = loop
+            end
+
+            def wait
+              return if****@recei*****?(@id)
+              until****@recei*****?(@id)
+                @loop.run_once
+              end
+            end
+          end
+
+          class InfiniteRequest
+            def initialize(loop)
+              @loop = loop
+            end
+
+            def wait
+              @loop.run
+            end
+          end
+
+          class Sender < ::Coolio::TCPSocket
+            def initialize(socket)
+              super(socket)
+            end
+
+            def send(tag, data)
+              fluent_message = [tag, Time.now.to_i, data]
+              packed_fluent_message = MessagePackPacker.pack(fluent_message)
+              write(packed_fluent_message)
+            end
+          end
+
+          class Receiver < ::Coolio::TCPServer
+            def initialize(*args)
+              super(*args) do |engine|
+                @engines << engine
+                handle_engine(engine)
+              end
+              @requests = {}
+              @engines = []
+            end
+
+            def close
+              super
+              @engines.each do |engine|
+                engine.close
+              end
+              @engines.clear
+            end
+
+            def host
+              @listen_socket.addr[3]
+            end
+
+            def port
+              @listen_socket.addr[1]
+            end
+
+            def droonga_name
+              "#{host}:#{port}/droonga"
+            end
+
+            def register(id, &callback)
+              @requests[id] = {
+                :received => false,
+                :callback => callback,
+              }
+            end
+
+            def unregister(id)
+              @requests.delete(id)
+            end
+
+            def received?(id)
+              if****@reque*****?(id)
+                @requests[id][:received]
+              else
+                true
+              end
+            end
+
+            private
+            def handle_engine(engine)
+              unpacker = MessagePack::Unpacker.new
+              on_read = lambda do |data|
+                unpacker.feed_each(data) do |fluent_message|
+                  tag, time, droonga_message = fluent_message
+                  id = droonga_message["inReplyTo"]
+                  request = @requests[id]
+                  next if request.nil?
+                  request[:received] = true
+                  request[:callback].call(fluent_message)
+                end
+              end
+              engine.on_read do |data|
+                on_read.call(data)
+              end
+
+              on_close = lambda do
+                @engines.delete(engine)
+              end
+              engine.on_close do
+                on_close.call
+              end
+            end
+          end
+
+          def initialize(host, port, tag, options={})
+            @host = host
+            @port = port
+            @tag = tag
+            default_options = {
+            }
+            @options = default_options.merge(options)
+            @loop = options[:loop] || ::Coolio::Loop.default
+
+            @sender = Sender.connect(@host, @port)
+            @sender.attach(@loop)
+            @receiver_host = @options[:receiver_host] || Socket.gethostname
+            @receiver_port = @options[:receiver_port] || 0
+            @receiver = Receiver.new(@receiver_host, @receiver_port)
+            @receiver.attach(@loop)
+          end
+
+          def request(message, options={}, &block)
+            id = message["id"] || generate_id
+            message = message.merge("id" => id,
+                                    "replyTo" => @receiver.droonga_name)
+            send(message, options)
+
+            sync = block.nil?
+            if sync
+              response = nil
+              block = lambda do |_response|
+                response = _response
+              end
+            end
+            @receiver.register(id) do
+              @receiver.unregister(id)
+              block.call
+            end
+            request = Request.new(@receiver, id, @loop)
+            if sync
+              request.wait
+              response
+            else
+              request
+            end
+          end
+
+          def subscribe(message, options={}, &block)
+            id = message["id"] || generate_id
+            message = message.merge("id" => id,
+                                    "from" => @receiver.droonga_name)
+            send(message, options)
+
+            request = InfiniteRequest.new(@loop)
+            sync = block.nil?
+            if sync
+              yielder = nil
+              buffer = []
+              @receiver.register(id) do |response|
+                if yielder
+                  while (old_response = buffer.shift)
+                    yielder << old_response
+                  end
+                  yielder << response
+                else
+                  buffer << response
+                end
+              end
+              Enumerator.new do |_yielder|
+                yielder = _yielder
+                request.wait
+              end
+            else
+              @receiver.register(id, &block)
+              request
+            end
+          end
+
+          def send(message, options={}, &block)
+            if message["id"].nil? or message["date"].nil?
+              id = message["id"] || generate_id
+              date = message["date"] || Time.now
+              message = message.merge("id" => id, "date" => date)
+            end
+            @sender.send("#{@tag}.message", message)
+          end
+
+          def close
+            @sender.close
+            @receiver.close
+          end
+
+          private
+          def generate_id
+            Time.now.to_f.to_s
+          end
+        end
+      end
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index