Kouhei Sutou
null+****@clear*****
Fri Nov 22 15:49:01 JST 2013
Kouhei Sutou 2013-11-22 15:49:01 +0900 (Fri, 22 Nov 2013) New Revision: aaf1da5d224e88efaf5bd233c7aba94937475e45 https://github.com/droonga/fluent-plugin-droonga/commit/aaf1da5d224e88efaf5bd233c7aba94937475e45 Message: Create Partition taht is extracted from Engine Added files: lib/droonga/partition.rb Modified files: lib/droonga/catalog.rb lib/droonga/dispatcher.rb test/unit/test_catalog.rb Modified: lib/droonga/catalog.rb (+1 -1) =================================================================== --- lib/droonga/catalog.rb 2013-11-22 09:56:23 +0900 (81ae561) +++ lib/droonga/catalog.rb 2013-11-22 15:49:01 +0900 (b1e58f0) @@ -63,7 +63,7 @@ module Droonga @options[name] end - def get_engines(name) + def get_partitions(name) device = @catalog["farms"][name]["device"] pattern = Regexp.new("^#{name}\.") results = {} Modified: lib/droonga/dispatcher.rb (+9 -9) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-22 09:56:23 +0900 (4b8a07c) +++ lib/droonga/dispatcher.rb 2013-11-22 15:49:01 +0900 (37b0602) @@ -20,17 +20,17 @@ require "droonga/handler" require "droonga/adapter" require "droonga/catalog" require "droonga/collector" +require "droonga/partition" module Droonga class Dispatcher attr_reader :collectors def initialize(worker, name) - @engines = {} - Droonga.catalog.get_engines(name).each do |name, options| - engine = Droonga::Engine.new(options.merge(:standalone => true, - :with_server => false)) - engine.start - @engines[name] = engine + @partitions = {} + Droonga.catalog.get_partitions(name).each do |name, options| + partition = Droonga::Partition.new(options) + partition.start + @partitions[name] = partition end @worker = worker @name = name @@ -44,8 +44,8 @@ module Droonga end def shutdown - @engines.each do |name, engine| - engine.shutdown + @partitions.each do |name, partition| + partition.shutdown end end @@ -97,7 +97,7 @@ module Droonga post(message, "type" => type, "synchronous"=> synchronous) else envelope =****@worke*****("body" => message, "type" => type) - @engines[route].emit('', Time.now.to_f, envelope, synchronous) + @partitions[route].emit('', Time.now.to_f, envelope, synchronous) end end Added: lib/droonga/partition.rb (+81 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/partition.rb 2013-11-22 15:49:01 +0900 (2eb62fe) @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "serverengine" + +require "droonga/server" +require "droonga/worker" +require "droonga/executor" + +module Droonga + class Partition + DEFAULT_OPTIONS = { + :queue_name => "DroongaQueue", + :n_workers => 0, + } + + def initialize(options={}) + @options = DEFAULT_OPTIONS.merge(options) + end + + def start + if @options[:database] && !@options[:database].empty? + Droonga::JobQueue.ensure_schema(@options[:database], + @options[:queue_name]) + end + start_supervisor if @options[:n_workers] > 0 + @executor = Executor.new(@options.merge(:standalone => true)) + end + + def shutdown + $log.trace("partition: shutdown: start") + @executor.shutdown if @executor + shutdown_supervisor if @supervisor + $log.trace("partition: shutdown: done") + end + + def emit(tag, time, record, synchronous=nil) + $log.trace("[#{Process.pid}] tag: <#{tag}> caller: <#{caller.first}>") + @executor.dispatch(tag, time, record, synchronous) + end + + private + def start_supervisor + @supervisor = ServerEngine::Supervisor.new(Server, Worker) do + force_options = { + :worker_type => "process", + :workers => @options[:n_workers], + :log_level => $log.level, + :server_process_name => "Server[#{@options[:database]}] #$0", + :worker_process_name => "Worker[#{@options[:database]}] #$0" + } + @options.merge(force_options) + end + @supervisor_thread = Thread.new do + @supervisor.main + end + end + + def shutdown_supervisor + $log.trace("supervisor: shutdown: start") + @supervisor.stop(true) + $log.trace("supervisor: shutdown: stopped") + @supervisor_thread.join + $log.trace("supervisor: shutdown: done") + end + end +end Modified: test/unit/test_catalog.rb (+3 -3) =================================================================== --- test/unit/test_catalog.rb 2013-11-22 09:56:23 +0900 (57688eb) +++ test/unit/test_catalog.rb 2013-11-22 15:49:01 +0900 (16cb31a) @@ -24,8 +24,8 @@ class CatalogTest < Test::Unit::TestCase assert_equal(["for_global"], @catalog.option("plugins")) end - def test_get_engines - engines =****@catal*****_engines("localhost:23003/test") + def test_get_partitions + partitions =****@catal*****_partitions("localhost:23003/test") base_path = File.expand_path("../fixtures", __FILE__) assert_equal({ "localhost:23003/test.000" => { @@ -49,7 +49,7 @@ class CatalogTest < Test::Unit::TestCase :n_workers => 0 }, }, - engines) + partitions) end private -------------- next part -------------- HTML����������������������������...Download