YUKI Hiroshi
null+****@clear*****
Fri Nov 21 17:40:44 JST 2014
YUKI Hiroshi 2014-11-21 17:40:44 +0900 (Fri, 21 Nov 2014) New Revision: 004cdcb87786436b6f7273f62a9af71c8d42bdc3 https://github.com/droonga/droonga-engine/commit/004cdcb87786436b6f7273f62a9af71c8d42bdc3 Message: Report progress by data absorber Modified files: bin/droonga-engine-absorb-data lib/droonga/data_absorber.rb Modified: bin/droonga-engine-absorb-data (+1 -0) =================================================================== --- bin/droonga-engine-absorb-data 2014-11-21 17:01:06 +0900 (11c6a23) +++ bin/droonga-engine-absorb-data 2014-11-21 17:40:44 +0900 (59b7941) @@ -24,6 +24,7 @@ require "droonga/catalog_generator" require "droonga/path" require "droonga/data_absorber" require "droonga/serf" +require "droonga/client" options = OpenStruct.new options.port = Droonga::CatalogGenerator::DEFAULT_PORT Modified: lib/droonga/data_absorber.rb (+113 -21) =================================================================== --- lib/droonga/data_absorber.rb 2014-11-21 17:01:06 +0900 (0b5a4fc) +++ lib/droonga/data_absorber.rb 2014-11-21 17:40:44 +0900 (d01063a) @@ -15,31 +15,53 @@ require "open3" +require "droonga/loggable" +require "droonga/client" + module Droonga class DataAbsorber + include Loggable + DEFAULT_MESSAGES_PER_SECOND = 100 + TIME_UNKNOWN = -1 + PROGRESS_UNKNOWN = -1 + class << self def absorb(params) - drndump = params[:drndump] || "drndump" + new(params).absorb + end + end + + attr_reader :params + def initialize(params) + @params = params + + @params[:messages_per_second] ||= DEFAULT_MESSAGES_PER_SECOND + @params[:drndump] ||= "drndump" + # We should use droonga-send instead of droonga-request, + # because droonga-request is too slow. 
+ @params[:client] ||= "droonga-send" + end + + def absorb + drndump = @params[:drndump] drndump_options = [] - drndump_options += ["--host", params[:source_host]] if params[:source_host] - drndump_options += ["--port", params[:port].to_s] if params[:port] - drndump_options += ["--tag", params[:tag]] if params[:tag] - drndump_options += ["--dataset", params[:dataset]] if params[:dataset] - drndump_options += ["--receiver-host", params[:destination_host]] - drndump_options += ["--receiver-port", params[:receiver_port].to_s] if params[:receiver_port] - - #TODO: We should use droonga-send instead of droonga-request, - # because droonga-request is too slow. - client = params[:client] || "droonga-send" + drndump_options += ["--host", @params[:source_host]] if @params[:source_host] + drndump_options += ["--port", @params[:port].to_s] if @params[:port] + drndump_options += ["--tag", @params[:tag]] if @params[:tag] + drndump_options += ["--dataset", @params[:dataset]] if @params[:dataset] + drndump_options += ["--receiver-host", @params[:destination_host]] + drndump_options += ["--receiver-port", @params[:receiver_port].to_s] if @params[:receiver_port] + + client = @params[:client] client_options = [] if client.include?("droonga-request") - client_options += ["--host", params[:destination_host]] - client_options += ["--port", params[:port].to_s] if params[:port] - client_options += ["--tag", params[:tag]] if params[:tag] - client_options += ["--receiver-host", params[:destination_host]] - client_options += ["--receiver-port", params[:receiver_port].to_s] if params[:receiver_port] + client_options += ["--host", @params[:destination_host]] + client_options += ["--port", @params[:port].to_s] if @params[:port] + client_options += ["--tag", @params[:tag]] if @params[:tag] + client_options += ["--receiver-host", @params[:destination_host]] + client_options += ["--receiver-port", @params[:receiver_port].to_s] if @params[:receiver_port] elsif client.include?("droonga-send") #XXX 
Don't use round-robin with multiple endpoints # even if there are too much data. @@ -49,12 +71,11 @@ module Droonga # So, we always use just one endpoint for now, # even if there are too much data. server = "droonga:#{params[:destination_host]}" - server = "#{server}:#{params[:port].to_s}" if params[:port] - server = "#{server}/#{params[:tag].to_s}" if params[:tag] + server = "#{server}:#{params[:port].to_s}" if @params[:port] + server = "#{server}/#{params[:tag].to_s}" if @params[:tag] client_options += ["--server", server] #XXX We should restrict the traffic to avoid overflowing! - params[:messages_per_second] ||= DEFAULT_MESSAGES_PER_SECOND - client_options += ["--messages-per-second", params[:messages_per_second]] + client_options += ["--messages-per-second", @params[:messages_per_second]] else raise ArgumentError.new("Unknwon type client: #{client}") end @@ -62,14 +83,85 @@ module Droonga drndump_command_line = [drndump] + drndump_options client_command_line = [client] + client_options + calculated_required_time = required_time_in_seconds + unless calculated_required_time == TIME_UNKNOWN + logger.info("calculated required time: #{calculated_required_time}sec") + if block_given? + yield(:required_time_in_seconds => calculated_required_time) + end + + start = Time.new.to_i env = {} Open3.pipeline_r([env, *drndump_command_line], [env, *client_command_line]) do |last_stdout, thread| last_stdout.each do |output| - yield output if block_given? 
+ progress = nil + if calculated_required_time == TIME_UNKNOWN or + calculated_required_time <= 0 + progress = PROGRESS_UNKNOWN + else + progress = (Time.new.to_i - start) / calculated_required_time + end + yield(:progress => progress, + :output => output) end end end + + def required_time_in_seconds + @params[:client].include?("droonga-send") + total_n_source_records / @params[:messages_per_second] + else + TIME_UNKNOWN + end + end + + def source_client + options = { + :host => @params[:source_host], + :port => @params[:port], + :tag => @params[:tag], + :progocol => :droonga, + :receiver_host => @params[:destination_host], + :receiver_port => 0, + } + @source_client ||= Droonga::Client.new(options) + end + + def source_tables + response = source_client.request("dataset" => @params[:dataset], + "type" => "table_list") + body = response["body"][1] + tables = body[1..-1] + tables.collect do |table| + table[1] + end + end + + def total_n_source_records + queries = {} + source_tables.each do |table| + queries["n_records_of_#{table}"] = { + "source" => table, + "output" => { + "elements" => ["count"], + }, + } + end + response = source_client.request("dataset" => @params[:dataset], + "type" => "search", + "body" => { + "queries" => queries, + }) + n_records = 0 + response["body"].each do |query_name, result| + n_records += result["count"] + end + n_records + end + + def log_tag + "data-absorber" end end end -------------- next part -------------- HTMLの添付ファイルを保管しました...Download