YUKI Hiroshi
null+****@clear*****
Wed Apr 8 10:51:29 JST 2015
YUKI Hiroshi 2015-04-08 10:51:29 +0900 (Wed, 08 Apr 2015) New Revision: e6a37e5d261caab60804ac18db76950e8267e437 https://github.com/droonga/droonga-engine/commit/e6a37e5d261caab60804ac18db76950e8267e437 Message: Add mechanism to skip forwarding of obsolete messages Modified files: lib/droonga/engine_node.rb lib/droonga/forward_buffer.rb Modified: lib/droonga/engine_node.rb (+2 -0) =================================================================== --- lib/droonga/engine_node.rb 2015-04-07 18:57:55 +0900 (81f118f) +++ lib/droonga/engine_node.rb 2015-04-08 10:51:29 +0900 (83d852b) @@ -34,6 +34,8 @@ module Droonga @sender_node_metadata = params[:metadata] @buffer = ForwardBuffer.new(name) + boundary_timestamp = accept_messages_newer_than_timestamp + @buffer.process_messages_newer_than(boundary_timestamp) @buffer.on_forward = lambda do |message, destination| output(message, destination) end Modified: lib/droonga/forward_buffer.rb (+22 -0) =================================================================== --- lib/droonga/forward_buffer.rb 2015-04-07 18:57:55 +0900 (69d5ed3) +++ lib/droonga/forward_buffer.rb 2015-04-08 10:51:29 +0900 (7757f8c) @@ -18,6 +18,7 @@ require "fileutils" require "pathname" require "msgpack" +require "time" require "droonga/loggable" require "droonga/path" @@ -74,6 +75,10 @@ module Droonga @data_directory.children.empty? end + def process_messages_newer_than(timestamp) + @process_messages_newer_than_timestamp = timestamp + end + private def forward(buffered_message_path) logger.trace("forward: start (#{buffered_message_path})") @@ -81,8 +86,25 @@ module Droonga @unpacker.feed(file_contents) buffered_message =****@unpac***** @unpacker.reset + + if @process_messages_newer_than_timestamp + message_timestamp = Time.parse(message["date"]) + logger.trace("Checking boundary of obsolete message", + :newer_than => @process_messages_newer_than_timestamp, + :message_at => message_timestamp) + if @process_messages_newer_than_timestamp >= message_timestamp + buffered_message = nil + else + logger.info("New message is detected. The boundary is now cleared.") + @process_messages_newer_than_timestamp = nil + end + end + + if buffered_message on_forward(buffered_message["message"], buffered_message["destination"]) + end + FileUtils.rm_f(buffered_message_path.to_s) logger.trace("forward: done (#{buffered_message_path})") end -------------- next part -------------- HTML����������������������������...Download