[Groonga-commit] droonga/droonga-engine at 4347bb0 [master] Generate messages for progress by the handler of system.absorb-data

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Apr 10 18:27:40 JST 2015


YUKI Hiroshi	2015-04-10 18:27:40 +0900 (Fri, 10 Apr 2015)

  New Revision: 4347bb0562e18f9885058f2879b81f3bdcda6638
  https://github.com/droonga/droonga-engine/commit/4347bb0562e18f9885058f2879b81f3bdcda6638

  Message:
    Generate messages for progress by the handler of system.absorb-data

  Modified files:
    lib/droonga/data_absorber.rb
    lib/droonga/plugins/system/absorb_data.rb

  Modified: lib/droonga/data_absorber.rb (+5 -90)
===================================================================
--- lib/droonga/data_absorber.rb    2015-04-10 18:16:19 +0900 (db7688a)
+++ lib/droonga/data_absorber.rb    2015-04-10 18:27:40 +0900 (2e9ef0e)
@@ -22,12 +22,6 @@ module Droonga
   class DataAbsorber
     include Loggable
 
-    class EmptyResponse < StandardError
-    end
-
-    class EmptyBody < StandardError
-    end
-
     DEFAULT_MESSAGES_PER_SECOND = 100
 
     TIME_UNKNOWN = -1
@@ -59,13 +53,9 @@ module Droonga
 
       @receiver_port = @params[:receiver_port]
 
-      @destination_client_options = @params[:client_options] || {}
+      @client_options = @params[:client_options] || {}
 
       @error_message = nil
-
-      #XXX We must instantiate the number of total soruce records before absorbing,
-      #    because parallel commands while doing "dump" can be timed out.
-      @total_n_source_records = count_total_n_source_records
     end
 
     def run
@@ -96,10 +86,9 @@ module Droonga
             end
           when "system.absorb-data.progress"
             body = message["body"]
-            @n_prosessed_messages = body["nProcessedMessages"]
-            yield(:n_processed_messages => @n_processed_messages,
-                  :percentage           => progress_percentage,
-                  :message              => progress_message)
+            yield(:n_processed_messages => body["nProcessedMessages"],
+                  :percentage           => body["percentage"],
+                  :message              => body["message"])
           when "system.absorb-data.start"
             n_absorbers += 1
           when "system.absorb-data.end"
@@ -110,39 +99,6 @@ module Droonga
       end
     end
 
-    ONE_MINUTE_IN_SECONDS = 60
-    ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
-
-    def progress_percentage
-      progress = @n_prosessed_messages / @total_n_source_records
-      [(progress * 100).to_i, 100].min
-    end
-
-    def progress_message
-      n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max
-
-      remaining_seconds  = n_remaining_records / @messages_per_second
-      remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
-      remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
-      remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
-      remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
-      remaining_time     = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)
-
-      "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
-    end
-
-    def source_client
-      options = {
-        :host          => @source_host,
-        :port          => @port,
-        :tag           => @tag,
-        :protocol      => :droonga,
-        :receiver_host => @receiver_host,
-        :receiver_port => 0,
-      }
-      @source_client ||= Droonga::Client.new(options)
-    end
-
     def destination_client
       options = {
         :host          => @destination_host,
@@ -151,7 +107,7 @@ module Droonga
         :protocol      => :droonga,
         :receiver_host => @receiver_host,
         :receiver_port => 0,
-      }.merge(@destination_client_options)
+      }.merge(@client_options)
       @destination_client ||= Droonga::Client.new(options)
     end
 
@@ -160,47 +116,6 @@ module Droonga
     end
 
     private
-    def source_tables
-      response = source_client.request("dataset" => @dataset,
-                                       "type"    => "table_list")
-
-      raise EmptyResponse.new("table_list") unless response
-      raise EmptyBody.new("table_list") unless response["body"]
-
-      message_body = response["body"]
-      body = message_body[1]
-      tables = body[1..-1]
-      tables.collect do |table|
-        table[1]
-      end
-    end
-
-    def count_total_n_source_records
-      queries = {}
-      source_tables.each do |table|
-        queries["n_records_of_#{table}"] = {
-          "source" => table,
-          "output" => {
-            "elements" => ["count"],
-          },
-        }
-      end
-      response = source_client.request("dataset" => @dataset,
-                                       "type"    => "search",
-                                       "body"    => {
-                                         "queries" => queries,
-                                       })
-
-      raise EmptyResponse.new("search") unless response
-      raise EmptyBody.new("search") unless response["body"]
-
-      n_records = 0
-      response["body"].each do |query_name, result|
-        n_records += result["count"]
-      end
-      n_records
-    end
-
     def source_replica_hosts
       @source_replica_hosts ||= get_source_replica_hosts
     end

  Modified: lib/droonga/plugins/system/absorb_data.rb (+122 -11)
