YUKI Hiroshi
null+****@clear*****
Tue Jan 6 18:26:50 JST 2015
YUKI Hiroshi 2015-01-06 18:26:50 +0900 (Tue, 06 Jan 2015) New Revision: 6bd1683488f4ef46903cc071aece250de89893ff https://github.com/droonga/droonga-engine/commit/6bd1683488f4ef46903cc071aece250de89893ff Message: Unify BufferedForwarder to EngineNode Removed files: lib/droonga/buffered_forwarder.rb Modified files: lib/droonga/cluster.rb lib/droonga/engine_node.rb lib/droonga/engine_state.rb Deleted: lib/droonga/buffered_forwarder.rb (+0 -88) 100644 =================================================================== --- lib/droonga/buffered_forwarder.rb 2015-01-06 17:55:35 +0900 (4426a9d) +++ /dev/null @@ -1,88 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -require "droonga/loggable" -require "droonga/forwarder" -require "droonga/forward_buffer" - -module Droonga - class BufferedForwarder - include Loggable - - def initialize(loop, options={}) - @loop = loop - @cluster_state = options[:cluster_state] - @buffers = {} - @forwarder = Forwarder.new(loop, :buffering => true) - end - - def start - logger.trace("start: start") - resume - logger.trace("start: done") - end - - def shutdown - logger.trace("shutdown: start") - @forwarder.shutdown - logger.trace("shutdown: done") - end - - def resume - @forwarder.resume - @buffers.each do |node_name, buffer| - if writable_node?(node_name) - buffer.start_forward - end - end - end - - def forward(message, destination) - logger.trace("forward: start") - receiver = destination["to"] - receiver_is_node = (receiver =~ /\A([^:]+:\d+\/[^\.]+)/) - node_name = $1 - unless receiver_is_node - @forwarder.forward(message, destination) - return - end - - buffer = @buffers[node_name] ||= ForwardBuffer.new(node_name, @forwarder) - - if not writable_node?(node_name) - buffer.add(message, destination) - elsif buffer.empty? - @forwarder.forward(message, destination) - else - buffer.add(message, destination) - buffer.start_forward - end - - logger.trace("forward: done") - end - - private - def writable_node?(node_name) - @cluster_state.nil? or - not @cluster_state.unwritable_node?(node_name) - end - - def log_tag - "[#{Process.ppid}] buffered-forwarder" - end - end -end Modified: lib/droonga/cluster.rb (+1 -13) =================================================================== --- lib/droonga/cluster.rb 2015-01-06 17:55:35 +0900 (cb90248) +++ lib/droonga/cluster.rb 2015-01-06 18:26:50 +0900 (a26e0ee) @@ -85,7 +85,7 @@ module Droonga receiver_node_name = receiver.match(/\A[^:]+:\d+\/[^.]+/).to_s @engine_nodes.each do |node| if node.name == receiver_node_name - node.forwarder.forward(message, destination) + node.forward(message, destination) return true end end @@ -142,18 +142,6 @@ module Droonga end.collect(&:name) end - def unwritable_node?(node_name) - case node_metadata.role - when NodeMetadata::Role::SERVICE_PROVIDER - absorb_source_nodes.include?(node_name) or - absorb_destination_nodes.include?(node_name) - when NodeMetadata::Role::ABSORB_SOURCE - absorb_destination_nodes.include?(node_name) - else - false - end - end - def on_change @on_change.call if @on_change end Modified: lib/droonga/engine_node.rb (+48 -0) =================================================================== --- lib/droonga/engine_node.rb 2015-01-06 17:55:35 +0900 (d7877b2) +++ lib/droonga/engine_node.rb 2015-01-06 18:26:50 +0900 (0b970ff) @@ -13,7 +13,9 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +require "droonga/loggable" require "droonga/forwarder" +require "droonga/forward_buffer" require "droonga/node_metadata" module Droonga @@ -26,6 +28,35 @@ module Droonga @sender_role = sender_role @forwarder = Forwarder.new(loop, :buffering => true) + @buffer = ForwardBuffer.new(name, @forwarder) + end + + def start + logger.trace("start: start") + resume + logger.trace("start: done") + end + + def shutdown + logger.trace("shutdown: start") + @forwarder.shutdown + logger.trace("shutdown: done") + end + + def resume + @forwarder.resume + @buffer.start_forward if really_writable? + end + + def forward(message, destination) + if not really_writable? + @buffer.add(message, destination) + elsif****@buffe*****? + @forwarder.forward(message, destination) + else + @buffer.add(message, destination) + @buffer.start_forward + end end def live? @@ -74,6 +105,18 @@ module Droonga end end + def really_writable? + return false unless writable? + case @sender_role + when NodeMetadata::Role::SERVICE_PROVIDER + service_provider? + when NodeMetadata::Role::ABSORB_SOURCE + not absorb_destination? + else + true + end + end + def status if forwardable? "active" @@ -84,8 +127,13 @@ module Droonga end end + private def on_change @forwarder.resume end + + def log_tag + "[#{Process.ppid}] engine-node" + end end end Modified: lib/droonga/engine_state.rb (+1 -1) =================================================================== --- lib/droonga/engine_state.rb 2015-01-06 17:55:35 +0900 (a454921) +++ lib/droonga/engine_state.rb 2015-01-06 18:26:50 +0900 (dfcc285) @@ -19,7 +19,7 @@ require "coolio" require "droonga/loggable" require "droonga/event_loop" -require "droonga/buffered_forwarder" +require "droonga/forwarder" require "droonga/replier" module Droonga -------------- next part -------------- HTML����������������������������...Download