[Groonga-commit] droonga/droonga-engine at d0f59ff [master] Add droonga-engine command

Back to archive index

Kouhei Sutou null+****@clear*****
Wed Apr 16 19:21:53 JST 2014


Kouhei Sutou	2014-04-16 19:21:53 +0900 (Wed, 16 Apr 2014)

  New Revision: d0f59ff4b3cefad2f5d79ea8c495b1ede98ad632
  https://github.com/droonga/droonga-engine/commit/d0f59ff4b3cefad2f5d79ea8c495b1ede98ad632

  Message:
    Add droonga-engine command
    
    It is not worked yet. Sorry.

  Added files:
    bin/droonga-engine
    lib/droonga/fluent_message_receiver.rb

  Added: bin/droonga-engine (+61 -0) 100755
===================================================================
--- /dev/null
+++ bin/droonga-engine    2014-04-16 19:21:53 +0900 (57e6911)
@@ -0,0 +1,61 @@
+#!/usr/bin/env ruby
+#
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/engine"
+require "droonga/event_loop"
+require "droonga/fluent_message_receiver"
+require "droonga/plugin_loader"
+
+Droonga::PluginLoader.load_all
+
+raw_loop = Coolio::Loop.default
+loop = Droonga::EventLoop.new(raw_loop)
+
+engine = Droonga::Engine.new(:name => "droonga")
+engine.start
+
+receiver = Droonga::FluentMessageReceiver.new(loop) do |tag, time, record|
+  prefix, type, *arguments = tag.split(/\./)
+  if type.nil? or type.empty? or type == "message"
+    message = record
+  else
+    message = {
+      "type" => type,
+      "arguments" => arguments,
+      "body" => record
+    }
+  end
+  reply_to = message["replyTo"]
+  if reply_to.is_a? String
+    message["replyTo"] = {
+      "type" => "#{message["type"]}.result",
+      "to" => reply_to
+    }
+  end
+  message
+
+  engine.process(message)
+end
+receiver.start
+
+begin
+  raw_loop.run
+rescue Interrupt
+end
+
+receiver.shutdown
+engine.shutdown

  Added: lib/droonga/fluent_message_receiver.rb (+191 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/fluent_message_receiver.rb    2014-04-16 19:21:53 +0900 (db14b8a)
@@ -0,0 +1,191 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "socket"
+require "ipaddr"
+
+require "msgpack"
+
+require "droonga/loggable"
+require "droonga/event_loop"
+
+module Droonga
+  class FluentMessageReceiver
+    include Loggable
+
+    def initialize(loop, options={}, &on_message)
+      @loop = loop
+      @host = options[:host] || "0.0.0.0"
+      @port = options[:port] || 24224
+      @server = nil
+      @clients = []
+      @on_message = on_message
+    end
+
+    def start
+      logger.trace("start: start")
+      start_heartbeat_receiver
+      start_server
+      logger.trace("start: done")
+    end
+
+    def shutdown
+      logger.trace("shutdown: start")
+      shutdown_server
+      shutdown_clients
+      shutdown_heartbeat_receiver
+      logger.trace("shutdown: done")
+    end
+
+    private
+    def start_heartbeat_receiver
+      logger.trace("start_heartbeat_receiver: start")
+      @heartbeat_receiver = HeartbeatReceiver.new(@loop, @host, @port)
+      @heartbeat_receiver.start
+      logger.trace("start_heartbeat_receiver: done")
+    end
+
+    def shutdown_heartbeat_receiver
+      logger.trace("shutdown_heartbeat_receiver: start")
+      @heartbeat_receiver.shutdown
+      logger.trace("shutdown_heartbeat_receiver: done")
+    end
+
+    def start_server
+      logger.trace("start_server: start")
+
+      @clients = []
+      @server = Coolio::TCPServer.new(@host, @port) do |connection|
+        client = Client.new(connection) do |tag, time, record|
+          @on_message.call(tag, time, record)
+        end
+        @clients << client
+      end
+      @loop.attach(@server)
+
+      logger.trace("start_server: done")
+    end
+
+    def shutdown_server
+      @server.close
+    end
+
+    def shutdown_clients
+      @clients.each do |client|
+        client.close
+      end
+    end
+
+    def log_tag
+      "fluent-message-receiver"
+    end
+
+    class HeartbeatReceiver
+      def initialize(loop, host, port)
+        @loop = loop
+        @host = host
+        @port = port
+      end
+
+      def start
+        @socket = UDPSocket.new(address_family)
+        @socket.bind(@host, @port)
+
+        @watcher = Coolio::IOWatcher.new(@socket, "r")
+        on_readable = lambda do
+          receive_heartbeat
+        end
+        @watcher.on_readable do
+          on_readable.call
+        end
+        @loop.attach(@watcher)
+      end
+
+      def shutdown
+        @watcher.detach
+        @socket.close
+      end
+
+      private
+      def address_family
+        ip_address = IPAddr.new(IPSocket.getaddress(@host))
+        ip_address.family
+      end
+
+      def receive_heartbeat
+        address = nil
+        begin
+          _, address =****@socke*****(1024)
+        rescue SystamCallError
+          return
+        end
+
+        host = address[3]
+        port = address[1]
+        send_back_heartbeat(host, port)
+      end
+
+      def send_back_heartbeat(host, port)
+        data = "\0"
+        flags = 0
+        begin
+          @socket.send(data, flags, host, port)
+        rescue SystemCallError
+        end
+      end
+    end
+
+    class Client
+      include Loggable
+
+      def initialize(io, &on_message)
+        @io = io
+        @on_message = on_message
+        @unpacker = MessagePack::Unpacker.new
+        on_read = lambda do |data|
+          feed(data)
+        end
+        @io.on_read do |data|
+          on_read.call(data)
+        end
+      end
+
+      private
+      def feed(data)
+        @unpacker.feed_each(data) do |object|
+          tag = object[0]
+          case object[1]
+          when String # PackedForward message
+            entries = MessagePack.unpack(object[1])
+          when Array # Forward message
+            entries = object[1]
+          when Integer # Message message
+            entries = [[object[1], object[2]]]
+          else
+            logger.error("unknown message", :message => object)
+            next
+          end
+          entries.each do |time, record|
+            @on_message.call(tag, time || Time.now.to_i, record)
+          end
+        end
+      end
+
+      def log_tag
+        "fluent-message-receiver::client"
+      end
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index