susumu.yata
null+****@clear*****
Tue Dec 16 10:44:02 JST 2014
susumu.yata 2014-11-21 17:18:14 +0900 (Fri, 21 Nov 2014) New Revision: f9d1b3703e683d3e0af763dc6bef9e088a0492e5 https://github.com/groonga/grnxx/commit/f9d1b3703e683d3e0af763dc6bef9e088a0492e5 Message: Add a Pipeline stub. (#114) Added files: lib/grnxx/impl/pipeline.cpp lib/grnxx/impl/pipeline.hpp Copied files: lib/grnxx/pipeline-old.cpp (from lib/grnxx/pipeline.cpp) Modified files: include/grnxx/Makefile.am include/grnxx/pipeline.hpp lib/grnxx/Makefile.am lib/grnxx/impl/Makefile.am lib/grnxx/pipeline.cpp Modified: include/grnxx/Makefile.am (+1 -2) =================================================================== --- include/grnxx/Makefile.am 2014-11-21 15:13:39 +0900 (0444dc1) +++ include/grnxx/Makefile.am 2014-11-21 17:18:14 +0900 (c49404b) @@ -14,8 +14,7 @@ pkginclude_HEADERS = \ index.hpp \ library.hpp \ merger.hpp \ + pipeline.hpp \ sorter.hpp \ string.hpp \ table.hpp - -# pipeline.hpp Modified: include/grnxx/pipeline.hpp (+161 -70) =================================================================== --- include/grnxx/pipeline.hpp 2014-11-21 15:13:39 +0900 (7000e56) +++ include/grnxx/pipeline.hpp 2014-11-21 17:18:14 +0900 (a655a76) @@ -1,120 +1,211 @@ #ifndef GRNXX_PIPELINE_HPP #define GRNXX_PIPELINE_HPP +#include <limits> +#include <memory> + +#include "grnxx/cursor.hpp" +#include "grnxx/expression.hpp" +#include "grnxx/merger.hpp" #include "grnxx/sorter.hpp" -#include "grnxx/types.hpp" +#include "grnxx/table.hpp" namespace grnxx { -namespace pipeline { - -class Node; -} // namespace pipeline - -using PipelineNode = pipeline::Node; +struct PipelineOptions { +}; class Pipeline { public: - ~Pipeline(); + Pipeline() = default; + virtual ~Pipeline() = default; - const Table *table() const { - return table_; - } + virtual const Table *table() const = 0; // Read all the records through the pipeline. // // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - bool flush(Error *error, Array<Record> *records); - - private: - const Table *table_; - unique_ptr<PipelineNode> root_; - - static unique_ptr<Pipeline> create(Error *error, - const Table *table, - unique_ptr<PipelineNode> &&root, - const PipelineOptions &options); - - Pipeline(const Table *table, - unique_ptr<PipelineNode> &&root); - - friend class PipelineBuilder; + // On failure, throws an exception. + virtual void flush(Array<Record> *records) = 0; }; class PipelineBuilder { public: // Create an object for building a pipeline. // - // On success, returns a poitner to the builder. - // On failure, returns nullptr and stores error information into "*error" if - // "error" != nullptr. - static unique_ptr<PipelineBuilder> create(Error *error, const Table *table); + // On success, returns the builder. + // On failure, throws an exception. + static std::unique_ptr<PipelineBuilder> create(const Table *table); - ~PipelineBuilder(); + PipelineBuilder() = default; + virtual ~PipelineBuilder() = default; // Return the associated table. - const Table *table() const { - return table_; - } + virtual const Table *table() const = 0; // Push a cursor. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - bool push_cursor(Error *error, unique_ptr<Cursor> &&cursor); + // On failure, throws an exception. + virtual void push_cursor(std::unique_ptr<Cursor> &&cursor) = 0; // Push a filter. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - bool push_filter(Error *error, - unique_ptr<Expression> &&expression, - Int offset = 0, - Int limit = numeric_limits<Int>::max()); + // On failure, throws an exception. + virtual void push_filter( + std::unique_ptr<Expression> &&expression, + size_t offset = 0, + size_t limit = std::numeric_limits<size_t>::max()) = 0; // Push an adjuster. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - bool push_adjuster(Error *error, unique_ptr<Expression> &&expression); + // On failure, throws an exception. + virtual void push_adjuster(std::unique_ptr<Expression> &&expression) = 0; // Push a sorter. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - bool push_sorter(Error *error, unique_ptr<Sorter> &&sorter); + // On failure, throws an exception. + virtual void push_sorter(std::unique_ptr<Sorter> &&sorter) = 0; // Push a merger. - bool push_merger(Error *error, - const MergerOptions &options = MergerOptions()); + // + // On failure, throws an exception. + virtual void push_merger(const MergerOptions &options = MergerOptions()) = 0; // Clear the internal stack. - void clear(); + virtual void clear() = 0; // Complete building a pipeline and clear the internal stack. // // Fails if the stack is empty or contains more than one nodes. // - // On success, returns a pointer to the expression. - // On failure, returns nullptr and stores error information into "*error" if - // "error" != nullptr. - unique_ptr<Pipeline> release( - Error *error, - const PipelineOptions &options = PipelineOptions()); - - private: - const Table *table_; - Array<unique_ptr<PipelineNode>> stack_; - - PipelineBuilder(); + // On success, returns the expression. + // On failure, throws an exception. + virtual std::unique_ptr<Pipeline> release( + const PipelineOptions &options = PipelineOptions()) = 0; }; } // namespace grnxx #endif // GRNXX_PIPELINE_HPP + + +//#ifndef GRNXX_PIPELINE_HPP +//#define GRNXX_PIPELINE_HPP + +//#include "grnxx/sorter.hpp" +//#include "grnxx/types.hpp" + +//namespace grnxx { +//namespace pipeline { + +//class Node; + +//} // namespace pipeline + +//using PipelineNode = pipeline::Node; + +//class Pipeline { +// public: +// ~Pipeline(); + +// const Table *table() const { +// return table_; +// } + +// // Read all the records through the pipeline. +// // +// // On success, returns true. +// // On failure, returns false and stores error information into "*error" if +// // "error" != nullptr. +// bool flush(Error *error, Array<Record> *records); + +// private: +// const Table *table_; +// unique_ptr<PipelineNode> root_; + +// static unique_ptr<Pipeline> create(Error *error, +// const Table *table, +// unique_ptr<PipelineNode> &&root, +// const PipelineOptions &options); + +// Pipeline(const Table *table, +// unique_ptr<PipelineNode> &&root); + +// friend class PipelineBuilder; +//}; + +//class PipelineBuilder { +// public: +// // Create an object for building a pipeline. +// // +// // On success, returns a poitner to the builder. +// // On failure, returns nullptr and stores error information into "*error" if +// // "error" != nullptr. +// static unique_ptr<PipelineBuilder> create(Error *error, const Table *table); + +// ~PipelineBuilder(); + +// // Return the associated table. +// const Table *table() const { +// return table_; +// } + +// // Push a cursor. +// // +// // On success, returns true. +// // On failure, returns false and stores error information into "*error" if +// // "error" != nullptr. +// bool push_cursor(Error *error, unique_ptr<Cursor> &&cursor); + +// // Push a filter. +// // +// // On success, returns true. +// // On failure, returns false and stores error information into "*error" if +// // "error" != nullptr. +// bool push_filter(Error *error, +// unique_ptr<Expression> &&expression, +// Int offset = 0, +// Int limit = numeric_limits<Int>::max()); + +// // Push an adjuster. +// // +// // On success, returns true. +// // On failure, returns false and stores error information into "*error" if +// // "error" != nullptr. +// bool push_adjuster(Error *error, unique_ptr<Expression> &&expression); + +// // Push a sorter. +// // +// // On success, returns true. +// // On failure, returns false and stores error information into "*error" if +// // "error" != nullptr. +// bool push_sorter(Error *error, unique_ptr<Sorter> &&sorter); + +// // Push a merger. +// bool push_merger(Error *error, +// const MergerOptions &options = MergerOptions()); + +// // Clear the internal stack. +// void clear(); + +// // Complete building a pipeline and clear the internal stack. +// // +// // Fails if the stack is empty or contains more than one nodes. +// // +// // On success, returns a pointer to the expression. +// // On failure, returns nullptr and stores error information into "*error" if +// // "error" != nullptr. +// unique_ptr<Pipeline> release( +// Error *error, +// const PipelineOptions &options = PipelineOptions()); + +// private: +// const Table *table_; +// Array<unique_ptr<PipelineNode>> stack_; + +// PipelineBuilder(); +//}; + +//} // namespace grnxx + +//#endif // GRNXX_PIPELINE_HPP Modified: lib/grnxx/Makefile.am (+2 -2) =================================================================== --- lib/grnxx/Makefile.am 2014-11-21 15:13:39 +0900 (a2be799) +++ lib/grnxx/Makefile.am 2014-11-21 17:18:14 +0900 (fd1c3f8) @@ -14,11 +14,11 @@ libgrnxx_la_SOURCES = \ expression.cpp \ library.cpp \ merger.cpp \ + pipeline.cpp \ sorter.cpp \ string.cpp -# index.cpp \ -# pipeline.cpp +# index.cpp libgrnxx_includedir = ${includedir}/grnxx libgrnxx_include_HEADERS = \ Modified: lib/grnxx/impl/Makefile.am (+2 -0) =================================================================== --- lib/grnxx/impl/Makefile.am 2014-11-21 15:13:39 +0900 (6f34085) +++ lib/grnxx/impl/Makefile.am 2014-11-21 17:18:14 +0900 (6cf8af6) @@ -12,6 +12,7 @@ libgrnxx_impl_la_SOURCES = \ db.cpp \ expression.cpp \ merger.cpp \ + pipeline.cpp \ sorter.cpp \ table.cpp @@ -23,5 +24,6 @@ libgrnxx_impl_include_HEADERS = \ expression.hpp \ index.hpp \ merger.hpp \ + pipeline.hpp \ sorter.hpp \ table.hpp Added: lib/grnxx/impl/pipeline.cpp (+7 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/impl/pipeline.cpp 2014-11-21 17:18:14 +0900 (9b0b022) @@ -0,0 +1,7 @@ +#include "grnxx/impl/pipeline.hpp" + +namespace grnxx { +namespace impl { + +} // namespace impl +} // namespace grnxx Added: lib/grnxx/impl/pipeline.hpp (+59 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/impl/pipeline.hpp 2014-11-21 17:18:14 +0900 (2c7eee1) @@ -0,0 +1,59 @@ +#ifndef GRNXX_IMPL_PIPELINE_HPP +#define GRNXX_IMPL_PIPELINE_HPP + +#include "grnxx/impl/table.hpp" +#include "grnxx/pipeline.hpp" + +namespace grnxx { +namespace impl { + +using PipelineInterface = grnxx::Pipeline; +using PipelineBuilderInterface = grnxx::PipelineBuilder; + +class Pipeline : public PipelineInterface { + public: + // -- Public API (grnxx/expression.hpp) -- + Pipeline(); + ~Pipeline() = default; + + const Table *table() const { + return table_; + } + + void flush(Array<Record> *records); + + private: + const Table *table_; +}; + +class PipelineBuilder : public PipelineBuilderInterface { + public: + // -- Public API (grnxx/expression.hpp) -- + + PipelineBuilder(); + ~PipelineBuilder() = default; + + const Table *table() const { + return table_; + } + + void push_cursor(std::unique_ptr<Cursor> &&cursor); + void push_filter(std::unique_ptr<Expression> &&expression, + size_t offset, + size_t limit); + void push_adjuster(std::unique_ptr<Expression> &&expression); + void push_sorter(std::unique_ptr<Sorter> &&sorter); + void push_merger(const MergerOptions &options); + + void clear(); + + std::unique_ptr<PipelineInterface> release(const PipelineOptions &options); + + private: + const Table *table_; +}; + +} // namespace impl +} // namespace grnxx + +#endif // GRNXX_IMPL_PIPELINE_HPP Copied: lib/grnxx/pipeline-old.cpp (+0 -0) 100% =================================================================== Modified: lib/grnxx/pipeline.cpp (+10 -382) =================================================================== --- lib/grnxx/pipeline.cpp 2014-11-21 15:13:39 +0900 (dc740ae) +++ lib/grnxx/pipeline.cpp 2014-11-21 17:18:14 +0900 (fd5a58d) @@ -1,390 +1,18 @@ #include "grnxx/pipeline.hpp" -#include "grnxx/cursor.hpp" -#include "grnxx/expression.hpp" -#include "grnxx/merger.hpp" +#include <new> -namespace grnxx { -namespace pipeline { - -// -- Node -- - -class Node { - public: - Node() {} - virtual ~Node() {} - - virtual Int read_next(Error *error, Array<Record> *records) = 0; - virtual Int read_all(Error *error, Array<Record> *records); -}; - -Int Node::read_all(Error *error, Array<Record> *records) { - Int total_count = 0; - for ( ; ; ) { - Int count = read_next(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - break; - } - total_count += count; - } - return total_count; -} - -// --- CursorNode --- - -class CursorNode : public Node { - public: - explicit CursorNode(unique_ptr<Cursor> &&cursor) - : Node(), - cursor_(std::move(cursor)) {} - ~CursorNode() {} - - Int read_next(Error *error, Array<Record> *records); - Int read_all(Error *error, Array<Record> *records); - - private: - unique_ptr<Cursor> cursor_; -}; - -Int CursorNode::read_next(Error *error, Array<Record> *records) { - // TODO: The following block size (1024) should be optimized. - auto result = cursor_->read(error, 1024, records); - if (!result.is_ok) { - return -1; - } - return result.count; -} - -Int CursorNode::read_all(Error *error, Array<Record> *records) { - auto result = cursor_->read_all(error, records); - if (!result.is_ok) { - return -1; - } - return result.count; -} - -// --- FilterNode --- - -class FilterNode : public Node { - public: - FilterNode(unique_ptr<Node> &&arg, - unique_ptr<Expression> &&expression, - Int offset, - Int limit) - : Node(), - arg_(std::move(arg)), - expression_(std::move(expression)), - offset_(offset), - limit_(limit) {} - ~FilterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Expression> expression_; - Int offset_; - Int limit_; -}; - -Int FilterNode::read_next(Error *error, Array<Record> *records) { - // TODO: The following threshold (1024) should be optimized. - Int offset = records->size(); - while (limit_ > 0) { - Int count = arg_->read_next(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - break; - } - ArrayRef<Record> ref = records->ref(records->size() - count, count); - if (!expression_->filter(error, ref, &ref)) { - return -1; - } - if (offset_ > 0) { - if (offset_ >= ref.size()) { - offset_ -= ref.size(); - ref = ref.ref(0, 0); - } else { - for (Int i = offset_; i < ref.size(); ++i) { - ref.set(i - offset_, ref[i]); - } - ref = ref.ref(0, ref.size() - offset_); - offset_ = 0; - } - } - if (ref.size() > limit_) { - ref = ref.ref(0, limit_); - } - limit_ -= ref.size(); - if (!records->resize(error, records->size() - count + ref.size())) { - return -1; - } - if ((records->size() - offset) >= 1024) { - break; - } - } - return records->size() - offset; -} - -// --- AdjusterNode --- - -class AdjusterNode : public Node { - public: - explicit AdjusterNode(unique_ptr<Node> &&arg, - unique_ptr<Expression> &&expression) - : Node(), - arg_(std::move(arg)), - expression_(std::move(expression)) {} - ~AdjusterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Expression> expression_; -}; - -Int AdjusterNode::read_next(Error *error, Array<Record> *records) { - Int offset = records->size(); - Int count = arg_->read_next(error, records); - if (count == -1) { - return -1; - } - if (!expression_->adjust(error, records, offset)) { - return -1; - } - return count; -} - -// --- SorterNode --- - -class SorterNode : public Node { - public: - explicit SorterNode(unique_ptr<Node> &&arg, - unique_ptr<Sorter> &&sorter) - : Node(), - arg_(std::move(arg)), - sorter_(std::move(sorter)) {} - ~SorterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Sorter> sorter_; -}; - -Int SorterNode::read_next(Error *error, Array<Record> *records) { - Int count = arg_->read_all(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - return 0; - } - if (!sorter_->sort(error, records)) { - return -1; - } - return records->size(); -} +#include "grnxx/impl/pipeline.hpp" -// --- MergerNode --- - -class MergerNode : public Node { - public: - explicit MergerNode(unique_ptr<Node> &&arg1, - unique_ptr<Node> &&arg2, - unique_ptr<Merger> &&merger) - : Node(), - arg1_(std::move(arg1)), - arg2_(std::move(arg2)), - merger_(std::move(merger)) {} - ~MergerNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg1_; - unique_ptr<Node> arg2_; - unique_ptr<Merger> merger_; -}; - -Int MergerNode::read_next(Error *error, Array<Record> *records) { - Array<Record> arg1_records; - Int count = arg1_->read_all(error, &arg1_records); - if (count == -1) { - return -1; - } - Array<Record> arg2_records; - count = arg2_->read_all(error, &arg2_records); - if (count == -1) { - return -1; - } - if ((arg1_records.size() == 0) && (arg2_records.size() == 0)) { - return 0; - } - if (!merger_->merge(error, &arg1_records, &arg2_records, records)) { - return -1; - } - return records->size(); -} - -} // namespace pipeline - -using namespace pipeline; - -Pipeline::~Pipeline() {} - -bool Pipeline::flush(Error *error, Array<Record> *records) { - return root_->read_all(error, records) >= 0; -} - -unique_ptr<Pipeline> Pipeline::create(Error *error, - const Table *table, - unique_ptr<PipelineNode> &&root, - const PipelineOptions &) { - unique_ptr<Pipeline> pipeline( - new (nothrow) Pipeline(table, std::move(root))); - if (!pipeline) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return pipeline; -} - -Pipeline::Pipeline(const Table *table, - unique_ptr<PipelineNode> &&root) - : table_(table), - root_(std::move(root)) {} - -unique_ptr<PipelineBuilder> PipelineBuilder::create(Error *error, - const Table *table) { - unique_ptr<PipelineBuilder> builder(new (nothrow) PipelineBuilder); - if (!builder) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - builder->table_ = table; - return builder; -} - -PipelineBuilder::~PipelineBuilder() {} - -bool PipelineBuilder::push_cursor(Error *error, unique_ptr<Cursor> &&cursor) { - // Reserve a space for a new node. - if (!stack_.reserve(error, stack_.size() + 1)) { - return false; - } - unique_ptr<Node> node(new (nothrow) CursorNode(std::move(cursor))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - // This push_back() must not fail because a space is already reserved. - stack_.push_back(nullptr, std::move(node)); - return true; -} - -bool PipelineBuilder::push_filter(Error *error, - unique_ptr<Expression> &&expression, - Int offset, Int limit) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) FilterNode(std::move(arg), std::move(expression), - offset, limit)); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_adjuster(Error *error, - unique_ptr<Expression> &&expression) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) AdjusterNode(std::move(arg), std::move(expression))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_sorter(Error *error, unique_ptr<Sorter> &&sorter) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) SorterNode(std::move(arg), std::move(sorter))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_merger(Error *error, const MergerOptions &options) { - if (stack_.size() < 2) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - auto merger = Merger::create(error, options); - if (!merger) { - return false; - } - unique_ptr<Node> arg2 = std::move(stack_[stack_.size() - 2]); - unique_ptr<Node> arg1 = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 2); - unique_ptr<Node> node(new (nothrow) MergerNode(std::move(arg1), - std::move(arg2), - std::move(merger))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; - - - // TODO - GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet"); - return false; -} - -void PipelineBuilder::clear() { - stack_.clear(); -} +namespace grnxx { -unique_ptr<Pipeline> PipelineBuilder::release(Error *error, - const PipelineOptions &options) { - if (stack_.size() != 1) { - GRNXX_ERROR_SET(error, INVALID_ARGUMENT, "Incomplete pipeline"); - return nullptr; - } - unique_ptr<PipelineNode> root = std::move(stack_[0]); - stack_.clear(); - return Pipeline::create(error, table_, std::move(root), options); +std::unique_ptr<PipelineBuilder> PipelineBuilder::create( + const Table *table) try { + throw "Not supported yet"; // TODO +// return std::unique_ptr<PipelineBuilder>( +// new impl::PipelineBuilder(static_cast<const impl::Table *>(table))); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO } -PipelineBuilder::PipelineBuilder() : table_(nullptr), stack_() {} - } // namespace grnxx -------------- next part -------------- HTML����������������������������...Download