YUKI Hiroshi
null+****@clear*****
Wed Apr 15 14:24:26 JST 2015
YUKI Hiroshi 2015-04-15 14:24:26 +0900 (Wed, 15 Apr 2015) New Revision: be7210a3f79236a85424a2446e522f47d409df1f https://github.com/droonga/droonga-engine/commit/be7210a3f79236a85424a2446e522f47d409df1f Message: Cleanup codes around confusing options Modified files: bin/droonga-engine-join Modified: bin/droonga-engine-join (+70 -48) =================================================================== --- bin/droonga-engine-join 2015-04-15 14:12:25 +0900 (f815d4a) +++ bin/droonga-engine-join 2015-04-15 14:24:26 +0900 (66cd06d) @@ -23,8 +23,9 @@ require "coolio" require "droonga/engine/version" require "droonga/path" +require "droonga/node_name" +require "droonga/catalog/dataset" require "droonga/catalog_fetcher" -require "droonga/catalog_generator" require "droonga/catalog_loader" require "droonga/safe_file_writer" require "droonga/data_absorber_client" @@ -38,9 +39,12 @@ class JoinCommand parse_options trap_signals - puts "Start to join a new node #{@options[:host]}" - puts " to the cluster of #{@options["replica-source-host"]}" + puts "Start to join a new node #{joining_node.host}" + puts " to the cluster of #{source_node.host}" puts " via #{@options["receiver-host"]} (this host)" + puts " port = #{joining_node.port}" + puts " tag = #{joining_node.tag}" + puts " dataset = #{dataset}" puts "" puts "Source Cluster ID: #{source_cluster_id}" puts "" @@ -76,33 +80,36 @@ class JoinCommand option.on("no-copy", "Don't copy data from the source cluster.", :default => false) - option.separator("Connections:") + option.separator("Target:") option.on(:host=, "Host name of the new node to be joined.", :required => true) option.on("replica-source-host=", "Host name of the soruce node in the cluster to be connected.", :required => true) - option.on("receiver-host=", - "Host name of this host.", - :default => Socket.gethostname) - option.on(:dataset=, - "Dataset name of for the node to be joined.", - :default => Droonga::CatalogGenerator::DEFAULT_DATASET) + option.on(:port=, "Port number of the source cluster to be connected.", :as => Integer, - :default => Droonga::CatalogGenerator::DEFAULT_PORT) + :default => Droonga::NodeName::DEFAULT_PORT) option.on(:tag=, "Tag name of the soruce cluster to be connected.", - :default => Droonga::CatalogGenerator::DEFAULT_TAG) + :default => Droonga::NodeName::DEFAULT_TAG) + option.on(:dataset=, + "Dataset name of for the node to be joined.", + :default => Droonga::Catalog::Dataset::DEFAULT_NAME) + + option.separator("Connections:") + option.on("receiver-host=", + "Host name of this host.", + :default => Socket.gethostname) + + option.separator("Miscellaneous:") option.on("records-per-second=", "Maximum number of records per second to be copied. " + "'#{Droonga::Client::RateLimiter::NO_LIMIT}' means no limit.", :as => Integer, :default => Droonga::DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND) - - option.separator("Miscellaneous:") option.on("progress-interval-seconds=", "Interval seconds to report progress.", :as => Integer, @@ -116,12 +123,20 @@ class JoinCommand exit(false) end + def dataset + @options[:dataset] + end + def joining_node - "#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}" + @joining_node ||= NodeName.new(:host => @options[:host], + :port => @options[:port], + :tag => @options[:tag]) end def source_node - "#{@options["replica-source-host"]}:#{@options[:port]}/#{@options[:tag]}" + @source_node ||= NodeName.new(:host => @options["replica-source-host"], + :port => @options[:port], + :tag => @options[:tag]) end def source_cluster_id @@ -142,11 +157,11 @@ class JoinCommand end def fetch_source_catalog - fetcher = Droonga::CatalogFetcher.new(:host => @options["replica-source-host"], - :port => @options[:port], - :tag => @options[:tag], + fetcher = Droonga::CatalogFetcher.new(:host => source_node.host, + :port => source_node.port, + :tag => source_node.tag, :receiver_host => @options["receiver-host"]) - fetcher.fetch(:dataset => @options[:dataset]) + fetcher.fetch(:dataset => dataset) end def run_remote_command(target, command, options) @@ -160,14 +175,21 @@ class JoinCommand def prepare_absorber absorber_options = { - :dataset => @options[:dataset], - :source_host => @options["replica-source-host"], - :destination_host => @options[:host], + :host => joining_node.host, + :port => joining_node.port, + :tag => joining_node.tag, + :dataset => dataset, + + :source_host => source_node.host, + :source_port => source_node.port, + :source_tag => source_node.tag, + :source_dataset => dataset, + :receiver_host => @options["receiver-host"], - :port => @options[:port], - :tag => @options[:tag], + :messages_per_second => @options["records-per-second"], :progress_interval_seconds => @options["progress-interval-seconds"], + :client_options => { :backend => :coolio, :loop => @loop, @@ -179,8 +201,8 @@ class JoinCommand def set_source_node_role if absorber.source_node_suspendable? puts("Changing role of the source node...") - run_remote_command(source_node, "change_role", - "node" => source_node, + run_remote_command(source_node.to_s, "change_role", + "node" => source_node.to_s, "role" => Droonga::NodeMetadata::Role::ABSORB_SOURCE) wait_until_restarted(source_node) end @@ -189,8 +211,8 @@ class JoinCommand def set_joining_node_role puts("Changing role of the joining node...") - run_remote_command(joining_node, "change_role", - "node" => joining_node, + run_remote_command(joining_node.to_s, "change_role", + "node" => joining_node.to_s, "role" => Droonga::NodeMetadata::Role::ABSORB_DESTINATION) wait_until_restarted(joining_node) @joining_node_role_changed = true @@ -199,30 +221,30 @@ class JoinCommand def reset_source_node_role if absorber.source_node_suspendable? puts("Restoring role of the source node...") - run_remote_command(source_node, "change_role", - "node" => source_node, + run_remote_command(source_node.to_s, "change_role", + "node" => source_node.to_s, "role" => Droonga::NodeMetadata::Role::SERVICE_PROVIDER) - wait_until_restarted(source_node) + wait_until_restarted(source_node.to_s) end @source_node_role_changed = false end def reset_joining_node_role puts("Restoring role of the joining node...") - run_remote_command(joining_node, "change_role", - "node" => joining_node, + run_remote_command(joining_node.to_s, "change_role", + "node" => joining_node.to_s, "role" => Droonga::NodeMetadata::Role::SERVICE_PROVIDER) - wait_until_restarted(joining_node) + wait_until_restarted(joining_node.to_s) @joining_node_role_changed = false end def do_join puts("Joining new replica to the cluster...") - run_remote_command(joining_node, "join", - "node" => joining_node, + run_remote_command(joining_node.to_s, "join", + "node" => joining_node.to_s, "type" => "replica", - "source" => source_node, - "dataset" => @options[:dataset]) + "source" => source_node.to_s, + "dataset" => dataset) wait_until_restarted(joining_node) end @@ -249,25 +271,25 @@ class JoinCommand end def set_effective_message_timestamp - response = run_remote_command(source_node, "report_metadata", - "node" => source_node, + response = run_remote_command(source_node.to_s, "report_metadata", + "node" => source_node.to_s, "key" => "last_processed_message_timestamp") timestamp = response["value"] if timestamp and not timestamp.empty? puts "The timestamp of the last processed message in the source node: #{timestamp}" puts "Setting effective message timestamp for the destination node..." - response = run_remote_command(joining_node, "accept_messages_newer_than", - "node" => joining_node, + response = run_remote_command(joining_node.to_s, "accept_messages_newer_than", + "node" => joining_node.to_s, "timestamp" => timestamp) end end def register_to_existing_nodes puts("Register new node to existing hosts in the cluster...") - run_remote_command(source_node, "add_replicas", + run_remote_command(source_node.to_s, "add_replicas", "cluster_id" => source_cluster_id, - "dataset" => @options[:dataset], - "hosts" => [@options[:host]]) + "dataset" => dataset, + "hosts" => [joining_node.host]) wait_until_restarted(source_node) @node_registered = true end @@ -276,8 +298,8 @@ class JoinCommand puts("Unregister new node from existing hosts in the cluster...") run_remote_command(source_node, "remove_replicas", "cluster_id" => source_cluster_id, - "dataset" => @options[:dataset], - "hosts" => [@options[:host]]) + "dataset" => dataset, + "hosts" => [joining_node.host]) wait_until_restarted(source_node) @node_registered = false end -------------- next part -------------- HTML����������������������������...Download