YUKI Hiroshi
null+****@clear*****
Fri Apr 10 18:27:40 JST 2015
YUKI Hiroshi 2015-04-10 18:27:40 +0900 (Fri, 10 Apr 2015) New Revision: 4347bb0562e18f9885058f2879b81f3bdcda6638 https://github.com/droonga/droonga-engine/commit/4347bb0562e18f9885058f2879b81f3bdcda6638 Message: Generate messages for progress by the handler of system.absorb-data Modified files: lib/droonga/data_absorber.rb lib/droonga/plugins/system/absorb_data.rb Modified: lib/droonga/data_absorber.rb (+5 -90) =================================================================== --- lib/droonga/data_absorber.rb 2015-04-10 18:16:19 +0900 (db7688a) +++ lib/droonga/data_absorber.rb 2015-04-10 18:27:40 +0900 (2e9ef0e) @@ -22,12 +22,6 @@ module Droonga class DataAbsorber include Loggable - class EmptyResponse < StandardError - end - - class EmptyBody < StandardError - end - DEFAULT_MESSAGES_PER_SECOND = 100 TIME_UNKNOWN = -1 @@ -59,13 +53,9 @@ module Droonga @receiver_port = @params[:receiver_port] - @destination_client_options = @params[:client_options] || {} + @client_options = @params[:client_options] || {} @error_message = nil - - #XXX We must instantiate the number of total soruce records before absorbing, - # because parallel commands while doing "dump" can be timed out. - @total_n_source_records = count_total_n_source_records end def run @@ -96,10 +86,9 @@ module Droonga end when "system.absorb-data.progress" body = message["body"] - @n_prosessed_messages = body["nProcessedMessages"] - yield(:n_processed_messages => @n_processed_messages, - :percentage => progress_percentage, - :message => progress_message) + yield(:n_processed_messages => body["nProcessedMessages"], + :percentage => body["percentage"], + :message => body["message"]) when "system.absorb-data.start" n_absorbers += 1 when "system.absorb-data.end" @@ -110,39 +99,6 @@ module Droonga end end - ONE_MINUTE_IN_SECONDS = 60 - ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 - - def progress_percentage - progress = @n_prosessed_messages / @total_n_source_records - [(progress * 100).to_i, 100].min - end - - def progress_message - n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max - - remaining_seconds = n_remaining_records / @messages_per_second - remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor - remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS - remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor - remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS - remaining_time = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds) - - "#{progress_percentage}% done (maybe #{remaining_time} remaining)" - end - - def source_client - options = { - :host => @source_host, - :port => @port, - :tag => @tag, - :protocol => :droonga, - :receiver_host => @receiver_host, - :receiver_port => 0, - } - @source_client ||= Droonga::Client.new(options) - end - def destination_client options = { :host => @destination_host, @@ -151,7 +107,7 @@ module Droonga :protocol => :droonga, :receiver_host => @receiver_host, :receiver_port => 0, - }.merge(@destination_client_options) + }.merge(@client_options) @destination_client ||= Droonga::Client.new(options) end @@ -160,47 +116,6 @@ module Droonga end private - def source_tables - response = source_client.request("dataset" => @dataset, - "type" => "table_list") - - raise EmptyResponse.new("table_list") unless response - raise EmptyBody.new("table_list") unless response["body"] - - message_body = response["body"] - body = message_body[1] - tables = body[1..-1] - tables.collect do |table| - table[1] - end - end - - def count_total_n_source_records - queries = {} - source_tables.each do |table| - queries["n_records_of_#{table}"] = { - "source" => table, - "output" => { - "elements" => ["count"], - }, - } - end - response = source_client.request("dataset" => @dataset, - "type" => "search", - "body" => { - "queries" => queries, - }) - - raise EmptyResponse.new("search") unless response - raise EmptyBody.new("search") unless response["body"] - - n_records = 0 - response["body"].each do |query_name, result| - n_records += result["count"] - end - n_records - end - def source_replica_hosts @source_replica_hosts ||= get_source_replica_hosts end Modified: lib/droonga/plugins/system/absorb_data.rb (+122 -11) =================================================================== --- lib/droonga/plugins/system/absorb_data.rb 2015-04-10 18:16:19 +0900 (97fe6e9) +++ lib/droonga/plugins/system/absorb_data.rb 2015-04-10 18:27:40 +0900 (14ce71f) @@ -36,6 +36,12 @@ module Droonga end class DataAbsorber < AsyncCommand::AsyncHandler + class EmptyResponse < StandardError + end + + class EmptyBody < StandardError + end + private def prefix "system.absorb-data" @@ -55,16 +61,21 @@ module Droonga serf = Serf.new(my_node_name) serf.set_tag("absorbing", true) - count = 0 + begin + @total_n_source_records = count_total_n_source_records + @n_processed_messages = 0 dumper_error_message = dumper.run do |message| @messenger.forward(message, "to" => my_node_name, "type" => message["type"]) - count += 1 - report_progress(count) + @n_processed_messages += 1 + report_progress end - forward("#{prefix}.progress", "count" => count) + report_progress + rescue Exception => exception + dumper_error_message = exception.to_s + end serf.set_tag("absorbing", true) @@ -76,10 +87,10 @@ module Droonga def dumper_params params =****@reque***** { - :host => params["host"], - :port => params["port"] || NodeName::DEFAULT_PORT, - :tag => params["tag"] || NodeName::DEFAULT_TAG, - :dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME, + :host => source_host, + :port => source_port, + :tag => source_tag, + :dataset => source_dataset, :receiver_host => myself.host, :receiver_port => 0, @@ -88,9 +99,33 @@ module Droonga } end - def report_progress(count) - return unless (count % 100).zero? - forward("#{prefix}.progress", "count" => count) + def report_progress + return unless (@n_processed_messages % 100).zero? + forward("#{prefix}.progress", + "nProcessedMessages" => @n_processed_messages, + "percentage" => progress_percentage, + "message" => progress_message) + end + + def progress_percentage + progress = @n_prosessed_messages / @total_n_source_records + [(progress * 100).to_i, 100].min + end + + ONE_MINUTE_IN_SECONDS = 60 + ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 + + def progress_message + n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max + + remaining_seconds = n_remaining_records / @messages_per_second + remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor + remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS + remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor + remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS + remaining_time = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds) + + "#{progress_percentage}% done (maybe #{remaining_time} remaining)" end def myself @@ -101,6 +136,82 @@ module Droonga @messenger.engine_state.name end + def source_host + @source_host ||=****@reque*****["host"] + end + + def source_port + @source_port ||=****@reque*****["port"] || NodeName::DEFAULT_PORT + end + + def source_tag + @source_tag ||=****@reque*****["tag"] || NodeName::DEFAULT_TAG + end + + def source_dataset + @source_dataset ||=****@reque*****["dataset"] || Catalog::Dataset::DEFAULT_NAME + end + + def source_tables + response = source_client.request("dataset" => @dataset, + "type" => "table_list") + + raise EmptyResponse.new("table_list") unless response + raise EmptyBody.new("table_list") unless response["body"] + + message_body = response["body"] + body = message_body[1] + tables = body[1..-1] + tables.collect do |table| + table[1] + end + end + + def source_client_options + params =****@reque***** + options = { + :host => source_host, + :port => source_port, + :tag => source_tag, + :dataset => source_dataset, + + :protocol => :droonga, + + :receiver_host => myself.host, + :receiver_port => 0, + } + end + + def source_client + @source_client ||= Droonga::Client.new(source_client_options) + end + + def count_total_n_source_records + queries = {} + source_tables.each do |table| + queries["n_records_of_#{table}"] = { + "source" => table, + "output" => { + "elements" => ["count"], + }, + } + end + response = source_client.request("dataset" => @dataset, + "type" => "search", + "body" => { + "queries" => queries, + }) + + raise EmptyResponse.new("search") unless response + raise EmptyBody.new("search") unless response["body"] + + n_records = 0 + response["body"].each do |query_name, result| + n_records += result["count"] + end + n_records + end + def log_tag "[#{Process.ppid}] data-absorber" end -------------- next part -------------- HTML����������������������������...Download