Kouhei Sutou
null+****@clear*****
Wed Feb 26 17:25:42 JST 2014
Kouhei Sutou 2014-02-26 17:25:42 +0900 (Wed, 26 Feb 2014) New Revision: 6d80cd2a8de1e9875b07a06abdf765ba0266bb92 https://github.com/droonga/fluent-plugin-droonga/commit/6d80cd2a8de1e9875b07a06abdf765ba0266bb92 Message: catalog: move version 1 specific code to version1.rb Modified files: lib/droonga/catalog/base.rb lib/droonga/catalog/version1.rb test/unit/catalog/test_version1.rb Modified: lib/droonga/catalog/base.rb (+0 -393) =================================================================== --- lib/droonga/catalog/base.rb 2014-02-26 17:21:40 +0900 (b7ecd10) +++ lib/droonga/catalog/base.rb 2014-02-26 17:25:42 +0900 (295a7a7) @@ -27,79 +27,6 @@ module Droonga @data = data @path = path @base_path = File.dirname(path) - @errors = [] - - validate - raise MultiplexError.new(@errors) unles****@error*****? - - prepare_data - end - - def option(name) - @options[name] - end - - def get_partitions(name) - device = @data["farms"][name]["device"] - pattern = Regexp.new("^#{name}\.") - results = {} - @data["datasets"].each do |dataset_name, dataset| - workers = dataset["workers"] - plugins = dataset["plugins"] - dataset["ring"].each do |key, part| - part["partitions"].each do |range, partitions| - partitions.each do |partition| - if partition =~ pattern - path = File.join([device, $POSTMATCH, "db"]) - path = File.expand_path(path, base_path) - options = { - :dataset => dataset_name, - :database => path, - :n_workers => workers, - :plugins => plugins - } - results[partition] = options - end - end - end - end - end - return results - end - - def get_routes(name, args) - routes = [] - dataset = dataset(name) - case args["type"] - when "broadcast" - dataset["ring"].each do |key, partition| - select_range_and_replicas(partition, args, routes) - end - when "scatter" - name = get_partition(dataset, args["key"]) - partition = dataset["ring"][name] - select_range_and_replicas(partition, args, routes) - end - return routes - end - - def get_partition(dataset, key) - continuum = dataset["continuum"] - return dataset["ring"].keys[0] unless continuum - hash = Zlib.crc32(key) - min = 0 - max = continuum.size - 1 - while (min < max) do - index = (min + max) / 2 - value, key = continuum[index] - return key if value == hash - if value > hash - max = index - else - min = index + 1 - end - end - return continuum[max][1] end def datasets @@ -113,326 +40,6 @@ module Droonga def dataset(name) datasets[name] end - - def select_range_and_replicas(partition, args, routes) - date_range = args["date_range"] || 0..-1 - partition["partitions"].sort[date_range].each do |time, replicas| - case args["replica"] - when "top" - routes << replicas[0] - when "random" - routes << replicas[rand(replicas.size)] - when "all" - routes.concat(replicas) - end - end - end - - private - def prepare_data - @data["datasets"].each do |name, dataset| - number_of_partitions = dataset["number_of_partitions"] - next if number_of_partitions < 2 - total_weight = compute_total_weight(dataset) - continuum = [] - dataset["ring"].each do |key, value| - points = number_of_partitions * 160 * value["weight"] / total_weight - points.times do |point| - hash = Digest::SHA1.hexdigest("#{key}:#{point}") - continuum << [hash[0..7].to_i(16), key] - end - end - dataset["continuum"] = continuum.sort do |a, b| a[0] - b[0]; end - end - @options = @data["options"] || {} - end - - def compute_total_weight(dataset) - dataset["ring"].reduce(0) do |result, zone| - result + zone[1]["weight"] - end - end - - def validate - do_validation do - validate_effective_date - end - do_validation do - validate_farms - end - do_validation do - validate_zones - end - do_validation do - validate_datasets - end - do_validation do - validate_zone_relations - end - do_validation do - validate_database_relations - end - end - - def do_validation(&block) - begin - yield - rescue ValidationError => error - @errors << error - end - end - - def validate_required_parameter(value, name) - raise MissingRequiredParameter.new(name, @path) unless value - end - - def validate_parameter_type(expected_types, value, name) - expected_types = [expected_types] unless expected_types.is_a?(Array) - - if expected_types.any? do |type| - value.is_a?(type) - end - return - end - - raise MismatchedParameterType.new(name, - expected_types, - value.class, - @path) - end - - def validate_valid_datetime(value, name) - validate_required_parameter(value, name) - validate_parameter_type(String, value, name) - begin - Time.parse(value) - rescue ArgumentError => error - raise InvalidDate.new(name, value, @path) - end - end - - def validate_positive_numeric_parameter(value, name) - validate_required_parameter(value, name) - validate_parameter_type(Numeric, value, name) - if value < 0 - raise NegativeNumber.new(name, value, @path) - end - end - - def validate_positive_integer_parameter(value, name) - validate_required_parameter(value, name) - validate_parameter_type(Integer, value, name) - if value < 0 - raise NegativeNumber.new(name, value, @path) - end - end - - def validate_one_or_larger_integer_parameter(value, name) - validate_required_parameter(value, name) - validate_parameter_type(Integer, value, name) - if value < 1 - raise SmallerThanOne.new(name, value, @path) - end - end - - def validate_effective_date - date = @data["effective_date"] - validate_required_parameter(date, "effective_date") - validate_valid_datetime(date, "effective_date") - end - - def validate_farms - farms = @data["farms"] - - validate_required_parameter(farms, "farms") - validate_parameter_type(Hash, farms, "farms") - - farms.each do |key, value| - validate_farm(value, "farms.#{key}") - end - end - - def validate_farm(farm, name) - validate_parameter_type(Hash, farm, name) - - validate_required_parameter(farm["device"], "#{name}.device") - validate_parameter_type(String, farm["device"], "#{name}.device") - end - - def validate_zones - zones = @data["zones"] - - validate_required_parameter(zones, "zones") - validate_parameter_type(Array, zones, "zones") - - validate_zone(zones, "zones") - end - - def validate_zone(zone, name) - case zone - when String - return - when Array - zone.each_with_index do |sub_zone, index| - validate_zone(sub_zone, "#{name}[#{index}]") - end - else - validate_parameter_type([String, Array], zone, name) - end - end - - def validate_datasets - datasets = @data["datasets"] - - validate_required_parameter(datasets, "datasets") - validate_parameter_type(Hash, datasets, "datasets") - - datasets.each do |name, dataset| - validate_dataset(dataset, "datasets.#{name}") - end - end - - def validate_dataset(dataset, name) - validate_parameter_type(Hash, dataset, name) - - do_validation do - validate_one_or_larger_integer_parameter(dataset["number_of_partitions"], - "#{name}.number_of_partitions") - end - do_validation do - validate_one_or_larger_integer_parameter(dataset["number_of_replicas"], - "#{name}.number_of_replicas") - end - do_validation do - validate_positive_integer_parameter(dataset["workers"], - "#{name}.workers") - end - do_validation do - validate_date_range(dataset["date_range"], "#{name}.date_range") - end - do_validation do - validate_partition_key(dataset["partition_key"], - "#{name}.partition_key") - end - - do_validation do - ring = dataset["ring"] - validate_required_parameter(ring, "#{name}.ring") - validate_parameter_type(Hash, ring, "#{name}.ring") - ring.each do |key, value| - validate_ring(value, "#{name}.ring.#{key}") - end - end - - do_validation do - validate_plugins(dataset["plugins"], "#{name}.plugins") - end - end - - def validate_date_range(value, name) - validate_required_parameter(value, name) - return if value == "infinity" - raise UnsupportedValue.new(name, value, @path) - end - - def validate_partition_key(value, name) - validate_required_parameter(value, name) - validate_parameter_type(String, value, name) - return if value == "_key" - raise UnsupportedValue.new(name, value, @path) - end - - def validate_ring(ring, name) - validate_parameter_type(Hash, ring, name) - - do_validation do - validate_positive_numeric_parameter(ring["weight"], "#{name}.weight") - end - - do_validation do - validate_parameter_type(Hash, ring["partitions"], "#{name}.partitions") - ring["partitions"].each do |key, value| - validate_partition(value, "#{name}.partitions.#{key}") - end - end - end - - def validate_partition(partition, name) - validate_parameter_type(Array, partition, name) - - partition.each_with_index do |value, index| - do_validation do - validate_parameter_type(String, value, "#{name}[#{index}]") - end - end - end - - def validate_plugins(plugins, name) - return unless plugins - validate_required_parameter(plugins, name) - validate_parameter_type(Array, plugins, "#{name}.plugins") - end - - def validate_zone_relations - return unless @data["zones"].is_a?(Array) - return unless @data["farms"].is_a?(Hash) - - farms = @data["farms"] - zones = @data["zones"] - - all_farms = farms.keys - all_zones = zones.flatten - - all_farms.each do |farm| - unless all_zones.include?(farm) - raise FarmNotZoned.new(farm, zones, @path) - end - end - - all_zones.each do |zone| - unless all_farms.include?(zone) - raise UnknownFarmInZones.new(farm, zones, @path) - end - end - end - - def validate_database_relations - return unless @data["farms"] - - farm_names = @data["farms"].keys.collect do |name| - Regexp.escape(name) - end - valid_farms_matcher = Regexp.new("^(#{farm_names.join("|")})\.") - - datasets.each do |dataset_name, dataset| - ring = dataset["ring"] - next if ring.nil? or !ring.is_a?(Hash) - ring.each do |ring_key, part| - partitions_set = part["partitions"] - next if partitions_set.nil? or !partitions_set.is_a?(Hash) - partitions_set.each do |range, partitions| - next unless partitions.is_a?(Array) - partitions.each_with_index do |partition, index| - name = "datasets.#{dataset_name}.ring.#{ring_key}." + - "partitions.#{range}[#{index}]" - do_validation do - unless partition =~ valid_farms_matcher - raise UnknownFarmForPartition.new(name, partition, @path) - end - do_validation do - directory_name = $POSTMATCH - if directory_name.nil? or directory_name.empty? - message = "\"#{partition}\" has no database name. " + - "You mus specify a database name for \"#{name}\"." - raise ValidationError.new(message, @path) - end - end - end - end - end - end - end - end end end end Modified: lib/droonga/catalog/version1.rb (+392 -0) =================================================================== --- lib/droonga/catalog/version1.rb 2014-02-26 17:21:40 +0900 (9185842) +++ lib/droonga/catalog/version1.rb 2014-02-26 17:25:42 +0900 (03505af) @@ -18,6 +18,398 @@ require "droonga/catalog/base" module Droonga module Catalog class Version1 < Base + def initialize(data, path) + super + @errors = [] + + validate + raise MultiplexError.new(@errors) unles****@error*****? + + prepare_data + end + + def get_partitions(name) + device = @data["farms"][name]["device"] + pattern = Regexp.new("^#{name}\.") + results = {} + @data["datasets"].each do |dataset_name, dataset| + workers = dataset["workers"] + plugins = dataset["plugins"] + dataset["ring"].each do |key, part| + part["partitions"].each do |range, partitions| + partitions.each do |partition| + if partition =~ pattern + path = File.join([device, $POSTMATCH, "db"]) + path = File.expand_path(path, base_path) + options = { + :dataset => dataset_name, + :database => path, + :n_workers => workers, + :plugins => plugins + } + results[partition] = options + end + end + end + end + end + return results + end + + def get_routes(name, args) + routes = [] + dataset = dataset(name) + case args["type"] + when "broadcast" + dataset["ring"].each do |key, partition| + select_range_and_replicas(partition, args, routes) + end + when "scatter" + name = get_partition(dataset, args["key"]) + partition = dataset["ring"][name] + select_range_and_replicas(partition, args, routes) + end + return routes + end + + def get_partition(dataset, key) + continuum = dataset["continuum"] + return dataset["ring"].keys[0] unless continuum + hash = Zlib.crc32(key) + min = 0 + max = continuum.size - 1 + while (min < max) do + index = (min + max) / 2 + value, key = continuum[index] + return key if value == hash + if value > hash + max = index + else + min = index + 1 + end + end + return continuum[max][1] + end + + def select_range_and_replicas(partition, args, routes) + date_range = args["date_range"] || 0..-1 + partition["partitions"].sort[date_range].each do |time, replicas| + case args["replica"] + when "top" + routes << replicas[0] + when "random" + routes << replicas[rand(replicas.size)] + when "all" + routes.concat(replicas) + end + end + end + + private + def prepare_data + @data["datasets"].each do |name, dataset| + number_of_partitions = dataset["number_of_partitions"] + next if number_of_partitions < 2 + total_weight = compute_total_weight(dataset) + continuum = [] + dataset["ring"].each do |key, value| + points = number_of_partitions * 160 * value["weight"] / total_weight + points.times do |point| + hash = Digest::SHA1.hexdigest("#{key}:#{point}") + continuum << [hash[0..7].to_i(16), key] + end + end + dataset["continuum"] = continuum.sort do |a, b| a[0] - b[0]; end + end + @options = @data["options"] || {} + end + + def compute_total_weight(dataset) + dataset["ring"].reduce(0) do |result, zone| + result + zone[1]["weight"] + end + end + + def validate + do_validation do + validate_effective_date + end + do_validation do + validate_farms + end + do_validation do + validate_zones + end + do_validation do + validate_datasets + end + do_validation do + validate_zone_relations + end + do_validation do + validate_database_relations + end + end + + def do_validation(&block) + begin + yield + rescue ValidationError => error + @errors << error + end + end + + def validate_required_parameter(value, name) + raise MissingRequiredParameter.new(name, @path) unless value + end + + def validate_parameter_type(expected_types, value, name) + expected_types = [expected_types] unless expected_types.is_a?(Array) + + if expected_types.any? do |type| + value.is_a?(type) + end + return + end + + raise MismatchedParameterType.new(name, + expected_types, + value.class, + @path) + end + + def validate_valid_datetime(value, name) + validate_required_parameter(value, name) + validate_parameter_type(String, value, name) + begin + Time.parse(value) + rescue ArgumentError => error + raise InvalidDate.new(name, value, @path) + end + end + + def validate_positive_numeric_parameter(value, name) + validate_required_parameter(value, name) + validate_parameter_type(Numeric, value, name) + if value < 0 + raise NegativeNumber.new(name, value, @path) + end + end + + def validate_positive_integer_parameter(value, name) + validate_required_parameter(value, name) + validate_parameter_type(Integer, value, name) + if value < 0 + raise NegativeNumber.new(name, value, @path) + end + end + + def validate_one_or_larger_integer_parameter(value, name) + validate_required_parameter(value, name) + validate_parameter_type(Integer, value, name) + if value < 1 + raise SmallerThanOne.new(name, value, @path) + end + end + + def validate_effective_date + date = @data["effective_date"] + validate_required_parameter(date, "effective_date") + validate_valid_datetime(date, "effective_date") + end + + def validate_farms + farms = @data["farms"] + + validate_required_parameter(farms, "farms") + validate_parameter_type(Hash, farms, "farms") + + farms.each do |key, value| + validate_farm(value, "farms.#{key}") + end + end + + def validate_farm(farm, name) + validate_parameter_type(Hash, farm, name) + + validate_required_parameter(farm["device"], "#{name}.device") + validate_parameter_type(String, farm["device"], "#{name}.device") + end + + def validate_zones + zones = @data["zones"] + + validate_required_parameter(zones, "zones") + validate_parameter_type(Array, zones, "zones") + + validate_zone(zones, "zones") + end + + def validate_zone(zone, name) + case zone + when String + return + when Array + zone.each_with_index do |sub_zone, index| + validate_zone(sub_zone, "#{name}[#{index}]") + end + else + validate_parameter_type([String, Array], zone, name) + end + end + + def validate_datasets + datasets = @data["datasets"] + + validate_required_parameter(datasets, "datasets") + validate_parameter_type(Hash, datasets, "datasets") + + datasets.each do |name, dataset| + validate_dataset(dataset, "datasets.#{name}") + end + end + + def validate_dataset(dataset, name) + validate_parameter_type(Hash, dataset, name) + + do_validation do + validate_one_or_larger_integer_parameter(dataset["number_of_partitions"], + "#{name}.number_of_partitions") + end + do_validation do + validate_one_or_larger_integer_parameter(dataset["number_of_replicas"], + "#{name}.number_of_replicas") + end + do_validation do + validate_positive_integer_parameter(dataset["workers"], + "#{name}.workers") + end + do_validation do + validate_date_range(dataset["date_range"], "#{name}.date_range") + end + do_validation do + validate_partition_key(dataset["partition_key"], + "#{name}.partition_key") + end + + do_validation do + ring = dataset["ring"] + validate_required_parameter(ring, "#{name}.ring") + validate_parameter_type(Hash, ring, "#{name}.ring") + ring.each do |key, value| + validate_ring(value, "#{name}.ring.#{key}") + end + end + + do_validation do + validate_plugins(dataset["plugins"], "#{name}.plugins") + end + end + + def validate_date_range(value, name) + validate_required_parameter(value, name) + return if value == "infinity" + raise UnsupportedValue.new(name, value, @path) + end + + def validate_partition_key(value, name) + validate_required_parameter(value, name) + validate_parameter_type(String, value, name) + return if value == "_key" + raise UnsupportedValue.new(name, value, @path) + end + + def validate_ring(ring, name) + validate_parameter_type(Hash, ring, name) + + do_validation do + validate_positive_numeric_parameter(ring["weight"], "#{name}.weight") + end + + do_validation do + validate_parameter_type(Hash, ring["partitions"], "#{name}.partitions") + ring["partitions"].each do |key, value| + validate_partition(value, "#{name}.partitions.#{key}") + end + end + end + + def validate_partition(partition, name) + validate_parameter_type(Array, partition, name) + + partition.each_with_index do |value, index| + do_validation do + validate_parameter_type(String, value, "#{name}[#{index}]") + end + end + end + + def validate_plugins(plugins, name) + return unless plugins + validate_required_parameter(plugins, name) + validate_parameter_type(Array, plugins, "#{name}.plugins") + end + + def validate_zone_relations + return unless @data["zones"].is_a?(Array) + return unless @data["farms"].is_a?(Hash) + + farms = @data["farms"] + zones = @data["zones"] + + all_farms = farms.keys + all_zones = zones.flatten + + all_farms.each do |farm| + unless all_zones.include?(farm) + raise FarmNotZoned.new(farm, zones, @path) + end + end + + all_zones.each do |zone| + unless all_farms.include?(zone) + raise UnknownFarmInZones.new(farm, zones, @path) + end + end + end + + def validate_database_relations + return unless @data["farms"] + + farm_names = @data["farms"].keys.collect do |name| + Regexp.escape(name) + end + valid_farms_matcher = Regexp.new("^(#{farm_names.join("|")})\.") + + datasets.each do |dataset_name, dataset| + ring = dataset["ring"] + next if ring.nil? or !ring.is_a?(Hash) + ring.each do |ring_key, part| + partitions_set = part["partitions"] + next if partitions_set.nil? or !partitions_set.is_a?(Hash) + partitions_set.each do |range, partitions| + next unless partitions.is_a?(Array) + partitions.each_with_index do |partition, index| + name = "datasets.#{dataset_name}.ring.#{ring_key}." + + "partitions.#{range}[#{index}]" + do_validation do + unless partition =~ valid_farms_matcher + raise UnknownFarmForPartition.new(name, partition, @path) + end + do_validation do + directory_name = $POSTMATCH + if directory_name.nil? or directory_name.empty? + message = "\"#{partition}\" has no database name. " + + "You mus specify a database name for \"#{name}\"." + raise ValidationError.new(message, @path) + end + end + end + end + end + end + end + end end end end Modified: test/unit/catalog/test_version1.rb (+0 -17) =================================================================== --- test/unit/catalog/test_version1.rb 2014-02-26 17:21:40 +0900 (83b703a) +++ test/unit/catalog/test_version1.rb 2014-02-26 17:25:42 +0900 (dad45d9) @@ -36,23 +36,6 @@ class CatalogTestVersion1 < Test::Unit::TestCase Droonga::Catalog::Version1.new(data, path) end - class OptionTest < self - def create_catalog(options) - super(minimum_data.merge("options" => options), "path") - end - - def test_nonexistent - catalog = create_catalog({}) - assert_nil(catalog.option("nonexistent")) - end - - def test_existent - catalog = create_catalog("plugins" => ["crud", "groonga"]) - assert_equal(["crud", "groonga"], - catalog.option("plugins")) - end - end - class PartitionTest < self def setup data = JSON.parse(File.read(catalog_path)) -------------- next part -------------- HTML����������������������������...Download