[Groonga-commit] droonga/droonga-engine at e6a37e5 [master] Add mechanism to skip forwarding of obsolete messages

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Apr 8 10:51:29 JST 2015


YUKI Hiroshi	2015-04-08 10:51:29 +0900 (Wed, 08 Apr 2015)

  New Revision: e6a37e5d261caab60804ac18db76950e8267e437
  https://github.com/droonga/droonga-engine/commit/e6a37e5d261caab60804ac18db76950e8267e437

  Message:
    Add mechanism to skip forwarding of obsolete messages

  Modified files:
    lib/droonga/engine_node.rb
    lib/droonga/forward_buffer.rb

  Modified: lib/droonga/engine_node.rb (+2 -0)
===================================================================
--- lib/droonga/engine_node.rb    2015-04-07 18:57:55 +0900 (81f118f)
+++ lib/droonga/engine_node.rb    2015-04-08 10:51:29 +0900 (83d852b)
@@ -34,6 +34,8 @@ module Droonga
       @sender_node_metadata = params[:metadata]
 
       @buffer = ForwardBuffer.new(name)
+      boundary_timestamp = accept_messages_newer_than_timestamp
+      @buffer.process_messages_newer_than(boundary_timestamp)
       @buffer.on_forward = lambda do |message, destination|
         output(message, destination)
       end

  Modified: lib/droonga/forward_buffer.rb (+22 -0)
===================================================================
--- lib/droonga/forward_buffer.rb    2015-04-07 18:57:55 +0900 (69d5ed3)
+++ lib/droonga/forward_buffer.rb    2015-04-08 10:51:29 +0900 (7757f8c)
@@ -18,6 +18,7 @@
 require "fileutils"
 require "pathname"
 require "msgpack"
+require "time"
 
 require "droonga/loggable"
 require "droonga/path"
@@ -74,6 +75,10 @@ module Droonga
       @data_directory.children.empty?
     end
 
+    def process_messages_newer_than(timestamp)
+      @process_messages_newer_than_timestamp = timestamp
+    end
+
     private
     def forward(buffered_message_path)
       logger.trace("forward: start (#{buffered_message_path})")
@@ -81,8 +86,25 @@ module Droonga
       @unpacker.feed(file_contents)
       buffered_message =****@unpac*****
       @unpacker.reset
+
+      if @process_messages_newer_than_timestamp
+        message_timestamp = Time.parse(message["date"])
+        logger.trace("Checking boundary of obsolete message",
+                     :newer_than => @process_messages_newer_than_timestamp,
+                     :message_at => message_timestamp)
+        if @process_messages_newer_than_timestamp >= message_timestamp
+          buffered_message = nil
+        else
+          logger.info("New message is detected. The boundary is now cleared.")
+          @process_messages_newer_than_timestamp = nil
+        end
+      end
+
+      if buffered_message
       on_forward(buffered_message["message"],
                  buffered_message["destination"])
+      end
+
       FileUtils.rm_f(buffered_message_path.to_s)
       logger.trace("forward: done (#{buffered_message_path})")
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index