susumu.yata
null+****@clear*****
Mon Sep 22 17:14:15 JST 2014
susumu.yata 2014-09-22 17:14:15 +0900 (Mon, 22 Sep 2014) New Revision: 03e0d5ca20b59daa182d7f2c72e825ba98223c33 https://github.com/groonga/grnxx/commit/03e0d5ca20b59daa182d7f2c72e825ba98223c33 Message: Update Pipeline to support Merger. (#68) Modified files: lib/grnxx/merger.cpp lib/grnxx/pipeline.cpp Modified: lib/grnxx/merger.cpp (+2 -0) =================================================================== --- lib/grnxx/merger.cpp 2014-09-22 16:44:39 +0900 (b28b318) +++ lib/grnxx/merger.cpp 2014-09-22 17:14:15 +0900 (60de4aa) @@ -152,6 +152,8 @@ bool AndMerger::finish(Error *error) { if (limit_ < output_records_->size()) { output_records_->resize(nullptr, limit_); } + input_records_1_->clear(); + input_records_2_->clear(); return true; } Modified: lib/grnxx/pipeline.cpp (+64 -0) =================================================================== --- lib/grnxx/pipeline.cpp 2014-09-22 16:44:39 +0900 (224d740) +++ lib/grnxx/pipeline.cpp 2014-09-22 17:14:15 +0900 (67ca13c) @@ -2,6 +2,7 @@ #include "grnxx/cursor.hpp" #include "grnxx/expression.hpp" +#include "grnxx/merger.hpp" namespace grnxx { namespace pipeline { @@ -181,6 +182,47 @@ Int SorterNode::read_next(Error *error, Array<Record> *records) { return records->size(); } +// --- MergerNode --- + +class MergerNode : public Node { + public: + explicit MergerNode(unique_ptr<Node> &&arg1, + unique_ptr<Node> &&arg2, + unique_ptr<Merger> &&merger) + : Node(), + arg1_(std::move(arg1)), + arg2_(std::move(arg2)), + merger_(std::move(merger)) {} + ~MergerNode() {} + + Int read_next(Error *error, Array<Record> *records); + + private: + unique_ptr<Node> arg1_; + unique_ptr<Node> arg2_; + unique_ptr<Merger> merger_; +}; + +Int MergerNode::read_next(Error *error, Array<Record> *records) { + Array<Record> arg1_records; + Int count = arg1_->read_all(error, &arg1_records); + if (count == -1) { + return -1; + } + Array<Record> arg2_records; + count = arg2_->read_all(error, &arg2_records); + if (count == -1) { + return -1; + } + if ((arg1_records.size() == 0) && (arg2_records.size() == 0)) { + return 0; + } + if (!merger_->merge(error, &arg1_records, &arg2_records, records)) { + return -1; + } + return records->size(); +} + } // namespace pipeline using namespace pipeline; @@ -293,6 +335,28 @@ bool PipelineBuilder::push_sorter(Error *error, unique_ptr<Sorter> &&sorter) { } bool PipelineBuilder::push_merger(Error *error, const MergerOptions &options) { + if (stack_.size() < 2) { + GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); + return false; + } + auto merger = grnxx::Merger::create(error, options); + if (!merger) { + return false; + } + unique_ptr<Node> arg2 = std::move(stack_[stack_.size() - 2]); + unique_ptr<Node> arg1 = std::move(stack_[stack_.size() - 1]); + stack_.resize(nullptr, stack_.size() - 2); + unique_ptr<Node> node(new (nothrow) MergerNode(std::move(arg1), + std::move(arg2), + std::move(merger))); + if (!node) { + GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); + return false; + } + stack_.push_back(error, std::move(node)); + return true; + + // TODO GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet"); return false; -------------- next part -------------- HTML����������������������������...Download