[Groonga-commit] droonga/droonga-engine at 91c675b [master] Count total number of actually absorbed objects correctly

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Apr 21 22:05:09 JST 2015


YUKI Hiroshi	2015-04-21 22:05:09 +0900 (Tue, 21 Apr 2015)

  New Revision: 91c675bf801c8303930f3aafb490e03cdccb00b7
  https://github.com/droonga/droonga-engine/commit/91c675bf801c8303930f3aafb490e03cdccb00b7

  Message:
    Count total number of actually absorbed objects correctly

  Modified files:
    lib/droonga/plugins/system/absorb_data.rb

  Modified: lib/droonga/plugins/system/absorb_data.rb (+78 -11)
===================================================================
--- lib/droonga/plugins/system/absorb_data.rb    2015-04-21 22:02:23 +0900 (934e29a)
+++ lib/droonga/plugins/system/absorb_data.rb    2015-04-21 22:05:09 +0900 (c0d8a13)
@@ -13,12 +13,13 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
+require "fiber"
+
 require "droonga/plugin"
 require "droonga/plugin/async_command"
 require "droonga/catalog/dataset"
 require "droonga/serf"
 require "droonga/node_name"
-require "droonga/database_scanner"
 
 require "drndump/dump_client"
 
@@ -37,23 +38,26 @@ module Droonga
         end
 
         class DataAbsorber < AsyncCommand::AsyncHandler
-          include DatabaseScanner
-
           class EmptyResponse < StandardError
           end
 
           class EmptyBody < StandardError
           end
 
-          def initialize(context, loop, messenger, request)
-            @context = context
-            super(loop, messenger, request)
-          end
-
           def start
             logger.trace("start: start")
             on_start
 
+            count_total_n_objects do |n_objects|
+              @initial_n_objects = n_objects
+              do_absorb
+            end
+
+            logger.trace("start: done")
+          end
+
+          def do_absorb
+            logger.trace("do_absorb: start")
             @dumper_error_message = nil
 
             @dumper = Drndump::DumpClient.new(dumper_params)
@@ -108,7 +112,7 @@ module Droonga
             end
 
             on_finish if @dumper_error_message
-            logger.trace("start: done")
+            logger.trace("do_absorb: done")
           end
 
           private
@@ -125,7 +129,70 @@ module Droonga
           end
 
           def ensure_completely_restored(&block)
-            yield
+            runner = Fiber.new do
+              completely_restored = false
+              n_expected_objects =****@dumpe*****_forecasted_messages
+              while not completely_restored
+                count_total_n_objects do |count|
+                  n_restored_objects = count - @initial_n_objects
+                  logger.trace("ensure_completely_restored: check",
+                               :current    => n_restored_objects,
+                               :forecasted => n_expected_objects)
+                  completely_restored ||= n_restored_objects == n_expected_objects
+                end
+                Fiber.yield
+              end
+              count_client.close
+              yield
+            end
+
+            timer = Coolio::TimerWatcher.new(3, true)
+            timer.on_timer do
+              if runner.alive?
+                begin
+                  runner.resume
+                rescue
+                  timer.detach
+                  # logger.trace("start: watcher detached on unexpected exception",
+                  #              :watcher => timer)
+                  logger.exception(error_message, $!)
+                  error(error_name, error_message)
+                end
+              else
+                timer.detach
+                # logger.trace("start: watcher detached on unexpected exception",
+                #              :watcher => timer)
+              end
+            end
+            @loop.attach(timer)
+          end
+
+          def count_total_n_objects(&block)
+            count_message = {
+              "type"    => "system.object-count",
+              "dataset" => current_dataset,
+              "body"    => {
+                "output" => ["total"],
+              },
+            }
+            count_client.request(count_message) do |response|
+              yield(response["body"]["total"])
+            end
+          end
+
+          def count_client
+            @count_client ||= Droonga::Client.new(count_client_options)
+          end
+
+          def count_client_options
+            {
+              :host     => myself.host,
+              :port     => myself.port,
+              :tag      => myself.tag,
+              :protocol => :droonga,
+              :backend  => :coolio,
+              :loop     => @loop,
+            }
           end
 
           def on_finish
@@ -234,7 +301,7 @@ module Droonga
 
         private
         def start(request)
-          absorber = DataAbsorber.new(@context, loop, messenger, request)
+          absorber = DataAbsorber.new(loop, messenger, request)
           absorber.start
         end
       end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index