[Groonga-commit] droonga/droonga-engine at 4985e7f [master] Introduce ForwardBuffer to support adding of new replica without stopping of writing messages

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Mar 31 18:45:54 JST 2015


YUKI Hiroshi	2015-03-31 18:45:54 +0900 (Tue, 31 Mar 2015)

  New Revision: 4985e7fbe918752a4375a6f5f32ff96fd0dc1ae3
  https://github.com/droonga/droonga-engine/commit/4985e7fbe918752a4375a6f5f32ff96fd0dc1ae3

  Message:
    Introduce ForwardBuffer to support adding of new replica without stopping of writing messages

  Added files:
    lib/droonga/forward_buffer.rb
  Modified files:
    lib/droonga/engine_node.rb
    lib/droonga/fluent_message_sender.rb
    lib/droonga/path.rb

  Modified: lib/droonga/engine_node.rb (+24 -1)
===================================================================
--- lib/droonga/engine_node.rb    2015-03-31 18:17:25 +0900 (deaa4eb)
+++ lib/droonga/engine_node.rb    2015-03-31 18:45:54 +0900 (2e68d5d)
@@ -14,6 +14,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
 require "droonga/loggable"
+require "droonga/forward_buffer"
 require "droonga/fluent_message_sender"
 require "droonga/node_metadata"
 
@@ -29,6 +30,8 @@ module Droonga
       @state = state
       @sender_node_metadata = params[:metadata]
 
+      @buffer = ForwardBuffer.new(name)
+
       parsed_name = parse_node_name(@name)
       @sender = FluentMessageSender.new(loop,
                                         parsed_name[:host],
@@ -41,6 +44,7 @@ module Droonga
     def start
       logger.trace("start: start")
       @sender.resume
+      @buffer.start_forward if really_writable?
       logger.trace("start: done")
     end
 
@@ -51,7 +55,14 @@ module Droonga
     end
 
     def forward(message, destination)
-      output(message, destination)
+      if not really_writable?
+        @buffer.add(message, destination)
+      elsif****@buffe*****?
+        output(message, destination)
+      else
+        @buffer.add(message, destination)
+        @buffer.start_forward
+      end
     end
 
     def forwardable?
@@ -137,6 +148,18 @@ module Droonga
       role == NodeMetadata::Role::ABSORB_DESTINATION
     end
 
+    def really_writable?
+      return false unless writable?
+      case sender_role
+      when NodeMetadata::Role::SERVICE_PROVIDER
+        service_provider?
+      when NodeMetadata::Role::ABSORB_SOURCE
+        not absorb_destination?
+      else
+        true
+      end
+    end
+
     def sender_role
       @sender_node_metadata.role
     end

  Modified: lib/droonga/fluent_message_sender.rb (+1 -1)
===================================================================
--- lib/droonga/fluent_message_sender.rb    2015-03-31 18:17:25 +0900 (0cbc93a)
+++ lib/droonga/fluent_message_sender.rb    2015-03-31 18:45:54 +0900 (81d1063)
@@ -91,7 +91,7 @@ module Droonga
       end
 
       if @buffering
-        data_directory = Path.buffer + "#{@host}:#{@port}"
+        data_directory = Path.accidental_buffer + "#{@host}:#{@port}"
         FileUtils.mkdir_p(data_directory.to_s)
         @socket = BufferedTCPSocket.connect(@host, @port, data_directory)
         @socket.resume

  Added: lib/droonga/forward_buffer.rb (+96 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/forward_buffer.rb    2015-03-31 18:45:54 +0900 (bd7cd97)
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2014-2015 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+require "fileutils"
+require "pathname"
+require "msgpack"
+
+require "droonga/loggable"
+require "droonga/path"
+require "droonga/safe_file_writer"
+
+module Droonga
+  class ForwardBuffer
+    include Loggable
+
+    SUFFIX = ".msgpack"
+
+    attr_writer :on_forward
+
+    def initialize(node_name)
+      @on_forward = nil
+
+      @packer = MessagePack::Packer.new
+      @unpacker = MessagePack::Unpacker.new
+
+      dirname = node_name.gsub("/", ":")
+      @data_directory = Path.intentional_buffer + dirname
+      FileUtils.mkdir_p(@data_directory.to_s)
+    end
+
+    def add(message, destination)
+      logger.trace("add: start")
+      buffered_message = {
+        "message"     => message,
+        "destination" => destination,
+      }
+      @packer.pack(buffered_message)
+      SafeFileWriter.write(file_path) do |output, file|
+        output.puts(@packer.to_s)
+      end
+      @packer.clear
+      logger.trace("add: done")
+    end
+
+    def start_forward
+      logger.trace("start_forward: start")
+      Pathname.glob("#{@data_directory}/*#{SUFFIX}").collect do |buffered_message_path|
+        forward(buffered_message_path)
+      end
+      logger.trace("start_forward: done")
+    end
+
+    def empty?
+      @data_directory.children.empty?
+    end
+
+    private
+    def forward(buffered_message_path)
+      logger.trace("forward: start (#{buffered_message_path})")
+      file_contents = buffered_message_path.read
+      @unpacker.feed(file_contents)
+      buffered_message =****@unpac*****
+      @unpacker.reset
+      on_forward(buffered_message["message"],
+                 buffered_message["destination"])
+      FileUtils.rm_f(buffered_message_path.to_s)
+      logger.trace("forward: done (#{buffered_message_path})")
+    end
+
+    def file_path(time_stamp=Time.now)
+      @data_directory + "#{time_stamp.iso8601(6)}#{SUFFIX}"
+    end
+
+    def on_forward(message, destination)
+      @on_forward.call(message, destination) if @on_forward
+    end
+
+    def log_tag
+      "[#{Process.ppid}] forward-buffer"
+    end
+  end
+end

  Modified: lib/droonga/path.rb (+6 -2)
===================================================================
--- lib/droonga/path.rb    2015-03-31 18:17:25 +0900 (a882878)
+++ lib/droonga/path.rb    2015-03-31 18:45:54 +0900 (e168956)
@@ -70,8 +70,12 @@ module Droonga
         base + "restart.txt"
       end
 
-      def buffer
-        state + "buffer"
+      def accidental_buffer
+        state + "buffer" + "accidental"
+      end
+
+      def intentional_buffer
+        state + "buffer" + "intentional"
       end
 
       def serf_event_handler_errors
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index