YUKI Hiroshi
null+****@clear*****
Fri Apr 17 16:59:17 JST 2015
YUKI Hiroshi 2015-04-17 16:59:17 +0900 (Fri, 17 Apr 2015) New Revision: 44d9c3cd36ceff44681e7ea8b20f6799e96057ff https://github.com/droonga/droonga-engine/commit/44d9c3cd36ceff44681e7ea8b20f6799e96057ff Message: Use DumpClient's features to report progress Modified files: lib/droonga/plugins/system/absorb_data.rb Modified: lib/droonga/plugins/system/absorb_data.rb (+10 -124) =================================================================== --- lib/droonga/plugins/system/absorb_data.rb 2015-04-17 16:58:26 +0900 (d515b35) +++ lib/droonga/plugins/system/absorb_data.rb 2015-04-17 16:59:17 +0900 (06a55ed) @@ -48,16 +48,16 @@ module Droonga @dumper_error_message = nil - dumper = Drndump::DumpClient.new(dumper_params) - dumper.on_finish = lambda do + @dumper = Drndump::DumpClient.new(dumper_params) + @dumper.on_finish = lambda do on_finish logger.trace("start: finish") end - dumper.on_progress = lambda do |message| + @dumper.on_progress = lambda do |message| logger.trace("dump progress", :message => message) end - dumper.on_error = lambda do |error| + @dumper.on_error = lambda do |error| logger.error("unexpected error while dump", :error => error) end @@ -65,20 +65,13 @@ module Droonga @previous_report_time = Time.now begin - @n_processed_messages = 0 - @total_n_source_records = nil - get_total_n_source_records do |count| - @total_n_source_records = count - logger.info("#{count} records to be absorbed") - end logger.info("starting to absorb the source dataset") - @dumper_error_message = dumper.run(dump_options) do |message| + @dumper_error_message =****@dumpe*****(dump_options) do |message| begin message["dataset"] = current_dataset @messenger.forward(message, "to" => my_node_name, "type" => message["type"]) - @n_processed_messages += 1 now = Time.now elapsed_seconds = (now - @previous_report_time).to_i if elapsed_seconds >= progress_interval_seconds @@ -116,7 +109,6 @@ module Droonga if @dumper_error_message error(error_name, @dumper_error_message) else - @total_n_source_records = @n_processed_messages report_progress end rescue Exception => exception @@ -147,40 +139,12 @@ module Droonga end def report_progress - if @n_processed_messages.nil? or @total_n_source_records.nil? - return - end + message = "#{@dumper.progress_percentage}% done " + + "(maybe #{@dumper.formatted_remaining_time} remaining)" forward("#{prefix}.progress", - "nProcessedMessages" => @n_processed_messages, - "percentage" => progress_percentage, - "message" => progress_message) - end - - def progress_percentage - if @total_n_source_records.nil? or @total_n_source_records.zero? - return 0 - end - progress = @n_processed_messages.to_f / @total_n_source_records - [(progress * 100).to_i, 100].min - end - - ONE_MINUTE_IN_SECONDS = 60 - ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 - - def progress_message - n_remaining_records = [@total_n_source_records - @n_processed_messages, 0].max - - remaining_seconds = n_remaining_records / messages_per_second - remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor - remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS - remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor - remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS - remaining_time = sprintf("%02i:%02i:%02i", - remaining_hours, - remaining_minutes, - remaining_seconds) - - "#{progress_percentage}% done (maybe #{remaining_time} remaining)" + "nProcessedMessages" => @dumper.n_received_messages, + "percentage" => @dumper.progress_percentage, + "message" => message) end def myself @@ -230,84 +194,6 @@ module Droonga DEFAULT_MESSAGES_PER_SECOND end - def source_client_options - { - :host => source_host, - :port => source_port, - :tag => source_tag, - :dataset => source_dataset, - - :protocol => :droonga, - - :receiver_host => myself.host, - :receiver_port => 0, - - :backend => :coolio, - :loop => @loop, - } - end - - def create_source_client - Droonga::Client.new(source_client_options) - end - - def get_source_tables(&block) - client = create_source_client - client.request("dataset" => source_dataset, - "type" => "table_list") do |response| - client.close - unless response - raise EmptyResponse.new("table_list returns nil response") - end - unless response["body"] - raise EmptyBody.new("table_list returns nil result") - end - - message_body = response["body"] - body = message_body[1] - tables = body[1..-1] - table_names = tables.collect do |table| - table[1] - end - yield(table_names) - end - end - - def get_total_n_source_records(&block) - get_source_tables do |source_tables| - queries = {} - source_tables.each do |table| - queries["n_records_of_#{table}"] = { - "source" => table, - "output" => { - "elements" => ["count"], - }, - } - end - client = create_source_client - client.request("dataset" => source_dataset, - "type" => "search", - "body" => { - "timeout" => 10, - "queries" => queries, - }) do |response| - client.close - unless response - raise EmptyResponse.new("search returns nil response") - end - unless response["body"] - raise EmptyBody.new("search returns nil result") - end - - n_records = 0 - response["body"].each do |query_name, result| - n_records += result["count"] - end - yield(n_records) - end - end - end - def log_tag "[#{Process.ppid}] data-absorber" end -------------- next part -------------- HTML����������������������������... Download