[Groonga-commit] droonga/droonga-engine at 44d9c3c [master] Use DumpClient's features to report progress

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Apr 17 16:59:17 JST 2015


YUKI Hiroshi	2015-04-17 16:59:17 +0900 (Fri, 17 Apr 2015)

  New Revision: 44d9c3cd36ceff44681e7ea8b20f6799e96057ff
  https://github.com/droonga/droonga-engine/commit/44d9c3cd36ceff44681e7ea8b20f6799e96057ff

  Message:
    Use DumpClient's features to report progress

  Modified files:
    lib/droonga/plugins/system/absorb_data.rb

  Modified: lib/droonga/plugins/system/absorb_data.rb (+10 -124)
===================================================================
--- lib/droonga/plugins/system/absorb_data.rb    2015-04-17 16:58:26 +0900 (d515b35)
+++ lib/droonga/plugins/system/absorb_data.rb    2015-04-17 16:59:17 +0900 (06a55ed)
@@ -48,16 +48,16 @@ module Droonga
 
             @dumper_error_message = nil
 
-            dumper = Drndump::DumpClient.new(dumper_params)
-            dumper.on_finish = lambda do
+            @dumper = Drndump::DumpClient.new(dumper_params)
+            @dumper.on_finish = lambda do
               on_finish
               logger.trace("start: finish")
             end
-            dumper.on_progress = lambda do |message|
+            @dumper.on_progress = lambda do |message|
               logger.trace("dump progress",
                            :message => message)
             end
-            dumper.on_error = lambda do |error|
+            @dumper.on_error = lambda do |error|
               logger.error("unexpected error while dump",
                            :error => error)
             end
@@ -65,20 +65,13 @@ module Droonga
             @previous_report_time = Time.now
 
             begin
-              @n_processed_messages = 0
-              @total_n_source_records = nil
-              get_total_n_source_records do |count|
-                @total_n_source_records = count
-                logger.info("#{count} records to be absorbed")
-              end
               logger.info("starting to absorb the source dataset")
-              @dumper_error_message = dumper.run(dump_options) do |message|
+              @dumper_error_message =****@dumpe*****(dump_options) do |message|
                 begin
                   message["dataset"] = current_dataset
                   @messenger.forward(message,
                                      "to"   => my_node_name,
                                      "type" => message["type"])
-                  @n_processed_messages += 1
                   now = Time.now
                   elapsed_seconds = (now - @previous_report_time).to_i
                   if elapsed_seconds >= progress_interval_seconds
@@ -116,7 +109,6 @@ module Droonga
               if @dumper_error_message
                 error(error_name, @dumper_error_message)
               else
-                @total_n_source_records = @n_processed_messages
                 report_progress
               end
             rescue Exception => exception
@@ -147,40 +139,12 @@ module Droonga
           end
 
           def report_progress
-            if @n_processed_messages.nil? or @total_n_source_records.nil?
-              return
-            end
+            message = "#{@dumper.progress_percentage}% done " +
+                        "(maybe #{@dumper.formatted_remaining_time} remaining)"
             forward("#{prefix}.progress",
-                    "nProcessedMessages" => @n_processed_messages,
-                    "percentage"         => progress_percentage,
-                    "message"            => progress_message)
-          end
-
-          def progress_percentage
-            if @total_n_source_records.nil? or @total_n_source_records.zero?
-              return 0
-            end
-            progress = @n_processed_messages.to_f / @total_n_source_records
-            [(progress * 100).to_i, 100].min
-          end
-
-          ONE_MINUTE_IN_SECONDS = 60
-          ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
-
-          def progress_message
-            n_remaining_records = [@total_n_source_records - @n_processed_messages, 0].max
-
-            remaining_seconds  = n_remaining_records / messages_per_second
-            remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
-            remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
-            remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
-            remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
-            remaining_time     = sprintf("%02i:%02i:%02i",
-                                         remaining_hours,
-                                         remaining_minutes,
-                                         remaining_seconds)
-
-            "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
+                    "nProcessedMessages" => @dumper.n_received_messages,
+                    "percentage"         => @dumper.progress_percentage,
+                    "message"            => message)
           end
 
           def myself
@@ -230,84 +194,6 @@ module Droonga
                                        DEFAULT_MESSAGES_PER_SECOND
           end
 
-          def source_client_options
-            {
-              :host    => source_host,
-              :port    => source_port,
-              :tag     => source_tag,
-              :dataset => source_dataset,
-
-              :protocol => :droonga,
-
-              :receiver_host => myself.host,
-              :receiver_port => 0,
-
-              :backend => :coolio,
-              :loop    => @loop,
-            }
-          end
-
-          def create_source_client
-            Droonga::Client.new(source_client_options)
-          end
-
-          def get_source_tables(&block)
-            client = create_source_client
-            client.request("dataset" => source_dataset,
-                           "type"    => "table_list") do |response|
-              client.close
-              unless response
-                raise EmptyResponse.new("table_list returns nil response")
-              end
-              unless response["body"]
-                raise EmptyBody.new("table_list returns nil result")
-              end
-
-              message_body = response["body"]
-              body = message_body[1]
-              tables = body[1..-1]
-              table_names = tables.collect do |table|
-                table[1]
-              end
-              yield(table_names)
-            end
-          end
-
-          def get_total_n_source_records(&block)
-            get_source_tables do |source_tables|
-              queries = {}
-              source_tables.each do |table|
-                queries["n_records_of_#{table}"] = {
-                  "source" => table,
-                  "output" => {
-                    "elements" => ["count"],
-                  },
-                }
-              end
-              client = create_source_client
-              client.request("dataset" => source_dataset,
-                             "type"    => "search",
-                             "body"    => {
-                               "timeout" => 10,
-                               "queries" => queries,
-                             }) do |response|
-                client.close
-                unless response
-                  raise EmptyResponse.new("search returns nil response")
-                end
-                unless response["body"]
-                  raise EmptyBody.new("search returns nil result")
-                end
-
-                n_records = 0
-                response["body"].each do |query_name, result|
-                  n_records += result["count"]
-                end
-                yield(n_records)
-              end
-            end
-          end
-
           def log_tag
             "[#{Process.ppid}] data-absorber"
           end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index