[Groonga-commit] droonga/fluent-plugin-droonga at 7a1b849 [master] Extract job_queue operations into JobQueue class

Back to archive index

Yoji Shidara null+****@clear*****
Thu Oct 31 17:26:10 JST 2013


Yoji Shidara	2013-10-31 17:26:10 +0900 (Thu, 31 Oct 2013)

  New Revision: 7a1b849d068c90f440d38e5e4578693c036b6383
  https://github.com/droonga/fluent-plugin-droonga/commit/7a1b849d068c90f440d38e5e4578693c036b6383

  Merged 9f0933d: Merge pull request #23 from droonga/extract-job-queue-operations

  Message:
    Extract job_queue operations into JobQueue class

  Modified files:
    lib/droonga/executor.rb
    lib/droonga/job_queue.rb

  Modified: lib/droonga/executor.rb (+6 -35)
===================================================================
--- lib/droonga/executor.rb    2013-10-31 15:32:18 +0900 (2fa386e)
+++ lib/droonga/executor.rb    2013-10-31 17:26:10 +0900 (7047840)
@@ -15,7 +15,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "msgpack"
 require "fluent-logger"
 require "groonga"
 
@@ -50,16 +49,14 @@ module Droonga
       @outputs.each do |dest, output|
         output[:logger].close if output[:logger]
       end
-      @queue = nil
       if @database
         @database.close
         @context.close
         @database = @context = nil
       end
-      if @queue_database
-        @queue_database.close
-        @queue_context.close
-        @queue_database = @queue_context = nil
+      if @job_queue
+        @job_queue.close
+        @job_queue = nil
       end
       $log.trace("#{log_tag}: shutdown: done")
     end
@@ -93,7 +90,7 @@ module Droonga
 
     def execute_one
       $log.trace("#{log_tag}: execute_one: start")
-      message = pull_message
+      message = @job_queue.pull_message
       unless message
         $log.trace("#{log_tag}: execute_one: abort: no message")
         return
@@ -158,7 +155,7 @@ module Droonga
               envelope["arguments"] = arguments
               message = ['', Time.now.to_f, envelope]
             end
-            push_message(message)
+            @job_queue.push_message(message)
           end
         end
       end
@@ -229,27 +226,6 @@ module Droonga
       [envelope["body"], envelope["type"], envelope["arguments"]]
     end
 
-    def push_message(message)
-      $log.trace("#{log_tag}: push_message: start")
-      packed_message = message.to_msgpack
-      @queue.push do |record|
-        record.message = packed_message
-      end
-      $log.trace("#{log_tag}: push_message: done")
-    end
-
-    def pull_message
-      packed_message = nil
-      @queue.pull do |record|
-        if record
-          packed_message = record.message
-          record.delete
-        end
-      end
-      return nil unless packed_message
-      MessagePack.unpack(packed_message)
-    end
-
     def load_handlers
       @handler_names.each do |handler_name|
         plugin = Droonga::Plugin.new("handler", handler_name)
@@ -261,12 +237,7 @@ module Droonga
       if @database_name && !@database_name.empty?
         @context = Groonga::Context.new
         @database =****@conte*****_database(@database_name)
-
-        @queue_context = Groonga::Context.new
-        @queue_database = @queue_context.open_database(@database_name)
-        @queue_context.encoding = :none
-
-        @queue = @queue_context[@queue_name]
+        @job_queue = JobQueue.open(@database_name, @queue_name)
       end
       @handler_names.each do |handler_name|
         add_handler(handler_name)

  Modified: lib/droonga/job_queue.rb (+54 -0)
===================================================================
--- lib/droonga/job_queue.rb    2013-10-31 15:32:18 +0900 (4ac60c2)
+++ lib/droonga/job_queue.rb    2013-10-31 17:26:10 +0900 (9961b09)
@@ -16,6 +16,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "droonga/job_queue_schema"
+require "msgpack"
 
 module Droonga
   class JobQueue
@@ -24,6 +25,59 @@ module Droonga
         schema = JobQueueSchema.new(database_path, queue_name)
         schema.ensure_created
       end
+
+      def open(database_path, queue_name)
+        job_queue = new(database_path, queue_name)
+        job_queue.open
+        job_queue
+      end
+    end
+
+    def initialize(database_path, queue_name)
+      @database_path = database_path
+      @queue_name = queue_name
+    end
+
+    def open
+      @context = Groonga::Context.new
+      @database =****@conte*****_database(@database_path)
+      @context.encoding = :none
+
+      @queue = @context[@queue_name]
+    end
+
+    def push_message(message)
+      $log.trace("#{log_tag}: push_message: start")
+      packed_message = message.to_msgpack
+      @queue.push do |record|
+        record.message = packed_message
+      end
+      $log.trace("#{log_tag}: push_message: done")
+    end
+
+    def pull_message
+      packed_message = nil
+      @queue.pull do |record|
+        if record
+          packed_message = record.message
+          record.delete
+        end
+      end
+      return nil unless packed_message
+      MessagePack.unpack(packed_message)
+    end
+
+    def close
+      @queue = nil
+      if @database
+        @database.close
+        @context.close
+        @database = @context = nil
+      end
+    end
+
+    def log_tag
+      "[#{Process.ppid}][#{Process.pid}] job_queue"
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index