[Groonga-commit] droonga/droonga-engine at d02310c [master] Re-implement system.absorb-data based on AsyncCommand

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Apr 10 17:04:29 JST 2015


YUKI Hiroshi	2015-04-10 17:04:29 +0900 (Fri, 10 Apr 2015)

  New Revision: d02310ccb02bd498021a5dc08af57bc659a7a09b
  https://github.com/droonga/droonga-engine/commit/d02310ccb02bd498021a5dc08af57bc659a7a09b

  Message:
    Re-implement system.absorb-data based on AsyncCommand

  Modified files:
    lib/droonga/plugins/system/absorb_data.rb

  Modified: lib/droonga/plugins/system/absorb_data.rb (+62 -37)
===================================================================
--- lib/droonga/plugins/system/absorb_data.rb    2015-04-10 17:04:01 +0900 (01732a3)
+++ lib/droonga/plugins/system/absorb_data.rb    2015-04-10 17:04:29 +0900 (27adb7e)
@@ -14,7 +14,9 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
 require "droonga/plugin"
+require "droonga/plugin/async_command"
 require "droonga/catalog/dataset"
+require "droonga/serf"
 require "droonga/node_name"
 
 require "drndump/dumper"
@@ -22,7 +24,7 @@ require "drndump/dumper"
 module Droonga
   module Plugins
     module System
-      class AbsorbDataHandler < Droonga::Handler
+      class AbsorbDataHandler < AsyncCommand::Handler
         action.synchronous = true
 
         DEFAULT_MESSAGES_PER_SECOND = 100
@@ -33,62 +35,85 @@ module Droonga
           end
         end
 
-        class DumpFailed < InternalServerError
-          def initialize(error)
-            super("source node returns an error.",
-                  error)
+        class DataAbsorber
+          private
+          def prefix
+            "system.absorb-data"
           end
-        end
 
-        def handle(message)
-          raise MissingHostParameter.new unless message.include?("host")
+          def error_name
+            "AbsorbFailure"
+          end
+
+          def error_message
+            "failed to absorb data"
+          end
+
+          def handle
+            dumper = Drndump::Dumper.new(dumper_params)
+
+            serf = Serf.new(my_node_name)
+            serf.set_tag("absorbing", true)
 
-          dumper = Drndump::Dumper.new(dumper_params(message))
+            dumper_error_message = dumper.run do |message|
+              @messenger.forward(message,
+                                 "to"   => my_node_name,
+                                 "type" => message["type"])
+              forward("#{prefix}.progress")
+            end
 
-          serf = Serf.new(my_node_name)
-          serf.set_tag("absorbing", true)
+            serf.set_tag("absorbing", true)
 
-          error_message = dumper.run do |message|
-            @messenger.forward(message,
-                               "to"   => my_node_name,
-                               "type" => message["type"])
+            if dumper_error_message
+              error(error_name, dumper_error_message)
+            end
           end
 
-          serf.set_tag("absorbing", true)
+          private
+          def dumper_params
+            params =****@reque*****
+            {
+              :host    => params["host"],
+              :port    => params["port"]    || NodeName::DEFAULT_PORT,
+              :tag     => params["tag"]     || NodeName::DEFAULT_TAG,
+              :dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME,
 
-          raise DumpFailed.new(error_message) if error_message
+              :receiver_host => myself.host,
+              :receiver_port => 0,
 
-          true
-        end
+              :messages_per_second => params["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND,
+            }
+          end
 
-        private
-        def dumper_params(message)
-          {
-            :host    => message["host"],
-            :port    => message["port"]    || NodeName::DEFAULT_PORT,
-            :tag     => message["tag"]     || NodeName::DEFAULT_TAG,
-            :dataset => message["dataset"] || Catalog::Dataset::DEFAULT_NAME,
-
-            :receiver_host => myself.host,
-            :receiver_port => 0,
-
-            :messages_per_second => message["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND,
-          }
+          def myself
+            @myself ||= NodeName.parse(my_node_name)
+          end
+
+          def my_node_name
+            @messenger.engine_state.name
+          end
+
+          def log_tag
+            "[#{Process.ppid}] data-absorber"
+          end
         end
 
-        def myself
-          @myself ||= NodeName.parse(my_node_name)
+        def handle(message)
+          raise MissingHostParameter.new unless message.include?("host")
+          super
         end
 
-        def my_node_name
-          @messenger.engine_state.name
+        private
+        def start(request)
+          absorber = DataAbsorber.new(loop, messenger, request)
+          absorber.start
         end
       end
 
       define_single_step do |step|
         step.name = "system.absorb-data"
         step.handler = AbsorbDataHandler
-        step.collector = Collectors::Or
+        step.collector = Collectors::And
       end
     end
   end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index