YUKI Hiroshi
null+****@clear*****
Tue Mar 31 18:45:54 JST 2015
YUKI Hiroshi 2015-03-31 18:45:54 +0900 (Tue, 31 Mar 2015) New Revision: 4985e7fbe918752a4375a6f5f32ff96fd0dc1ae3 https://github.com/droonga/droonga-engine/commit/4985e7fbe918752a4375a6f5f32ff96fd0dc1ae3 Message: Introduce ForwardBuffer to support adding of new replica without stopping of writing messages Added files: lib/droonga/forward_buffer.rb Modified files: lib/droonga/engine_node.rb lib/droonga/fluent_message_sender.rb lib/droonga/path.rb Modified: lib/droonga/engine_node.rb (+24 -1) =================================================================== --- lib/droonga/engine_node.rb 2015-03-31 18:17:25 +0900 (deaa4eb) +++ lib/droonga/engine_node.rb 2015-03-31 18:45:54 +0900 (2e68d5d) @@ -14,6 +14,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA require "droonga/loggable" +require "droonga/forward_buffer" require "droonga/fluent_message_sender" require "droonga/node_metadata" @@ -29,6 +30,8 @@ module Droonga @state = state @sender_node_metadata = params[:metadata] + @buffer = ForwardBuffer.new(name) + parsed_name = parse_node_name(@name) @sender = FluentMessageSender.new(loop, parsed_name[:host], @@ -41,6 +44,7 @@ module Droonga def start logger.trace("start: start") @sender.resume + @buffer.start_forward if really_writable? logger.trace("start: done") end @@ -51,7 +55,14 @@ module Droonga end def forward(message, destination) - output(message, destination) + if not really_writable? + @buffer.add(message, destination) + elsif****@buffe*****? + output(message, destination) + else + @buffer.add(message, destination) + @buffer.start_forward + end end def forwardable? @@ -137,6 +148,18 @@ module Droonga role == NodeMetadata::Role::ABSORB_DESTINATION end + def really_writable? + return false unless writable? + case sender_role + when NodeMetadata::Role::SERVICE_PROVIDER + service_provider? + when NodeMetadata::Role::ABSORB_SOURCE + not absorb_destination? + else + true + end + end + def sender_role @sender_node_metadata.role end Modified: lib/droonga/fluent_message_sender.rb (+1 -1) =================================================================== --- lib/droonga/fluent_message_sender.rb 2015-03-31 18:17:25 +0900 (0cbc93a) +++ lib/droonga/fluent_message_sender.rb 2015-03-31 18:45:54 +0900 (81d1063) @@ -91,7 +91,7 @@ module Droonga end if @buffering - data_directory = Path.buffer + "#{@host}:#{@port}" + data_directory = Path.accidental_buffer + "#{@host}:#{@port}" FileUtils.mkdir_p(data_directory.to_s) @socket = BufferedTCPSocket.connect(@host, @port, data_directory) @socket.resume Added: lib/droonga/forward_buffer.rb (+96 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/forward_buffer.rb 2015-03-31 18:45:54 +0900 (bd7cd97) @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2014-2015 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +require "fileutils" +require "pathname" +require "msgpack" + +require "droonga/loggable" +require "droonga/path" +require "droonga/safe_file_writer" + +module Droonga + class ForwardBuffer + include Loggable + + SUFFIX = ".msgpack" + + attr_writer :on_forward + + def initialize(node_name) + @on_forward = nil + + @packer = MessagePack::Packer.new + @unpacker = MessagePack::Unpacker.new + + dirname = node_name.gsub("/", ":") + @data_directory = Path.intentional_buffer + dirname + FileUtils.mkdir_p(@data_directory.to_s) + end + + def add(message, destination) + logger.trace("add: start") + buffered_message = { + "message" => message, + "destination" => destination, + } + @packer.pack(buffered_message) + SafeFileWriter.write(file_path) do |output, file| + output.puts(@packer.to_s) + end + @packer.clear + logger.trace("add: done") + end + + def start_forward + logger.trace("start_forward: start") + Pathname.glob("#{@data_directory}/*#{SUFFIX}").collect do |buffered_message_path| + forward(buffered_message_path) + end + logger.trace("start_forward: done") + end + + def empty? + @data_directory.children.empty? + end + + private + def forward(buffered_message_path) + logger.trace("forward: start (#{buffered_message_path})") + file_contents = buffered_message_path.read + @unpacker.feed(file_contents) + buffered_message =****@unpac***** + @unpacker.reset + on_forward(buffered_message["message"], + buffered_message["destination"]) + FileUtils.rm_f(buffered_message_path.to_s) + logger.trace("forward: done (#{buffered_message_path})") + end + + def file_path(time_stamp=Time.now) + @data_directory + "#{time_stamp.iso8601(6)}#{SUFFIX}" + end + + def on_forward(message, destination) + @on_forward.call(message, destination) if @on_forward + end + + def log_tag + "[#{Process.ppid}] forward-buffer" + end + end +end Modified: lib/droonga/path.rb (+6 -2) =================================================================== --- lib/droonga/path.rb 2015-03-31 18:17:25 +0900 (a882878) +++ lib/droonga/path.rb 2015-03-31 18:45:54 +0900 (e168956) @@ -70,8 +70,12 @@ module Droonga base + "restart.txt" end - def buffer - state + "buffer" + def accidental_buffer + state + "buffer" + "accidental" + end + + def intentional_buffer + state + "buffer" + "intentional" end def serf_event_handler_errors -------------- next part -------------- HTML����������������������������...Download