===================================================================
--- lib/droonga/plugins/system/absorb_data.rb    2015-04-10 18:16:19 +0900 (97fe6e9)
+++ lib/droonga/plugins/system/absorb_data.rb    2015-04-10 18:27:40 +0900 (14ce71f)
@@ -36,6 +36,12 @@ module Droonga
         end
 
         class DataAbsorber < AsyncCommand::AsyncHandler
+          class EmptyResponse < StandardError
+          end
+
+          class EmptyBody < StandardError
+          end
+
           private
           def prefix
             "system.absorb-data"
@@ -55,16 +61,21 @@ module Droonga
             serf = Serf.new(my_node_name)
             serf.set_tag("absorbing", true)
 
-            count = 0
+            begin
+              @total_n_source_records = count_total_n_source_records
+            @n_processed_messages = 0
             dumper_error_message = dumper.run do |message|
               @messenger.forward(message,
                                  "to"   => my_node_name,
                                  "type" => message["type"])
-              count += 1
-              report_progress(count)
+              @n_processed_messages += 1
+              report_progress
             end
 
-            forward("#{prefix}.progress", "count" => count)
+            report_progress
+            rescue Exception => exception
+              dumper_error_message = exception.to_s
+            end
 
             serf.set_tag("absorbing", true)
 
@@ -76,10 +87,10 @@ module Droonga
           def dumper_params
             params =****@reque*****
             {
-              :host    => params["host"],
-              :port    => params["port"]    || NodeName::DEFAULT_PORT,
-              :tag     => params["tag"]     || NodeName::DEFAULT_TAG,
-              :dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME,
+              :host    => source_host,
+              :port    => source_port,
+              :tag     => source_tag,
+              :dataset => source_dataset,
 
               :receiver_host => myself.host,
               :receiver_port => 0,
@@ -88,9 +99,33 @@ module Droonga
             }
           end
 
-          def report_progress(count)
-            return unless (count % 100).zero?
-            forward("#{prefix}.progress", "count" => count)
+          def report_progress
+            return unless (@n_processed_messages % 100).zero?
+            forward("#{prefix}.progress",
+                    "nProcessedMessages" => @n_processed_messages,
+                    "percentage"         => progress_percentage,
+                    "message"            => progress_message)
+          end
+
+          def progress_percentage
+            progress = @n_prosessed_messages / @total_n_source_records
+            [(progress * 100).to_i, 100].min
+          end
+
+          ONE_MINUTE_IN_SECONDS = 60
+          ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
+
+          def progress_message
+            n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max
+
+            remaining_seconds  = n_remaining_records / @messages_per_second
+            remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
+            remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
+            remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
+            remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
+            remaining_time     = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)
+
+            "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
           end
 
           def myself
@@ -101,6 +136,82 @@ module Droonga
             @messenger.engine_state.name
           end
 
+          def source_host
+            @source_host ||=****@reque*****["host"]
+          end
+
+          def source_port
+            @source_port ||=****@reque*****["port"] || NodeName::DEFAULT_PORT
+          end
+
+          def source_tag
+            @source_tag ||=****@reque*****["tag"] || NodeName::DEFAULT_TAG
+          end
+
+          def source_dataset
+            @source_dataset ||=****@reque*****["dataset"] || Catalog::Dataset::DEFAULT_NAME
+          end
+
+          def source_tables
+            response = source_client.request("dataset" => @dataset,
+                                             "type"    => "table_list")
+
+            raise EmptyResponse.new("table_list") unless response
+            raise EmptyBody.new("table_list") unless response["body"]
+
+            message_body = response["body"]
+            body = message_body[1]
+            tables = body[1..-1]
+            tables.collect do |table|
+              table[1]
+            end
+          end
+
+          def source_client_options
+            params =****@reque*****
+            options = {
+              :host    => source_host,
+              :port    => source_port,
+              :tag     => source_tag,
+              :dataset => source_dataset,
+
+              :protocol => :droonga,
+
+              :receiver_host => myself.host,
+              :receiver_port => 0,
+            }
+          end
+
+          def source_client
+            @source_client ||= Droonga::Client.new(source_client_options)
+          end
+
+          def count_total_n_source_records
+            queries = {}
+            source_tables.each do |table|
+              queries["n_records_of_#{table}"] = {
+                "source" => table,
+                "output" => {
+                  "elements" => ["count"],
+                },
+              }
+            end
+            response = source_client.request("dataset" => @dataset,
+                                             "type"    => "search",
+                                             "body"    => {
+                                               "queries" => queries,
+                                             })
+
+            raise EmptyResponse.new("search") unless response
+            raise EmptyBody.new("search") unless response["body"]
+
+            n_records = 0
+            response["body"].each do |query_name, result|
+              n_records += result["count"]
+            end
+            n_records
+          end
+
           def log_tag
             "[#{Process.ppid}] data-absorber"
           end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index