YUKI Hiroshi
null+****@clear*****
Fri Apr 10 17:04:29 JST 2015
YUKI Hiroshi 2015-04-10 17:04:29 +0900 (Fri, 10 Apr 2015) New Revision: d02310ccb02bd498021a5dc08af57bc659a7a09b https://github.com/droonga/droonga-engine/commit/d02310ccb02bd498021a5dc08af57bc659a7a09b Message: Re-implement system.absorb-data based on AsyncCommand Modified files: lib/droonga/plugins/system/absorb_data.rb Modified: lib/droonga/plugins/system/absorb_data.rb (+62 -37) =================================================================== --- lib/droonga/plugins/system/absorb_data.rb 2015-04-10 17:04:01 +0900 (01732a3) +++ lib/droonga/plugins/system/absorb_data.rb 2015-04-10 17:04:29 +0900 (27adb7e) @@ -14,7 +14,9 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA require "droonga/plugin" +require "droonga/plugin/async_command" require "droonga/catalog/dataset" +require "droonga/serf" require "droonga/node_name" require "drndump/dumper" @@ -22,7 +24,7 @@ require "drndump/dumper" module Droonga module Plugins module System - class AbsorbDataHandler < Droonga::Handler + class AbsorbDataHandler < AsyncCommand::Handler action.synchronous = true DEFAULT_MESSAGES_PER_SECOND = 100 @@ -33,62 +35,85 @@ module Droonga end end - class DumpFailed < InternalServerError - def initialize(error) - super("source node returns an error.", - error) + class DataAbsorber + private + def prefix + "system.absorb-data" end - end - def handle(message) - raise MissingHostParameter.new unless message.include?("host") + def error_name + "AbsorbFailure" + end + + def error_message + "failed to absorb data" + end + + def handle + dumper = Drndump::Dumper.new(dumper_params) + + serf = Serf.new(my_node_name) + serf.set_tag("absorbing", true) - dumper = Drndump::Dumper.new(dumper_params(message)) + dumper_error_message = dumper.run do |message| + @messenger.forward(message, + "to" => my_node_name, + "type" => message["type"]) + forward("#{prefix}.progress") + end - serf = Serf.new(my_node_name) - serf.set_tag("absorbing", true) + serf.set_tag("absorbing", true) - error_message = dumper.run do |message| - @messenger.forward(message, - "to" => my_node_name, - "type" => message["type"]) + if dumper_error_message + error(error_name, dumper_error_message) + end end - serf.set_tag("absorbing", true) + private + def dumper_params + params =****@reque***** + { + :host => params["host"], + :port => params["port"] || NodeName::DEFAULT_PORT, + :tag => params["tag"] || NodeName::DEFAULT_TAG, + :dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME, - raise DumpFailed.new(error_message) if error_message + :receiver_host => myself.host, + :receiver_port => 0, - true - end + :messages_per_second => params["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND, + } + end - private - def dumper_params(message) - { - :host => message["host"], - :port => message["port"] || NodeName::DEFAULT_PORT, - :tag => message["tag"] || NodeName::DEFAULT_TAG, - :dataset => message["dataset"] || Catalog::Dataset::DEFAULT_NAME, - - :receiver_host => myself.host, - :receiver_port => 0, - - :messages_per_second => message["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND, - } + def myself + @myself ||= NodeName.parse(my_node_name) + end + + def my_node_name + @messenger.engine_state.name + end + + def log_tag + "[#{Process.ppid}] data-absorber" + end end - def myself - @myself ||= NodeName.parse(my_node_name) + def handle(message) + raise MissingHostParameter.new unless message.include?("host") + super end - def my_node_name - @messenger.engine_state.name + private + def start(request) + absorber = DataAbsorber.new(loop, messenger, request) + absorber.start end end define_single_step do |step| step.name = "system.absorb-data" step.handler = AbsorbDataHandler - step.collector = Collectors::Or + step.collector = Collectors::And end end end -------------- next part -------------- HTML����������������������������...Download