Kouhei Sutou
null+****@clear*****
Thu May 15 17:48:46 JST 2014
Kouhei Sutou 2014-05-15 17:48:46 +0900 (Thu, 15 May 2014) New Revision: 892e5dfa19f2c914fdbc6cb0729206872eb4ad3f https://github.com/droonga/droonga-engine/commit/892e5dfa19f2c914fdbc6cb0729206872eb4ad3f Message: dump: split to some classes Modified files: lib/droonga/plugins/dump.rb Modified: lib/droonga/plugins/dump.rb (+83 -38) =================================================================== --- lib/droonga/plugins/dump.rb 2014-05-14 17:52:51 +0900 (9c9f47b) +++ lib/droonga/plugins/dump.rb 2014-05-15 17:48:46 +0900 (b7bbc8e) @@ -26,55 +26,77 @@ module Droonga class Handler < Droonga::Handler def handle(message) - id = message.raw["id"] - dataset = message.raw["dataset"] - replyTo = (message.raw["replyTo"] || {})["to"] - return false unless replyTo + request = Request.new(message) + if request.need_dump? + dumper = Dumper.new(@context, loop, messenger, request) + dumper.start_dump + true + else + false + end + end + end - request = message.request || {} + class Request + def initialize(message) + @message = message + end - dump_start_message = { - "inReplyTo" => id, - "dataset" => dataset, - } - messenger.forward(dump_start_message, - "to" => replyTo, - "type" => "dump.start") + def need_dump? + reply_to + end + + def id + @message["id"] + end + + def dataset + @message.raw["dataset"] + end + + def reply_to + (@message.raw["replyTo"] || {})["to"] + end + + def messages_per_seconds + request = (@message.request || {}) + minimum_messages_per_seconds = 10 + [ + minimum_messages_per_seconds, + (request["messagesPerSecond"] || 10000).to_i, + ].max + end + end + + class Dumper + def initialize(context, loop, messenger, request) + @context = context + @loop = loop + @messenger = messenger + @request = request + end + + def start_dump + setup_forward_data + + forward("dump.start") - messages_per_seconds = request["messagesPerSecond"] || 10000 - messages_per_seconds = [10, messages_per_seconds.to_i].max - messages_per_100msec = messages_per_seconds / 10 dumper = Fiber.new do - n = 0 each_table do |table| table.each do |record| values = {} record.attributes.each do |key, value| values[key] = value unless key.start_with?("_") end - dump_message = { - "inReplyTo" => id, - "dataset" => dataset, - "body" => { - "table" => table.name, - "key" => record.key, - "values" => values, - }, + body = { + "table" => table.name, + "key" => record.key, + "values" => values, } - messenger.forward(dump_message, - "to" => replyTo, - "type" => "dump.record") - n = (n + 1) % messages_per_100msec - Fiber.yield if n.zero? + forward("dump.record", body) end end - dump_end_message = { - "inReplyTo" => id, - "dataset" => dataset, - } - messenger.forward(dump_end_message, - "to" => replyTo, - "type" => "dump.end") + forward("dump.end") end timer = Coolio::TimerWatcher.new(0.1, true) @@ -85,12 +107,35 @@ module Droonga timer.detach end end - loop.attach(timer) - true + @loop.attach(timer) end private + def setup_forward_data + @base_forward_message = { + "inReplyTo" => @request.id, + "dataset" => @request.dataset, + } + @forward_to =****@reque*****_to + @n_forwarded_messages = 0 + @messages_per_100msec =****@reque*****_per_seconds / 10 + end + + def forward(type, body=nil) + forward_message = @base_forward_message + if body + forward_message = forward_message.merge("body" => body) + end + @messenger.forward(forward_message, + "to" => @forward_to, + "type" => type) + + @n_forwarded_messages += 1 + @n_forwarded_messages %= @messages_per_100msec + Fiber.yield if @n_forwarded_messages.zero? + end + def each_table @context.database.each(:ignore_missing_object => true) do |object| next unless object.is_a?(::Groonga::Table) -------------- next part -------------- HTML����������������������������...Download