YUKI Hiroshi
null+****@clear*****
Wed Nov 26 15:40:38 JST 2014
YUKI Hiroshi 2014-11-26 15:40:38 +0900 (Wed, 26 Nov 2014) New Revision: da739cce2eba57bb3c5b3e9941e51355959d8120 https://github.com/droonga/droonga-engine/commit/da739cce2eba57bb3c5b3e9941e51355959d8120 Message: Implement droonga-engine-join with a class Modified files: bin/droonga-engine-join Modified: bin/droonga-engine-join (+79 -46) =================================================================== --- bin/droonga-engine-join 2014-11-21 22:03:14 +0900 (17c9cc2) +++ bin/droonga-engine-join 2014-11-26 15:40:38 +0900 (29c5360) @@ -26,37 +26,57 @@ require "droonga/safe_file_writer" require "droonga/data_absorber" require "droonga/serf" -options = nil -begin - options = Slop.parse(:help => true) do |option| - option.on("no-copy", "Don't copy data from the source cluster.", - :default => false) - - option.separator("Connections:") - option.on(:host=, - "Host name of the new node to be joined.", - :required => true) - option.on("replica-source-host=", - "Host name of the soruce node in the cluster to be connected.", - :required => true) - option.on(:dataset=, - "Dataset name of for the node to be joined.", - :default => Droonga::CatalogGenerator::DEFAULT_DATASET) - option.on(:port=, - "Port number of the source cluster to be connected.", - :as => Integer, - :default => Droonga::CatalogGenerator::DEFAULT_PORT) - option.on(:tag=, - "Tag name of the soruce cluster to be connected.", - :default => Droonga::CatalogGenerator::DEFAULT_TAG) +class JoinCommand + def run + parse_options + set_node_role + do_join + sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely. + do_copy unless options["no-copy"] + set_effective_message_timestamp + update_other_nodes + reset_node_role + puts("Done.") + exit(true) + end + + private + def parse_options + options = Slop.parse(:help => true) do |option| + option.on("no-copy", "Don't copy data from the source cluster.", + :default => false) + + option.separator("Connections:") + option.on(:host=, + "Host name of the new node to be joined.", + :required => true) + option.on("replica-source-host=", + "Host name of the soruce node in the cluster to be connected.", + :required => true) + option.on(:dataset=, + "Dataset name of for the node to be joined.", + :default => Droonga::CatalogGenerator::DEFAULT_DATASET) + option.on(:port=, + "Port number of the source cluster to be connected.", + :as => Integer, + :default => Droonga::CatalogGenerator::DEFAULT_PORT) + option.on(:tag=, + "Tag name of the soruce cluster to be connected.", + :default => Droonga::CatalogGenerator::DEFAULT_TAG) + end + @options = options + rescue Slop::MissingOptionError => error + $stderr.puts(error) + exit(false) end -rescue Slop::MissingOptionError => e - $stderr.puts(e) - exit(false) -end -joining_node = "#{options[:host]}:#{options[:port]}/#{options[:tag]}" -source_node = "#{options["replica-source-host"]}:#{options[:port]}/#{options[:tag]}" + def joining_node + "#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}" + end + + def source_node + "#{@options["replica-source-host"]}:#{@options[:port]}/#{@options[:tag]}" + end def run_remote_command(target, command, options) serf = Droonga::Serf.new(nil, target) @@ -66,6 +86,11 @@ def run_remote_command(target, command, options) result[:response] end + def absorber + @absorber ||= prepare_absorber + end + + def prepare_absorber absorber_options = { :dataset => options[:dataset], :source_host => options["replica-source-host"], @@ -74,7 +99,9 @@ absorber_options = { :tag => options[:tag], } absorber = Droonga::DataAbsorber.new(absorber_options) + end + def set_node_role if absorber.source_node_suspendable? run_remote_command(source_node, "change_role", "node" => source_node, @@ -83,8 +110,20 @@ end run_remote_command(joining_node, "change_role", "node" => joining_node, "role" => "destination") + end + + def reset_node_role + if absorber.source_node_suspendable? + run_remote_command(source_node, "change_role", + "node" => source_node, + "role" => "") + end + run_remote_command(joining_node, "change_role", + "node" => joining_node, + "role" => "") + end -start_time_in_seconds = Time.new.to_i + def do_join puts("Joining new replica to the cluster...") run_remote_command(joining_node, "join", "node" => joining_node, @@ -92,9 +131,10 @@ run_remote_command(joining_node, "join", "source" => source_node, "dataset" => options[:dataset], "copy" => !options["no-copy"]) -sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely. + end -unless options["no-copy"] + def do_copy + @start_time_in_seconds = Time.new.to_i puts("Copying data from the source node...") last_progress = "" while true @@ -107,7 +147,7 @@ unless options["no-copy"] break unless absorbing end - progress = absorber.report_progress(start_time_in_seconds) + progress = absorber.report_progress(@start_time_in_seconds) if progress printf("%s", "#{" " * last_progress.size}\r") printf("%s", "#{progress}\r") @@ -115,8 +155,9 @@ unless options["no-copy"] end end puts "" -end + end + def set_effective_message_timestamp response = run_remote_command(source_node, "report_status", "node" => source_node, "key" => "last_processed_message_timestamp") @@ -129,22 +170,14 @@ if timestamp and not timestamp.empty? "key" => "effective_message_timestamp", "value" => timestamp) end + end + def update_other_nodes puts("Update existing hosts in the cluster...") run_remote_command(source_node, "add_replicas", "dataset" => options[:dataset], "hosts" => [options[:host]]) - -if absorber.source_node_suspendable? - run_remote_command(source_node, "change_role", - "node" => source_node, - "role" => "") + end end -run_remote_command(joining_node, "change_role", - "node" => joining_node, - "role" => "") - - -puts("Done.") -exit(true) +JoinCommand.new.run -------------- next part -------------- HTML����������������������������... Download