[Groonga-commit] groonga/grnxx at 6e4fbf2 [master] Enable Merger. (#113)

Back to archive index

susumu.yata null+****@clear*****
Tue Dec 16 10:54:30 JST 2014


susumu.yata	2014-11-21 15:13:15 +0900 (Fri, 21 Nov 2014)

  New Revision: 6e4fbf2a36e8a181e5233ca467bbf83142fb0e8e
  https://github.com/groonga/grnxx/commit/6e4fbf2a36e8a181e5233ca467bbf83142fb0e8e

  Message:
    Enable Merger. (#113)

  Added files:
    lib/grnxx/impl/merger.cpp
    lib/grnxx/impl/merger.hpp
  Modified files:
    include/grnxx/Makefile.am
    include/grnxx/merger.hpp
    lib/grnxx/Makefile.am
    lib/grnxx/impl/Makefile.am
    lib/grnxx/merger.cpp

  Modified: include/grnxx/Makefile.am (+2 -3)
===================================================================
--- include/grnxx/Makefile.am    2014-11-20 19:33:23 +0900 (eb8daa7)
+++ include/grnxx/Makefile.am    2014-11-21 15:13:15 +0900 (0444dc1)
@@ -13,10 +13,9 @@ pkginclude_HEADERS =	\
 	features.hpp	\
 	index.hpp	\
 	library.hpp	\
+	merger.hpp	\
 	sorter.hpp	\
 	string.hpp	\
 	table.hpp
 
-#	merger.hpp	\
-#	pipeline.hpp	\
-#	sorter.hpp
+#	pipeline.hpp

  Modified: include/grnxx/merger.hpp (+73 -28)
===================================================================
--- include/grnxx/merger.hpp    2014-11-20 19:33:23 +0900 (97944d5)
+++ include/grnxx/merger.hpp    2014-11-21 15:13:15 +0900 (6dc7e39)
@@ -1,64 +1,109 @@
 #ifndef GRNXX_MERGER_HPP
 #define GRNXX_MERGER_HPP
 
-#include "grnxx/types.hpp"
+#include <limits>
+#include <memory>
+
+#include "grnxx/array.hpp"
+#include "grnxx/data_types.hpp"
 
 namespace grnxx {
 
+enum MergerLogicalOperatorType {
+  // Keep records included in both the first input stream and the second input
+  // stream.
+  MERGER_LOGICAL_AND,
+  // Keep records included in the first input stream and/or the second input
+  // stream.
+  MERGER_LOGICAL_OR,
+  // Keep records included in only one of the input streams.
+  MERGER_LOGICAL_XOR,
+  // Keep records included in the first input stream and not included in the
+  // second input stream.
+  MERGER_LOGICAL_MINUS,
+  // Keep records included in the first input stream.
+  MERGER_LOGICAL_LEFT,
+  // Keep records included in the second input stream.
+  MERGER_LOGICAL_RIGHT
+};
+
+enum MergerScoreOperatorType {
+  // Add the first input score and the second input score.
+  MERGER_SCORE_PLUS,
+  // Subtract the second input score from the first input score.
+  MERGER_SCORE_MINUS,
+  // Multiply the first input score by the second input score.
+  MERGER_SCORE_MULTIPLICATION,
+  // Ignores the second input score.
+  MERGER_SCORE_LEFT,
+  // Ignores the first input score.
+  MERGER_SCORE_RIGHT,
+  // All zeros.
+  MERGER_SCORE_ZERO
+};
+
+struct MergerOptions {
+  // How to merge records.
+  MergerLogicalOperatorType logical_operator_type;
+  // How to merge scores.
+  MergerScoreOperatorType score_operator_type;
+  // The score of a missing record is replaced with this value.
+  Float missing_score;
+  // The first "offset" records are skipped.
+  size_t offset;
+  // At most "limit" records are returned.
+  size_t limit;
+
+  MergerOptions()
+      : logical_operator_type(MERGER_LOGICAL_AND),
+        score_operator_type(MERGER_SCORE_PLUS),
+        missing_score(0.0),
+        offset(0),
+        limit(std::numeric_limits<size_t>::max()) {}
+};
+
 class Merger {
  public:
-  Merger();
-  virtual ~Merger();
+  Merger() = default;
+  virtual ~Merger() = default;
 
-  // Create an object for merging record arrays.
+  // Create an object for merging record sets.
   //
-  // On success, returns a poitner to the merger.
-  // On failure, returns nullptr and stores error information into "*error" if
-  // "error" != nullptr.
-  static unique_ptr<Merger> create(
-      Error *error,
+  // On success, returns the merger.
+  // On failure, throws an exception.
+  static std::unique_ptr<Merger> create(
       const MergerOptions &options = MergerOptions());
 
   // Set the target record sets.
   //
   // Aborts merging the old record sets and starts merging the new record sets.
   //
-  // On success, returns true.
-  // On failure, returns false and stores error information into "*error" if
-  // "error" != nullptr.
-  virtual bool reset(Error *error,
-                     Array<Record> *input_records_1,
+  // On failure, throws an exception.
+  virtual void reset(Array<Record> *input_records_1,
                      Array<Record> *input_records_2,
                      Array<Record> *output_records) = 0;
 
   // Progress merging.
   //
-  // On success, returns true.
-  // On failure, returns false and stores error information into "*error" if
-  // "error" != nullptr.
-  virtual bool progress(Error *error);
+  // On failure, throws an exception.
+  virtual void progress() = 0;
 
   // Finish merging.
   //
   // Assumes that all the records are ready.
   // Leaves only the result records if offset and limit are specified.
   //
-  // On success, returns true.
-  // On failure, returns false and stores error information into "*error" if
-  // "error" != nullptr.
-  virtual bool finish(Error *error) = 0;
+  // On failure, throws an exception.
+  virtual void finish() = 0;
 
   // Merge records.
   //
   // Calls reset() and finish() to merge records.
   //
-  // On success, returns true.
-  // On failure, returns false and stores error information into "*error" if
-  // "error" != nullptr.
-  virtual bool merge(Error *error,
-                     Array<Record> *input_records_1,
+  // On failure, throws an exception.
+  virtual void merge(Array<Record> *input_records_1,
                      Array<Record> *input_records_2,
-                     Array<Record> *output_records);
+                     Array<Record> *output_records) = 0;
 };
 
 }  // namespace grnxx

  Modified: lib/grnxx/Makefile.am (+1 -1)
===================================================================
--- lib/grnxx/Makefile.am    2014-11-20 19:33:23 +0900 (e4062c3)
+++ lib/grnxx/Makefile.am    2014-11-21 15:13:15 +0900 (a2be799)
@@ -13,11 +13,11 @@ libgrnxx_la_SOURCES =			\
 	db.cpp				\
 	expression.cpp			\
 	library.cpp			\
+	merger.cpp			\
 	sorter.cpp			\
 	string.cpp
 
 #	index.cpp			\
-#	merger.cpp			\
 #	pipeline.cpp
 
 libgrnxx_includedir = ${includedir}/grnxx

  Modified: lib/grnxx/impl/Makefile.am (+2 -0)
===================================================================
--- lib/grnxx/impl/Makefile.am    2014-11-20 19:33:23 +0900 (fc6cb2d)
+++ lib/grnxx/impl/Makefile.am    2014-11-21 15:13:15 +0900 (6f34085)
@@ -11,6 +11,7 @@ libgrnxx_impl_la_LDFLAGS = @AM_LTLDFLAGS@
 libgrnxx_impl_la_SOURCES =		\
 	db.cpp				\
 	expression.cpp			\
+	merger.cpp			\
 	sorter.cpp			\
 	table.cpp
 
@@ -21,5 +22,6 @@ libgrnxx_impl_include_HEADERS =		\
 	db.hpp				\
 	expression.hpp			\
 	index.hpp			\
+	merger.hpp			\
 	sorter.hpp			\
 	table.hpp

  Added: lib/grnxx/impl/merger.cpp (+772 -0) 100644
===================================================================
--- /dev/null
+++ lib/grnxx/impl/merger.cpp    2014-11-21 15:13:15 +0900 (af0c20c)
@@ -0,0 +1,772 @@
+#include "grnxx/impl/merger.hpp"
+
+#include <new>
+#include <unordered_map>
+
+namespace grnxx {
+namespace impl {
+namespace merger {
+
+// -- AndMerger --
+
+class AndMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  AndMerger(const MergerOptions &options) : Merger(options) {}
+  ~AndMerger() = default;
+
+  void finish();
+};
+
+void AndMerger::finish() {
+  // Create a hash table from the smaller input.
+  Array<Record> *filter_records;
+  Array<Record> *stream_records;
+  if (input_records_1_->size() < input_records_2_->size()) {
+    filter_records = input_records_1_;
+    stream_records = input_records_2_;
+  } else {
+    filter_records = input_records_2_;
+    stream_records = input_records_1_;
+  }
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < filter_records->size(); ++i) try {
+    filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score;
+  } catch (const std::bad_alloc &) {
+    throw "Memory allocation failed";  // TODO
+  }
+
+  // Filter the stream (the larger input) with the hash table.
+  const bool stream_is_1 = (stream_records == input_records_1_);
+  for (size_t i = 0; i < stream_records->size(); ++i) {
+    auto it = filter.find((*stream_records)[i].row_id.value());
+    if (it != filter.end()) {
+      Record record;
+      record.row_id = Int(it->first);
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = (*stream_records)[i].score + it->second;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score - it->second;
+          } else {
+            record.score = it->second - (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = (*stream_records)[i].score * it->second;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score;
+          } else {
+            record.score = it->second;
+          }
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          if (stream_is_1) {
+            record.score = it->second;
+          } else {
+            record.score = (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+      output_records_->push_back(record);
+    }
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+// -- OrMerger --
+
+class OrMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  OrMerger(const MergerOptions &options) : Merger(options) {}
+  ~OrMerger() = default;
+
+  void finish();
+};
+
+void OrMerger::finish() {
+  // Create a hash table from the smaller input.
+  Array<Record> *filter_records;
+  Array<Record> *stream_records;
+  if (input_records_1_->size() < input_records_2_->size()) {
+    filter_records = input_records_1_;
+    stream_records = input_records_2_;
+  } else {
+    filter_records = input_records_2_;
+    stream_records = input_records_1_;
+  }
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < filter_records->size(); ++i) try {
+    filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score;
+  } catch (const std::bad_alloc &) {
+    throw "Memory allocation failed";  // TODO
+  }
+
+  // Filter the stream (the larger input) with the hash table.
+  const bool stream_is_1 = (stream_records == input_records_1_);
+  for (size_t i = 0; i < stream_records->size(); ++i) {
+    Record record;
+    record.row_id = (*stream_records)[i].row_id;
+    auto it = filter.find((*stream_records)[i].row_id.value());
+    if (it == filter.end()) {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = (*stream_records)[i].score + missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score - missing_score_;
+          } else {
+            record.score = missing_score_ - (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = (*stream_records)[i].score * missing_score_;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score;
+          } else {
+            record.score = missing_score_;
+          }
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          if (stream_is_1) {
+            record.score = missing_score_;
+          } else {
+            record.score = (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+    } else {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = it->second + (*stream_records)[i].score;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score - it->second;
+          } else {
+            record.score = it->second - (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = it->second * (*stream_records)[i].score;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score;
+          } else {
+            record.score = it->second;
+          }
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          if (!stream_is_1) {
+            record.score = (*stream_records)[i].score;
+          } else {
+            record.score = it->second;
+          }
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+      filter.erase(it);
+    }
+    output_records_->push_back(record);
+  }
+
+  for (auto it : filter) {
+    switch (score_operator_type_) {
+      case MERGER_SCORE_PLUS: {
+        it.second += missing_score_;
+        break;
+      }
+      case MERGER_SCORE_MINUS: {
+        if (stream_is_1) {
+          it.second = missing_score_ - it.second;
+        } else {
+          it.second -= missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_MULTIPLICATION: {
+        it.second *= missing_score_;
+        break;
+      }
+      case MERGER_SCORE_LEFT: {
+        if (stream_is_1) {
+          it.second = missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_RIGHT: {
+        if (!stream_is_1) {
+          it.second = missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_ZERO: {
+        it.second = Float(0.0);
+        break;
+      }
+    }
+    output_records_->push_back(Record(Int(it.first), it.second));
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+class XorMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  XorMerger(const MergerOptions &options) : Merger(options) {}
+  ~XorMerger() = default;
+
+  void finish();
+};
+
+void XorMerger::finish() {
+  // Create a hash table from the smaller input.
+  Array<Record> *filter_records;
+  Array<Record> *stream_records;
+  if (input_records_1_->size() < input_records_2_->size()) {
+    filter_records = input_records_1_;
+    stream_records = input_records_2_;
+  } else {
+    filter_records = input_records_2_;
+    stream_records = input_records_1_;
+  }
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < filter_records->size(); ++i) try {
+    filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score;
+  } catch (...) {
+    throw "Memory allocation failed";  // TODO
+  }
+
+  // Filter the stream (the larger input) with the hash table.
+  const bool stream_is_1 = (stream_records == input_records_1_);
+  for (size_t i = 0; i < stream_records->size(); ++i) {
+    auto it = filter.find((*stream_records)[i].row_id.value());
+    if (it != filter.end()) {
+      filter.erase(it);
+    } else {
+      Record record;
+      record.row_id = (*stream_records)[i].row_id;
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = (*stream_records)[i].score + missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score - missing_score_;
+          } else {
+            record.score = missing_score_ - (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = (*stream_records)[i].score * missing_score_;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          if (stream_is_1) {
+            record.score = (*stream_records)[i].score;
+          } else {
+            record.score = missing_score_;
+          }
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          if (stream_is_1) {
+            record.score = missing_score_;
+          } else {
+            record.score = (*stream_records)[i].score;
+          }
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+      output_records_->push_back(record);
+    }
+  }
+
+  for (auto it : filter) {
+    switch (score_operator_type_) {
+      case MERGER_SCORE_PLUS: {
+        it.second += missing_score_;
+        break;
+      }
+      case MERGER_SCORE_MINUS: {
+        if (stream_is_1) {
+          it.second = missing_score_ - it.second;
+        } else {
+          it.second -= missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_MULTIPLICATION: {
+        it.second *= missing_score_;
+        break;
+      }
+      case MERGER_SCORE_LEFT: {
+        if (stream_is_1) {
+          it.second = missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_RIGHT: {
+        if (!stream_is_1) {
+          it.second = missing_score_;
+        }
+        break;
+      }
+      case MERGER_SCORE_ZERO: {
+        it.second = Float(0.0);
+        break;
+      }
+    }
+    output_records_->push_back(Record(Int(it.first), it.second));
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+class MinusMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  MinusMerger(const MergerOptions &options) : Merger(options) {}
+  ~MinusMerger() = default;
+
+  void finish();
+};
+
+void MinusMerger::finish() {
+  // Create a hash table from the smaller input.
+  Array<Record> *filter_records;
+  Array<Record> *stream_records;
+  if (input_records_1_->size() < input_records_2_->size()) {
+    filter_records = input_records_1_;
+    stream_records = input_records_2_;
+  } else {
+    filter_records = input_records_2_;
+    stream_records = input_records_1_;
+  }
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < filter_records->size(); ++i) try {
+    filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score;
+  } catch (...) {
+    throw "Memory allocation failed";  // TODO
+  }
+
+  // Filter the stream (the larger input) with the hash table.
+  const bool stream_is_1 = (stream_records == input_records_1_);
+  if (stream_is_1) {
+    for (size_t i = 0; i < stream_records->size(); ++i) {
+      auto it = filter.find((*stream_records)[i].row_id.value());
+      if (it != filter.end()) {
+        continue;
+      }
+      Record record = stream_records->get(i);
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score += missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score -= missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score *= missing_score_;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = missing_score_;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+      output_records_->push_back(record);
+    }
+  } else {
+    for (size_t i = 0; i < stream_records->size(); ++i) {
+      auto it = filter.find((*stream_records)[i].row_id.value());
+      if (it != filter.end()) {
+        filter.erase(it);
+      }
+    }
+    for (auto it : filter) {
+      Record record;
+      record.row_id = Int(it.first);
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = it.second + missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score = it.second - missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = it.second * missing_score_;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          record.score = it.second;
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = missing_score_;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+      output_records_->push_back(record);
+    }
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+class LeftMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  LeftMerger(const MergerOptions &options) : Merger(options) {}
+  ~LeftMerger() = default;
+
+  void finish();
+};
+
+void LeftMerger::finish() {
+  // Create a hash table from the second input.
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < input_records_2_->size(); ++i) {
+    filter[(*input_records_2_)[i].row_id.value()] =
+        (*input_records_2_)[i].score;
+  }
+
+  // Adjust score of the first input.
+  for (size_t i = 0; i < input_records_1_->size(); ++i) {
+    Record record = input_records_1_->get(i);
+    auto it = filter.find(record.row_id.value());
+    if (it != filter.end()) {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score += it->second;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score -= it->second;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score *= it->second;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = it->second;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+    } else {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score += missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score -= missing_score_;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score *= missing_score_;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = missing_score_;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+    }
+    output_records_->push_back(record);
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+class RightMerger : public Merger {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  RightMerger(const MergerOptions &options) : Merger(options) {}
+  ~RightMerger() = default;
+
+  void finish();
+};
+
+void RightMerger::finish() {
+  // Create a hash table from the first input.
+  std::unordered_map<int64_t, Float> filter;
+  for (size_t i = 0; i < input_records_1_->size(); ++i) {
+    filter[(*input_records_1_)[i].row_id.value()] =
+        (*input_records_1_)[i].score;
+  }
+
+  // Adjust score of the first input.
+  for (size_t i = 0; i < input_records_2_->size(); ++i) {
+    Record record;
+    record.row_id = (*input_records_2_)[i].row_id;
+    auto it = filter.find(record.row_id.value());
+    if (it != filter.end()) {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = it->second + (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score = it->second - (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = it->second * (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          record.score = it->second;
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+    } else {
+      switch (score_operator_type_) {
+        case MERGER_SCORE_PLUS: {
+          record.score = missing_score_ + (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_MINUS: {
+          record.score = missing_score_ - (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_MULTIPLICATION: {
+          record.score = missing_score_ * (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_LEFT: {
+          record.score = missing_score_;
+          break;
+        }
+        case MERGER_SCORE_RIGHT: {
+          record.score = (*input_records_2_)[i].score;
+          break;
+        }
+        case MERGER_SCORE_ZERO: {
+          record.score = Float(0.0);
+          break;
+        }
+      }
+    }
+    output_records_->push_back(record);
+  }
+
+  // Remove out-of-range records.
+  if (offset_ > 0) {
+    for (size_t i = offset_; i < output_records_->size(); ++i) {
+      (*output_records_)[i - offset_] = (*output_records_)[i];
+    }
+    output_records_->resize(output_records_->size() - offset_);
+  }
+  if (limit_ < output_records_->size()) {
+    output_records_->resize(limit_);
+  }
+  input_records_1_->clear();
+  input_records_2_->clear();
+}
+
+}  // namespace merger
+
+using namespace merger;
+
+Merger::Merger(const MergerOptions &options)
+    : input_records_1_(nullptr),
+      input_records_2_(nullptr),
+      output_records_(nullptr),
+      logical_operator_type_(options.logical_operator_type),
+      score_operator_type_(options.score_operator_type),
+      missing_score_(options.missing_score),
+      offset_(options.offset),
+      limit_(options.limit) {}
+
+void Merger::reset(Array<Record> *input_records_1,
+                   Array<Record> *input_records_2,
+                   Array<Record> *output_records) {
+  input_records_1_ = input_records_1;
+  input_records_2_ = input_records_2;
+  output_records_ = output_records;
+}
+
+void Merger::progress() {
+  // TODO: Incremental merging is not supported yet.
+}
+
+void Merger::merge(Array<Record> *input_records_1,
+                   Array<Record> *input_records_2,
+                   Array<Record> *output_records) {
+  reset(input_records_1, input_records_2, output_records);
+  finish();
+}
+
+Merger *Merger::create(const MergerOptions &options) try {
+  switch (options.logical_operator_type) {
+    case MERGER_LOGICAL_AND: {
+      return new AndMerger(options);
+    }
+    case MERGER_LOGICAL_OR: {
+      return new OrMerger(options);
+    }
+    case MERGER_LOGICAL_XOR: {
+      return new XorMerger(options);
+    }
+    case MERGER_LOGICAL_MINUS: {
+      return new MinusMerger(options);
+    }
+    case MERGER_LOGICAL_LEFT: {
+      return new LeftMerger(options);
+    }
+    case MERGER_LOGICAL_RIGHT: {
+      return new RightMerger(options);
+    }
+    default: {
+      throw "Invalid operator type";  // TODO
+    }
+  }
+} catch (const std::bad_alloc &) {
+  throw "Memory allocation failed";  // TODO
+}
+
+}  // namespace impl
+}  // namespace grnxx

  Added: lib/grnxx/impl/merger.hpp (+49 -0) 100644
===================================================================
--- /dev/null
+++ lib/grnxx/impl/merger.hpp    2014-11-21 15:13:15 +0900 (933c56c)
@@ -0,0 +1,49 @@
+#ifndef GRNXX_IMPL_MERGER_HPP
+#define GRNXX_IMPL_MERGER_HPP
+
+#include "grnxx/merger.hpp"
+
+namespace grnxx {
+namespace impl {
+
+using MergerInterface = grnxx::Merger;
+
+class Merger : public MergerInterface {
+ public:
+  // -- Public API (grnxx/merger.hpp) --
+
+  explicit Merger(const MergerOptions &options);
+  virtual ~Merger() = default;
+
+  virtual void reset(Array<Record> *input_records_1,
+                     Array<Record> *input_records_2,
+                     Array<Record> *output_records);
+  virtual void progress();
+  virtual void finish() = 0;
+  virtual void merge(Array<Record> *input_records_1,
+                     Array<Record> *input_records_2,
+                     Array<Record> *output_records);
+
+  // -- Internal API --
+
+  // Create an object for merging record sets.
+  //
+  // On success, returns the merger.
+  // On failure, throws an exception.
+  static Merger *create(const MergerOptions &options);
+
+ protected:
+  Array<Record> *input_records_1_;
+  Array<Record> *input_records_2_;
+  Array<Record> *output_records_;
+  MergerLogicalOperatorType logical_operator_type_;
+  MergerScoreOperatorType score_operator_type_;
+  Float missing_score_;
+  size_t offset_;
+  size_t limit_;
+};
+
+}  // namespace impl
+}  // namespace grnxx
+
+#endif  // GRNXX_IMPL_MERGER_HPP

  Modified: lib/grnxx/merger.cpp (+3 -1078)
===================================================================
--- lib/grnxx/merger.cpp    2014-11-20 19:33:23 +0900 (47ca119)
+++ lib/grnxx/merger.cpp    2014-11-21 15:13:15 +0900 (ea3191f)
@@ -1,1086 +1,11 @@
 #include "grnxx/merger.hpp"
 
-#include <unordered_map>
+#include "grnxx/impl/merger.hpp"
 
 namespace grnxx {
 
-// -- AndMerger --
-
-class AndMerger : public Merger {
- public:
-  ~AndMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Int offset_;
-  Int limit_;
-
-  AndMerger(MergerOperatorType operator_type, Int offset, Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> AndMerger::create(Error *error,
-                                     const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) AndMerger(options.operator_type,
-                              options.offset,
-                              options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool AndMerger::reset(Error *,
-                      Array<Record> *input_records_1,
-                      Array<Record> *input_records_2,
-                      Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool AndMerger::finish(Error *error) {
-  // Create a hash table from the smaller input.
-  Array<Record> *filter_records;
-  Array<Record> *stream_records;
-  if (input_records_1_->size() < input_records_2_->size()) {
-    filter_records = input_records_1_;
-    stream_records = input_records_2_;
-  } else {
-    filter_records = input_records_2_;
-    stream_records = input_records_1_;
-  }
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < filter_records->size(); ++i) try {
-    filter[filter_records->get_row_id(i)] = filter_records->get_score(i);
-  } catch (...) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-
-  // Filter the stream (the larger input) with the hash table.
-  const MergerOperatorType operator_type = operator_type_;
-  const bool stream_is_1 = stream_records == input_records_1_;
-  for (Int i = 0; i < stream_records->size(); ++i) {
-    auto it = filter.find(stream_records->get_row_id(i));
-    if (it != filter.end()) {
-      Record record;
-      record.row_id = it->first;
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) + it->second;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i) - it->second;
-          } else {
-            record.score = it->second - stream_records->get_score(i);
-          }
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) * it->second;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i);
-          } else {
-            record.score = it->second;
-          }
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = it->second;
-          } else {
-            record.score = stream_records->get_score(i);
-          }
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-      if (!output_records_->push_back(error, record)) {
-        return false;
-      }
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- OrMerger --
-
-class OrMerger : public Merger {
- public:
-  ~OrMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Float null_score_;
-  Int offset_;
-  Int limit_;
-
-  OrMerger(MergerOperatorType operator_type,
-           Float null_score,
-           Int offset,
-           Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        null_score_(null_score),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> OrMerger::create(Error *error,
-                                    const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) OrMerger(options.operator_type,
-                             options.null_score,
-                             options.offset,
-                             options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool OrMerger::reset(Error *,
-                     Array<Record> *input_records_1,
-                     Array<Record> *input_records_2,
-                     Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool OrMerger::finish(Error *error) {
-  // Create a hash table from the smaller input.
-  Array<Record> *filter_records;
-  Array<Record> *stream_records;
-  if (input_records_1_->size() < input_records_2_->size()) {
-    filter_records = input_records_1_;
-    stream_records = input_records_2_;
-  } else {
-    filter_records = input_records_2_;
-    stream_records = input_records_1_;
-  }
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < filter_records->size(); ++i) try {
-    filter[filter_records->get_row_id(i)] = filter_records->get_score(i);
-  } catch (...) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-
-  // Filter the stream (the larger input) with the hash table.
-  const MergerOperatorType operator_type = operator_type_;
-  const bool stream_is_1 = stream_records == input_records_1_;
-  for (Int i = 0; i < stream_records->size(); ++i) {
-    Record record;
-    record.row_id = stream_records->get_row_id(i);
-    auto it = filter.find(stream_records->get_row_id(i));
-    if (it == filter.end()) {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) + null_score_;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i) - null_score_;
-          } else {
-            record.score = null_score_ - stream_records->get_score(i);
-          }
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) * null_score_;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i);
-          } else {
-            record.score = null_score_;
-          }
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = null_score_;
-          } else {
-            record.score = stream_records->get_score(i);
-          }
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-    } else {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = it->second + stream_records->get_score(i);
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i) - it->second;
-          } else {
-            record.score = it->second - stream_records->get_score(i);
-          }
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = it->second * stream_records->get_score(i);
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i);
-          } else {
-            record.score = it->second;
-          }
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          if (!stream_is_1) {
-            record.score = stream_records->get_score(i);
-          } else {
-            record.score = it->second;
-          }
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-      filter.erase(it);
-    }
-    if (!output_records_->push_back(error, record)) {
-      return false;
-    }
-  }
-
-  for (auto it : filter) {
-    switch (operator_type) {
-      case PLUS_MERGER_OPERATOR: {
-        it.second += null_score_;
-        break;
-      }
-      case MINUS_MERGER_OPERATOR: {
-        if (stream_is_1) {
-          it.second = null_score_ - it.second;
-        } else {
-          it.second -= null_score_;
-        }
-        break;
-      }
-      case MULTIPLICATION_MERGER_OPERATOR: {
-        it.second *= null_score_;
-        break;
-      }
-      case LHS_MERGER_OPERATOR: {
-        if (stream_is_1) {
-          it.second = null_score_;
-        }
-        break;
-      }
-      case RHS_MERGER_OPERATOR: {
-        if (!stream_is_1) {
-          it.second = null_score_;
-        }
-        break;
-      }
-      case ZERO_MERGER_OPERATOR: {
-        it.second = 0.0;
-        break;
-      }
-    }
-    if (!output_records_->push_back(error, Record(it.first, it.second))) {
-      return false;
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- XorMerger --
-
-class XorMerger : public Merger {
- public:
-  ~XorMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Float null_score_;
-  Int offset_;
-  Int limit_;
-
-  XorMerger(MergerOperatorType operator_type,
-            Float null_score,
-            Int offset,
-            Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        null_score_(null_score),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> XorMerger::create(Error *error,
-                                     const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) XorMerger(options.operator_type,
-                              options.null_score,
-                              options.offset,
-                              options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool XorMerger::reset(Error *,
-                      Array<Record> *input_records_1,
-                      Array<Record> *input_records_2,
-                      Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool XorMerger::finish(Error *error) {
-  // Create a hash table from the smaller input.
-  Array<Record> *filter_records;
-  Array<Record> *stream_records;
-  if (input_records_1_->size() < input_records_2_->size()) {
-    filter_records = input_records_1_;
-    stream_records = input_records_2_;
-  } else {
-    filter_records = input_records_2_;
-    stream_records = input_records_1_;
-  }
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < filter_records->size(); ++i) try {
-    filter[filter_records->get_row_id(i)] = filter_records->get_score(i);
-  } catch (...) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-
-  // Filter the stream (the larger input) with the hash table.
-  const MergerOperatorType operator_type = operator_type_;
-  const bool stream_is_1 = stream_records == input_records_1_;
-  for (Int i = 0; i < stream_records->size(); ++i) {
-    auto it = filter.find(stream_records->get_row_id(i));
-    if (it != filter.end()) {
-      filter.erase(it);
-    } else {
-      Record record;
-      record.row_id = stream_records->get_row_id(i);
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) + null_score_;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i) - null_score_;
-          } else {
-            record.score = null_score_ - stream_records->get_score(i);
-          }
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = stream_records->get_score(i) * null_score_;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = stream_records->get_score(i);
-          } else {
-            record.score = null_score_;
-          }
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          if (stream_is_1) {
-            record.score = null_score_;
-          } else {
-            record.score = stream_records->get_score(i);
-          }
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-      if (!output_records_->push_back(error, record)) {
-        return false;
-      }
-    }
-  }
-
-  for (auto it : filter) {
-    switch (operator_type) {
-      case PLUS_MERGER_OPERATOR: {
-        it.second += null_score_;
-        break;
-      }
-      case MINUS_MERGER_OPERATOR: {
-        if (stream_is_1) {
-          it.second = null_score_ - it.second;
-        } else {
-          it.second -= null_score_;
-        }
-        break;
-      }
-      case MULTIPLICATION_MERGER_OPERATOR: {
-        it.second *= null_score_;
-        break;
-      }
-      case LHS_MERGER_OPERATOR: {
-        if (stream_is_1) {
-          it.second = null_score_;
-        }
-        break;
-      }
-      case RHS_MERGER_OPERATOR: {
-        if (!stream_is_1) {
-          it.second = null_score_;
-        }
-        break;
-      }
-      case ZERO_MERGER_OPERATOR: {
-        it.second = 0.0;
-        break;
-      }
-    }
-    if (!output_records_->push_back(error, Record(it.first, it.second))) {
-      return false;
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- MinusMerger --
-
-class MinusMerger : public Merger {
- public:
-  ~MinusMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Float null_score_;
-  Int offset_;
-  Int limit_;
-
-  MinusMerger(MergerOperatorType operator_type,
-              Float null_score,
-              Int offset,
-              Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        null_score_(null_score),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> MinusMerger::create(Error *error,
-                                       const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) MinusMerger(options.operator_type,
-                                options.null_score,
-                                options.offset,
-                                options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool MinusMerger::reset(Error *,
-                        Array<Record> *input_records_1,
-                        Array<Record> *input_records_2,
-                        Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool MinusMerger::finish(Error *error) {
-  // Create a hash table from the smaller input.
-  Array<Record> *filter_records;
-  Array<Record> *stream_records;
-  if (input_records_1_->size() < input_records_2_->size()) {
-    filter_records = input_records_1_;
-    stream_records = input_records_2_;
-  } else {
-    filter_records = input_records_2_;
-    stream_records = input_records_1_;
-  }
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < filter_records->size(); ++i) try {
-    filter[filter_records->get_row_id(i)] = filter_records->get_score(i);
-  } catch (...) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-
-  // Filter the stream (the larger input) with the hash table.
-  const MergerOperatorType operator_type = operator_type_;
-  const bool stream_is_1 = stream_records == input_records_1_;
-  if (stream_is_1) {
-    for (Int i = 0; i < stream_records->size(); ++i) {
-      auto it = filter.find(stream_records->get_row_id(i));
-      if (it != filter.end()) {
-        continue;
-      }
-      Record record = stream_records->get(i);
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score += null_score_;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score -= null_score_;
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score *= null_score_;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = null_score_;
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-      if (!output_records_->push_back(error, record)) {
-        return false;
-      }
-    }
-  } else {
-    for (Int i = 0; i < stream_records->size(); ++i) {
-      auto it = filter.find(stream_records->get_row_id(i));
-      if (it != filter.end()) {
-        filter.erase(it);
-      }
-    }
-    for (auto it : filter) {
-      Record record;
-      record.row_id = it.first;
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = it.second + null_score_;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score = it.second - null_score_;
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = it.second * null_score_;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          record.score = it.second;
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = null_score_;
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-      if (!output_records_->push_back(error, record)) {
-        return false;
-      }
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- LhsMerger --
-
-class LhsMerger : public Merger {
- public:
-  ~LhsMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Float null_score_;
-  Int offset_;
-  Int limit_;
-
-  LhsMerger(MergerOperatorType operator_type,
-            Float null_score,
-            Int offset,
-            Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        null_score_(null_score),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> LhsMerger::create(Error *error,
-                                     const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) LhsMerger(options.operator_type,
-                              options.null_score,
-                              options.offset,
-                              options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool LhsMerger::reset(Error *,
-                      Array<Record> *input_records_1,
-                      Array<Record> *input_records_2,
-                      Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool LhsMerger::finish(Error *error) {
-  // Create a hash table from the second input.
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < input_records_2_->size(); ++i) {
-    filter[input_records_2_->get_row_id(i)] = input_records_2_->get_score(i);
-  }
-
-  // Adjust score of the first input.
-  const MergerOperatorType operator_type = operator_type_;
-  for (Int i = 0; i < input_records_1_->size(); ++i) {
-    Record record = input_records_1_->get(i);
-    auto it = filter.find(record.row_id);
-    if (it != filter.end()) {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score += it->second;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score -= it->second;
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score *= it->second;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = it->second;
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-    } else {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score += null_score_;
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score -= null_score_;
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score *= null_score_;
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = null_score_;
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-    }
-    if (!output_records_->push_back(error, record)) {
-      return false;
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- RhsMerger --
-
-
-class RhsMerger : public Merger {
- public:
-  ~RhsMerger() {}
-
-  static unique_ptr<Merger> create(Error *error, const MergerOptions &options);
-
-  bool reset(Error *error,
-             Array<Record> *input_records_1,
-             Array<Record> *input_records_2,
-             Array<Record> *output_records);
-
-  bool finish(Error *error);
-
- private:
-  Array<Record> *input_records_1_;
-  Array<Record> *input_records_2_;
-  Array<Record> *output_records_;
-  MergerOperatorType operator_type_;
-  Float null_score_;
-  Int offset_;
-  Int limit_;
-
-  RhsMerger(MergerOperatorType operator_type,
-            Float null_score,
-            Int offset,
-            Int limit)
-      : Merger(),
-        input_records_1_(nullptr),
-        input_records_2_(nullptr),
-        output_records_(nullptr),
-        operator_type_(operator_type),
-        null_score_(null_score),
-        offset_(offset),
-        limit_(limit) {}
-};
-
-unique_ptr<Merger> RhsMerger::create(Error *error,
-                                     const MergerOptions &options) {
-  unique_ptr<Merger> merger(
-      new (nothrow) RhsMerger(options.operator_type,
-                              options.null_score,
-                              options.offset,
-                              options.limit));
-  if (!merger) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return merger;
-}
-
-bool RhsMerger::reset(Error *,
-                      Array<Record> *input_records_1,
-                      Array<Record> *input_records_2,
-                      Array<Record> *output_records) {
-  input_records_1_ = input_records_1;
-  input_records_2_ = input_records_2;
-  output_records_ = output_records;
-  return true;
-}
-
-bool RhsMerger::finish(Error *error) {
-  // Create a hash table from the first input.
-  std::unordered_map<Int, Float> filter;
-  for (Int i = 0; i < input_records_1_->size(); ++i) {
-    filter[input_records_1_->get_row_id(i)] = input_records_1_->get_score(i);
-  }
-
-  // Adjust score of the first input.
-  const MergerOperatorType operator_type = operator_type_;
-  for (Int i = 0; i < input_records_2_->size(); ++i) {
-    Record record;
-    record.row_id = input_records_2_->get_row_id(i);
-    auto it = filter.find(record.row_id);
-    if (it != filter.end()) {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = it->second + input_records_2_->get_score(i);
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score = it->second - input_records_2_->get_score(i);
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = it->second * input_records_2_->get_score(i);
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          record.score = it->second;
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = input_records_2_->get_score(i);
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-    } else {
-      switch (operator_type) {
-        case PLUS_MERGER_OPERATOR: {
-          record.score = null_score_ + input_records_2_->get_score(i);
-          break;
-        }
-        case MINUS_MERGER_OPERATOR: {
-          record.score = null_score_ - input_records_2_->get_score(i);
-          break;
-        }
-        case MULTIPLICATION_MERGER_OPERATOR: {
-          record.score = null_score_ * input_records_2_->get_score(i);
-          break;
-        }
-        case LHS_MERGER_OPERATOR: {
-          record.score = null_score_;
-          break;
-        }
-        case RHS_MERGER_OPERATOR: {
-          record.score = input_records_2_->get_score(i);
-          break;
-        }
-        case ZERO_MERGER_OPERATOR: {
-          record.score = 0.0;
-          break;
-        }
-      }
-    }
-    if (!output_records_->push_back(error, record)) {
-      return false;
-    }
-  }
-
-  // Remove out-of-range records.
-  if (offset_ > 0) {
-    for (Int i = offset_; i < output_records_->size(); ++i) {
-      output_records_->set(i - offset_, output_records_->get(i));
-    }
-    output_records_->resize(nullptr, output_records_->size() - offset_);
-  }
-  if (limit_ < output_records_->size()) {
-    output_records_->resize(nullptr, limit_);
-  }
-  input_records_1_->clear();
-  input_records_2_->clear();
-  return true;
-}
-
-// -- Merger --
-
-Merger::Merger() {}
-
-Merger::~Merger() {}
-
-unique_ptr<Merger> Merger::create(Error *error, const MergerOptions &options) {
-  switch (options.type) {
-    case AND_MERGER: {
-      return AndMerger::create(error, options);
-    }
-    case OR_MERGER: {
-      return OrMerger::create(error, options);
-    }
-    case XOR_MERGER: {
-      return XorMerger::create(error, options);
-    }
-    case MINUS_MERGER: {
-      return MinusMerger::create(error, options);
-    }
-    case LHS_MERGER: {
-      return LhsMerger::create(error, options);
-    }
-    case RHS_MERGER: {
-      return RhsMerger::create(error, options);
-    }
-  }
-}
-
-bool Merger::progress(Error *) {
-  // TODO: Incremental merging is not supported yet.
-  return true;
-}
-
-bool Merger::merge(Error *error,
-                   Array<Record> *input_records_1,
-                   Array<Record> *input_records_2,
-                   Array<Record> *output_records) {
-  if (!reset(error, input_records_1, input_records_2, output_records)) {
-    return false;
-  }
-  return finish(error);
+std::unique_ptr<Merger> Merger::create(const MergerOptions &options) {
+  return std::unique_ptr<Merger>(impl::Merger::create(options));
 }
 
 }  // namespace grnxx




More information about the Groonga-commit mailing list
Back to archive index