YUKI Hiroshi
null+****@clear*****
Tue Apr 21 22:05:09 JST 2015
YUKI Hiroshi 2015-04-21 22:05:09 +0900 (Tue, 21 Apr 2015) New Revision: 91c675bf801c8303930f3aafb490e03cdccb00b7 https://github.com/droonga/droonga-engine/commit/91c675bf801c8303930f3aafb490e03cdccb00b7 Message: Count total number of actually absorbed objects correctly Modified files: lib/droonga/plugins/system/absorb_data.rb Modified: lib/droonga/plugins/system/absorb_data.rb (+78 -11) =================================================================== --- lib/droonga/plugins/system/absorb_data.rb 2015-04-21 22:02:23 +0900 (934e29a) +++ lib/droonga/plugins/system/absorb_data.rb 2015-04-21 22:05:09 +0900 (c0d8a13) @@ -13,12 +13,13 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +require "fiber" + require "droonga/plugin" require "droonga/plugin/async_command" require "droonga/catalog/dataset" require "droonga/serf" require "droonga/node_name" -require "droonga/database_scanner" require "drndump/dump_client" @@ -37,23 +38,26 @@ module Droonga end class DataAbsorber < AsyncCommand::AsyncHandler - include DatabaseScanner - class EmptyResponse < StandardError end class EmptyBody < StandardError end - def initialize(context, loop, messenger, request) - @context = context - super(loop, messenger, request) - end - def start logger.trace("start: start") on_start + count_total_n_objects do |n_objects| + @initial_n_objects = n_objects + do_absorb + end + + logger.trace("start: done") + end + + def do_absorb + logger.trace("do_absorb: start") @dumper_error_message = nil @dumper = Drndump::DumpClient.new(dumper_params) @@ -108,7 +112,7 @@ module Droonga end on_finish if @dumper_error_message - logger.trace("start: done") + logger.trace("do_absorb: done") end private @@ -125,7 +129,70 @@ module Droonga end def ensure_completely_restored(&block) - yield + runner = Fiber.new do + completely_restored = false + n_expected_objects =****@dumpe*****_forecasted_messages + while not completely_restored + count_total_n_objects do |count| + n_restored_objects = count - @initial_n_objects + logger.trace("ensure_completely_restored: check", + :current => n_restored_objects, + :forecasted => n_expected_objects) + completely_restored ||= n_restored_objects == n_expected_objects + end + Fiber.yield + end + count_client.close + yield + end + + timer = Coolio::TimerWatcher.new(3, true) + timer.on_timer do + if runner.alive? + begin + runner.resume + rescue + timer.detach + # logger.trace("start: watcher detached on unexpected exception", + # :watcher => timer) + logger.exception(error_message, $!) + error(error_name, error_message) + end + else + timer.detach + # logger.trace("start: watcher detached on unexpected exception", + # :watcher => timer) + end + end + @loop.attach(timer) + end + + def count_total_n_objects(&block) + count_message = { + "type" => "system.object-count", + "dataset" => current_dataset, + "body" => { + "output" => ["total"], + }, + } + count_client.request(count_message) do |response| + yield(response["body"]["total"]) + end + end + + def count_client + @count_client ||= Droonga::Client.new(count_client_options) + end + + def count_client_options + { + :host => myself.host, + :port => myself.port, + :tag => myself.tag, + :protocol => :droonga, + :backend => :coolio, + :loop => @loop, + } end def on_finish @@ -234,7 +301,7 @@ module Droonga private def start(request) - absorber = DataAbsorber.new(@context, loop, messenger, request) + absorber = DataAbsorber.new(loop, messenger, request) absorber.start end end -------------- next part -------------- HTML����������������������������...Download