Yoji Shidara
null+****@clear*****
Thu Oct 31 17:26:10 JST 2013
Yoji Shidara 2013-10-31 17:26:10 +0900 (Thu, 31 Oct 2013) New Revision: 7a1b849d068c90f440d38e5e4578693c036b6383 https://github.com/droonga/fluent-plugin-droonga/commit/7a1b849d068c90f440d38e5e4578693c036b6383 Merged 9f0933d: Merge pull request #23 from droonga/extract-job-queue-operations Message: Extract job_queue operations into JobQueue class Modified files: lib/droonga/executor.rb lib/droonga/job_queue.rb Modified: lib/droonga/executor.rb (+6 -35) =================================================================== --- lib/droonga/executor.rb 2013-10-31 15:32:18 +0900 (2fa386e) +++ lib/droonga/executor.rb 2013-10-31 17:26:10 +0900 (7047840) @@ -15,7 +15,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "msgpack" require "fluent-logger" require "groonga" @@ -50,16 +49,14 @@ module Droonga @outputs.each do |dest, output| output[:logger].close if output[:logger] end - @queue = nil if @database @database.close @context.close @database = @context = nil end - if @queue_database - @queue_database.close - @queue_context.close - @queue_database = @queue_context = nil + if @job_queue + @job_queue.close + @job_queue = nil end $log.trace("#{log_tag}: shutdown: done") end @@ -93,7 +90,7 @@ module Droonga def execute_one $log.trace("#{log_tag}: execute_one: start") - message = pull_message + message = @job_queue.pull_message unless message $log.trace("#{log_tag}: execute_one: abort: no message") return @@ -158,7 +155,7 @@ module Droonga envelope["arguments"] = arguments message = ['', Time.now.to_f, envelope] end - push_message(message) + @job_queue.push_message(message) end end end @@ -229,27 +226,6 @@ module Droonga [envelope["body"], envelope["type"], envelope["arguments"]] end - def push_message(message) - $log.trace("#{log_tag}: push_message: start") - packed_message = message.to_msgpack - @queue.push do |record| - record.message = packed_message - end - $log.trace("#{log_tag}: push_message: done") - end - - def pull_message - packed_message = nil - @queue.pull do |record| - if record - packed_message = record.message - record.delete - end - end - return nil unless packed_message - MessagePack.unpack(packed_message) - end - def load_handlers @handler_names.each do |handler_name| plugin = Droonga::Plugin.new("handler", handler_name) @@ -261,12 +237,7 @@ module Droonga if @database_name && !@database_name.empty? @context = Groonga::Context.new @database =****@conte*****_database(@database_name) - - @queue_context = Groonga::Context.new - @queue_database = @queue_context.open_database(@database_name) - @queue_context.encoding = :none - - @queue = @queue_context[@queue_name] + @job_queue = JobQueue.open(@database_name, @queue_name) end @handler_names.each do |handler_name| add_handler(handler_name) Modified: lib/droonga/job_queue.rb (+54 -0) =================================================================== --- lib/droonga/job_queue.rb 2013-10-31 15:32:18 +0900 (4ac60c2) +++ lib/droonga/job_queue.rb 2013-10-31 17:26:10 +0900 (9961b09) @@ -16,6 +16,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA require "droonga/job_queue_schema" +require "msgpack" module Droonga class JobQueue @@ -24,6 +25,59 @@ module Droonga schema = JobQueueSchema.new(database_path, queue_name) schema.ensure_created end + + def open(database_path, queue_name) + job_queue = new(database_path, queue_name) + job_queue.open + job_queue + end + end + + def initialize(database_path, queue_name) + @database_path = database_path + @queue_name = queue_name + end + + def open + @context = Groonga::Context.new + @database =****@conte*****_database(@database_path) + @context.encoding = :none + + @queue = @context[@queue_name] + end + + def push_message(message) + $log.trace("#{log_tag}: push_message: start") + packed_message = message.to_msgpack + @queue.push do |record| + record.message = packed_message + end + $log.trace("#{log_tag}: push_message: done") + end + + def pull_message + packed_message = nil + @queue.pull do |record| + if record + packed_message = record.message + record.delete + end + end + return nil unless packed_message + MessagePack.unpack(packed_message) + end + + def close + @queue = nil + if @database + @database.close + @context.close + @database = @context = nil + end + end + + def log_tag + "[#{Process.ppid}][#{Process.pid}] job_queue" end end end -------------- next part -------------- HTML����������������������������...Download