[Groonga-commit] droonga/droonga-engine at 6bd1683 [buffered-forward] Unify BufferedForwarder to EngineNode

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Jan 6 18:26:50 JST 2015


YUKI Hiroshi	2015-01-06 18:26:50 +0900 (Tue, 06 Jan 2015)

  New Revision: 6bd1683488f4ef46903cc071aece250de89893ff
  https://github.com/droonga/droonga-engine/commit/6bd1683488f4ef46903cc071aece250de89893ff

  Message:
    Unify BufferedForwarder to EngineNode

  Removed files:
    lib/droonga/buffered_forwarder.rb
  Modified files:
    lib/droonga/cluster.rb
    lib/droonga/engine_node.rb
    lib/droonga/engine_state.rb

  Deleted: lib/droonga/buffered_forwarder.rb (+0 -88) 100644
===================================================================
--- lib/droonga/buffered_forwarder.rb    2015-01-06 17:55:35 +0900 (4426a9d)
+++ /dev/null
@@ -1,88 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
-
-require "droonga/loggable"
-require "droonga/forwarder"
-require "droonga/forward_buffer"
-
-module Droonga
-  class BufferedForwarder
-    include Loggable
-
-    def initialize(loop, options={})
-      @loop = loop
-      @cluster_state = options[:cluster_state]
-      @buffers = {}
-      @forwarder = Forwarder.new(loop, :buffering => true)
-    end
-
-    def start
-      logger.trace("start: start")
-      resume
-      logger.trace("start: done")
-    end
-
-    def shutdown
-      logger.trace("shutdown: start")
-      @forwarder.shutdown
-      logger.trace("shutdown: done")
-    end
-
-    def resume
-      @forwarder.resume
-      @buffers.each do |node_name, buffer|
-        if writable_node?(node_name)
-          buffer.start_forward
-        end
-      end
-    end
-
-    def forward(message, destination)
-      logger.trace("forward: start")
-      receiver = destination["to"]
-      receiver_is_node = (receiver =~ /\A([^:]+:\d+\/[^\.]+)/)
-      node_name = $1
-      unless receiver_is_node
-        @forwarder.forward(message, destination)
-        return
-      end
-
-      buffer = @buffers[node_name] ||= ForwardBuffer.new(node_name, @forwarder)
-
-      if not writable_node?(node_name)
-        buffer.add(message, destination)
-      elsif buffer.empty?
-        @forwarder.forward(message, destination)
-      else
-        buffer.add(message, destination)
-        buffer.start_forward
-      end
-
-      logger.trace("forward: done")
-    end
-
-    private
-    def writable_node?(node_name)
-      @cluster_state.nil? or
-        not @cluster_state.unwritable_node?(node_name)
-    end
-
-    def log_tag
-      "[#{Process.ppid}] buffered-forwarder"
-    end
-  end
-end

  Modified: lib/droonga/cluster.rb (+1 -13)
===================================================================
--- lib/droonga/cluster.rb    2015-01-06 17:55:35 +0900 (cb90248)
+++ lib/droonga/cluster.rb    2015-01-06 18:26:50 +0900 (a26e0ee)
@@ -85,7 +85,7 @@ module Droonga
       receiver_node_name = receiver.match(/\A[^:]+:\d+\/[^.]+/).to_s
       @engine_nodes.each do |node|
         if node.name == receiver_node_name
-          node.forwarder.forward(message, destination)
+          node.forward(message, destination)
           return true
         end
       end
@@ -142,18 +142,6 @@ module Droonga
       end.collect(&:name)
     end
 
-    def unwritable_node?(node_name)
-      case node_metadata.role
-      when NodeMetadata::Role::SERVICE_PROVIDER
-        absorb_source_nodes.include?(node_name) or
-          absorb_destination_nodes.include?(node_name)
-      when NodeMetadata::Role::ABSORB_SOURCE
-        absorb_destination_nodes.include?(node_name)
-      else
-        false
-      end
-    end
-
     def on_change
       @on_change.call if @on_change
     end

  Modified: lib/droonga/engine_node.rb (+48 -0)
===================================================================
--- lib/droonga/engine_node.rb    2015-01-06 17:55:35 +0900 (d7877b2)
+++ lib/droonga/engine_node.rb    2015-01-06 18:26:50 +0900 (0b970ff)
@@ -13,7 +13,9 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
+require "droonga/loggable"
 require "droonga/forwarder"
+require "droonga/forward_buffer"
 require "droonga/node_metadata"
 
 module Droonga
@@ -26,6 +28,35 @@ module Droonga
       @sender_role = sender_role
 
       @forwarder = Forwarder.new(loop, :buffering => true)
+      @buffer = ForwardBuffer.new(name, @forwarder)
+    end
+
+    def start
+      logger.trace("start: start")
+      resume
+      logger.trace("start: done")
+    end
+
+    def shutdown
+      logger.trace("shutdown: start")
+      @forwarder.shutdown
+      logger.trace("shutdown: done")
+    end
+
+    def resume
+      @forwarder.resume
+      @buffer.start_forward if really_writable?
+    end
+
+    def forward(message, destination)
+      if not really_writable?
+        @buffer.add(message, destination)
+      elsif****@buffe*****?
+        @forwarder.forward(message, destination)
+      else
+        @buffer.add(message, destination)
+        @buffer.start_forward
+      end
     end
 
     def live?
@@ -74,6 +105,18 @@ module Droonga
       end
     end
 
+    def really_writable?
+      return false unless writable?
+      case @sender_role
+      when NodeMetadata::Role::SERVICE_PROVIDER
+        service_provider?
+      when NodeMetadata::Role::ABSORB_SOURCE
+        not absorb_destination?
+      else
+        true
+      end
+    end
+
     def status
       if forwardable?
         "active"
@@ -84,8 +127,13 @@ module Droonga
       end
     end
 
+    private
     def on_change
       @forwarder.resume
     end
+
+    def log_tag
+      "[#{Process.ppid}] engine-node"
+    end
   end
 end

  Modified: lib/droonga/engine_state.rb (+1 -1)
===================================================================
--- lib/droonga/engine_state.rb    2015-01-06 17:55:35 +0900 (a454921)
+++ lib/droonga/engine_state.rb    2015-01-06 18:26:50 +0900 (dfcc285)
@@ -19,7 +19,7 @@ require "coolio"
 
 require "droonga/loggable"
 require "droonga/event_loop"
-require "droonga/buffered_forwarder"
+require "droonga/forwarder"
 require "droonga/replier"
 
 module Droonga
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index