[Groonga-commit] droonga/fluent-plugin-droonga at aaf1da5 [master] Create Partition taht is extracted from Engine

Back to archive index

Kouhei Sutou null+****@clear*****
Fri Nov 22 15:49:01 JST 2013


Kouhei Sutou	2013-11-22 15:49:01 +0900 (Fri, 22 Nov 2013)

  New Revision: aaf1da5d224e88efaf5bd233c7aba94937475e45
  https://github.com/droonga/fluent-plugin-droonga/commit/aaf1da5d224e88efaf5bd233c7aba94937475e45

  Message:
    Create Partition taht is extracted from Engine

  Added files:
    lib/droonga/partition.rb
  Modified files:
    lib/droonga/catalog.rb
    lib/droonga/dispatcher.rb
    test/unit/test_catalog.rb

  Modified: lib/droonga/catalog.rb (+1 -1)
===================================================================
--- lib/droonga/catalog.rb    2013-11-22 09:56:23 +0900 (81ae561)
+++ lib/droonga/catalog.rb    2013-11-22 15:49:01 +0900 (b1e58f0)
@@ -63,7 +63,7 @@ module Droonga
       @options[name]
     end
 
-    def get_engines(name)
+    def get_partitions(name)
       device = @catalog["farms"][name]["device"]
       pattern = Regexp.new("^#{name}\.")
       results = {}

  Modified: lib/droonga/dispatcher.rb (+9 -9)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-22 09:56:23 +0900 (4b8a07c)
+++ lib/droonga/dispatcher.rb    2013-11-22 15:49:01 +0900 (37b0602)
@@ -20,17 +20,17 @@ require "droonga/handler"
 require "droonga/adapter"
 require "droonga/catalog"
 require "droonga/collector"
+require "droonga/partition"
 
 module Droonga
   class Dispatcher
     attr_reader :collectors
     def initialize(worker, name)
-      @engines = {}
-      Droonga.catalog.get_engines(name).each do |name, options|
-        engine = Droonga::Engine.new(options.merge(:standalone => true,
-                                                   :with_server => false))
-        engine.start
-        @engines[name] = engine
+      @partitions = {}
+      Droonga.catalog.get_partitions(name).each do |name, options|
+        partition = Droonga::Partition.new(options)
+        partition.start
+        @partitions[name] = partition
       end
       @worker = worker
       @name = name
@@ -44,8 +44,8 @@ module Droonga
     end
 
     def shutdown
-      @engines.each do |name, engine|
-        engine.shutdown
+      @partitions.each do |name, partition|
+        partition.shutdown
       end
     end
 
@@ -97,7 +97,7 @@ module Droonga
         post(message, "type" => type, "synchronous"=> synchronous)
       else
         envelope =****@worke*****("body" => message, "type" => type)
-        @engines[route].emit('', Time.now.to_f, envelope, synchronous)
+        @partitions[route].emit('', Time.now.to_f, envelope, synchronous)
       end
     end
 

  Added: lib/droonga/partition.rb (+81 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/partition.rb    2013-11-22 15:49:01 +0900 (2eb62fe)
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "serverengine"
+
+require "droonga/server"
+require "droonga/worker"
+require "droonga/executor"
+
+module Droonga
+  class Partition
+    DEFAULT_OPTIONS = {
+      :queue_name => "DroongaQueue",
+      :n_workers  => 0,
+    }
+
+    def initialize(options={})
+      @options = DEFAULT_OPTIONS.merge(options)
+    end
+
+    def start
+      if @options[:database] && !@options[:database].empty?
+        Droonga::JobQueue.ensure_schema(@options[:database],
+                                        @options[:queue_name])
+      end
+      start_supervisor if @options[:n_workers] > 0
+      @executor = Executor.new(@options.merge(:standalone => true))
+    end
+
+    def shutdown
+      $log.trace("partition: shutdown: start")
+      @executor.shutdown if @executor
+      shutdown_supervisor if @supervisor
+      $log.trace("partition: shutdown: done")
+    end
+
+    def emit(tag, time, record, synchronous=nil)
+      $log.trace("[#{Process.pid}] tag: <#{tag}> caller: <#{caller.first}>")
+      @executor.dispatch(tag, time, record, synchronous)
+    end
+
+    private
+    def start_supervisor
+      @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
+        force_options = {
+          :worker_type   => "process",
+          :workers       => @options[:n_workers],
+          :log_level     => $log.level,
+          :server_process_name => "Server[#{@options[:database]}] #$0",
+          :worker_process_name => "Worker[#{@options[:database]}] #$0"
+        }
+        @options.merge(force_options)
+      end
+      @supervisor_thread = Thread.new do
+        @supervisor.main
+      end
+    end
+
+    def shutdown_supervisor
+      $log.trace("supervisor: shutdown: start")
+      @supervisor.stop(true)
+      $log.trace("supervisor: shutdown: stopped")
+      @supervisor_thread.join
+      $log.trace("supervisor: shutdown: done")
+    end
+  end
+end

  Modified: test/unit/test_catalog.rb (+3 -3)
===================================================================
--- test/unit/test_catalog.rb    2013-11-22 09:56:23 +0900 (57688eb)
+++ test/unit/test_catalog.rb    2013-11-22 15:49:01 +0900 (16cb31a)
@@ -24,8 +24,8 @@ class CatalogTest < Test::Unit::TestCase
     assert_equal(["for_global"], @catalog.option("plugins"))
   end
 
-  def test_get_engines
-    engines =****@catal*****_engines("localhost:23003/test")
+  def test_get_partitions
+    partitions =****@catal*****_partitions("localhost:23003/test")
     base_path = File.expand_path("../fixtures", __FILE__)
     assert_equal({
                    "localhost:23003/test.000" => {
@@ -49,7 +49,7 @@ class CatalogTest < Test::Unit::TestCase
                      :n_workers => 0
                    },
                  },
-                 engines)
+                 partitions)
   end
 
   private
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index