susumu.yata
null+****@clear*****
Wed Aug 21 14:00:41 JST 2013
susumu.yata 2013-08-21 14:00:41 +0900 (Wed, 21 Aug 2013) New Revision: 04ec27b9e1decfc26daded38e2370f4d9716423b https://github.com/groonga/grnxx/commit/04ec27b9e1decfc26daded38e2370f4d9716423b Message: Update grnxx::map::DoubleArray to use grnxx::map::Pool. Modified files: lib/grnxx/map/double_array.cpp lib/grnxx/map/double_array.hpp Modified: lib/grnxx/map/double_array.cpp (+149 -77) =================================================================== --- lib/grnxx/map/double_array.cpp 2013-08-21 13:59:44 +0900 (c333292) +++ lib/grnxx/map/double_array.cpp 2013-08-21 14:00:41 +0900 (2b48100) @@ -27,7 +27,7 @@ #include "grnxx/logger.hpp" #include "grnxx/map/common_header.hpp" #include "grnxx/map/helper.hpp" -#include "grnxx/map/key_pool.hpp" +#include "grnxx/map/pool.hpp" #include "grnxx/mutex.hpp" #include "grnxx/storage.hpp" @@ -333,7 +333,7 @@ class DoubleArrayImpl { using NodeArray = Array<Node, 65536, 8192>; // 42-bit using SiblingArray = Array<uint8_t, 262144, 4096>; // 42-bit using BlockArray = Array<Block, 8192, 1024>; // 33-bit - using Pool = KeyPool<Bytes>; + using Pool = Pool<Bytes>; static constexpr uint64_t NODE_ARRAY_SIZE = 1ULL << 42; static constexpr uint64_t SIBLING_ARRAY_SIZE = 1ULL << 42; @@ -347,19 +347,17 @@ class DoubleArrayImpl { DoubleArrayImpl(); ~DoubleArrayImpl(); - static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id); + static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id, + Pool *pool); static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id, DoubleArrayImpl *src_impl); - static DoubleArrayImpl *open(Storage *storage, uint32_t storage_node_id); + static DoubleArrayImpl *open(Storage *storage, uint32_t storage_node_id, + Pool *pool); static void unlink(Storage *storage, uint32_t storage_node_id) { storage->unlink_node(storage_node_id); } - void set_pool(Pool *pool) { - pool_ = pool; - } - uint32_t storage_node_id() const { return storage_node_id_; } @@ -403,10 +401,10 @@ class 
DoubleArrayImpl { std::unique_ptr<BlockArray> blocks_; Pool *pool_; - void create_impl(Storage *storage, uint32_t storage_node_id); + void create_impl(Storage *storage, uint32_t storage_node_id, Pool *pool); void create_impl(Storage *storage, uint32_t storage_node_id, DoubleArrayImpl *src_impl); - void open_impl(Storage *storage, uint32_t storage_node_id); + void open_impl(Storage *storage, uint32_t storage_node_id, Pool *pool); void defrag(DoubleArrayImpl *src_impl, uint64_t src, uint64_t dest); @@ -444,13 +442,14 @@ DoubleArrayImpl::DoubleArrayImpl() DoubleArrayImpl::~DoubleArrayImpl() {} DoubleArrayImpl *DoubleArrayImpl::create(Storage *storage, - uint32_t storage_node_id) { + uint32_t storage_node_id, + Pool *pool) { std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl); if (!impl) { GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed"; throw MemoryError(); } - impl->create_impl(storage, storage_node_id); + impl->create_impl(storage, storage_node_id, pool); return impl.release(); } @@ -467,13 +466,14 @@ DoubleArrayImpl *DoubleArrayImpl::create(Storage *storage, } DoubleArrayImpl *DoubleArrayImpl::open(Storage *storage, - uint32_t storage_node_id) { + uint32_t storage_node_id, + Pool *pool) { std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl); if (!impl) { GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed"; throw MemoryError(); } - impl->open_impl(storage, storage_node_id); + impl->open_impl(storage, storage_node_id, pool); return impl.release(); } @@ -482,12 +482,19 @@ bool DoubleArrayImpl::get(int64_t key_id, Key *key) { // Out of range. return false; } - return pool_->get(key_id, key); + if (key) { + return pool_->get(key_id, key); + } + return pool_->get_bit(key_id); } bool DoubleArrayImpl::unset(int64_t key_id) { + if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) { + // Out of range. + return false; + } Key key; - if (!get(key_id, &key)) { + if (!pool_->get(key_id, &key)) { // Not found. 
return false; } @@ -495,8 +502,12 @@ bool DoubleArrayImpl::unset(int64_t key_id) { } bool DoubleArrayImpl::reset(int64_t key_id, KeyArg dest_key) { + if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) { + // Out of range. + return false; + } Key src_key; - if (!get(key_id, &src_key)) { + if (!pool_->get(key_id, &src_key)) { // Not found. return false; } @@ -586,7 +597,7 @@ bool DoubleArrayImpl::remove(KeyArg key) { } bool DoubleArrayImpl::replace(KeyArg src_key, KeyArg dest_key, - int64_t *key_id) { + int64_t *key_id) { int64_t src_key_id; if (!find(src_key, &src_key_id)) { // Not found. @@ -603,8 +614,7 @@ bool DoubleArrayImpl::replace(KeyArg src_key, KeyArg dest_key, } bool DoubleArrayImpl::find_longest_prefix_match(KeyArg query, - int64_t *key_id, - Key *key) { + int64_t *key_id, Key *key) { bool found = false; uint64_t node_id = ROOT_NODE_ID; Node node = nodes_->get(node_id); @@ -672,7 +682,8 @@ bool DoubleArrayImpl::find_longest_prefix_match(KeyArg query, return found; } -void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) { +void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id, + Pool *pool) { storage_ = storage; StorageNode storage_node = storage->create_node(storage_node_id, sizeof(Header)); @@ -680,6 +691,7 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) { try { header_ = static_cast<Header *>(storage_node.body()); *header_ = Header(); + pool_ = pool; nodes_.reset(NodeArray::create(storage, storage_node_id_, NODE_ARRAY_SIZE)); siblings_.reset(SiblingArray::create(storage, storage_node_id_, @@ -699,10 +711,9 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) { void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id, DoubleArrayImpl *src_impl) { - create_impl(storage, storage_node_id); + create_impl(storage, storage_node_id, src_impl->pool_); try { // Build a double-array from "src_impl". 
- pool_ = src_impl->pool_; defrag(src_impl, ROOT_NODE_ID, ROOT_NODE_ID); } catch (...) { storage->unlink_node(storage_node_id_); @@ -710,7 +721,8 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id, } } -void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id) { +void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id, + Pool *pool) { storage_ = storage; storage_node_id_ = storage_node_id; StorageNode storage_node = storage->open_node(storage_node_id_); @@ -720,6 +732,7 @@ void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id) { throw LogicError(); } header_ = static_cast<Header *>(storage_node.body()); + pool_ = pool; nodes_.reset(NodeArray::open(storage, header_->nodes_storage_node_id)); siblings_.reset( SiblingArray::open(storage, header_->siblings_storage_node_id)); @@ -1174,9 +1187,10 @@ void DoubleArrayImpl::unset_block_level(uint64_t block_id, Block *block) { struct DoubleArrayHeader { CommonHeader common_header; + uint64_t pool_id; uint64_t impl_id; - uint32_t impl_storage_node_id; uint32_t pool_storage_node_id; + uint32_t impl_storage_node_id; Mutex mutex; // Initialize the member variables. 
@@ -1188,9 +1202,10 @@ struct DoubleArrayHeader { DoubleArrayHeader::DoubleArrayHeader() : common_header(FORMAT_STRING, MAP_DOUBLE_ARRAY), + pool_id(0), impl_id(0), - impl_storage_node_id(STORAGE_INVALID_NODE_ID), pool_storage_node_id(STORAGE_INVALID_NODE_ID), + impl_storage_node_id(STORAGE_INVALID_NODE_ID), mutex() {} DoubleArrayHeader::operator bool() const { @@ -1224,10 +1239,12 @@ DoubleArray<Bytes>::DoubleArray() : storage_(nullptr), storage_node_id_(STORAGE_INVALID_NODE_ID), header_(nullptr), - impl_(), - old_impl_(), pool_(), - impl_id_(0) {} + impl_(), + queue_(), + pool_id_(0), + impl_id_(0), + clock_() {} DoubleArray<Bytes>::~DoubleArray() {} @@ -1263,53 +1280,58 @@ MapType DoubleArray<Bytes>::type() const { } int64_t DoubleArray<Bytes>::max_key_id() { - return pool_->max_key_id(); + refresh_if_possible(); + return impl_->max_key_id(); } uint64_t DoubleArray<Bytes>::num_keys() { - return pool_->num_keys(); + refresh_if_possible(); + return impl_->num_keys(); } bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) { - if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) { - // Out of range. 
- return false; - } - return pool_->get(key_id, key); + refresh_if_possible(); + return impl_->get(key_id, key); } bool DoubleArray<Bytes>::unset(int64_t key_id) { + refresh_if_possible(); return impl_->unset(key_id); } bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) { + refresh_if_possible(); return impl_->reset(key_id, dest_key); } bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) { + refresh_if_possible(); return impl_->find(key, key_id); } bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) { + refresh_if_possible(); return impl_->add(key, key_id); } bool DoubleArray<Bytes>::remove(KeyArg key) { + refresh_if_possible(); return impl_->remove(key); } bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key, int64_t *key_id) { + refresh_if_possible(); return impl_->replace(src_key, dest_key, key_id); } void DoubleArray<Bytes>::defrag() { - refresh_impl(); - if (max_key_id() == MAP_MIN_KEY_ID) { + refresh_if_possible(); + if (max_key_id() < MAP_MIN_KEY_ID) { // Nothing to do. return; } - // Create a new impl. 
+ pool_->defrag(); std::unique_ptr<Impl> new_impl( Impl::create(storage_, storage_node_id_, impl_.get())); { @@ -1317,44 +1339,69 @@ void DoubleArray<Bytes>::defrag() { Lock lock(&header_->mutex); header_->impl_storage_node_id = new_impl->storage_node_id(); ++header_->impl_id; - old_impl_.swap(new_impl); - impl_.swap(old_impl_); + impl_.swap(new_impl); impl_id_ = header_->impl_id; } - Impl::unlink(storage_, old_impl_->storage_node_id()); - pool_->defrag(0.5); + Impl::unlink(storage_, new_impl->storage_node_id()); + try { + queue_.push(QueueEntry{ nullptr, std::move(new_impl), clock_.now() }); + } catch (const std::exception &exception) { + GRNXX_ERROR() << "std::queue::push() failed"; + throw StandardError(exception); + } +} + +void DoubleArray<Bytes>::sweep(Duration lifetime) { + const Time threshold = clock_.now() - lifetime; + while (!queue_.empty()) { + QueueEntry &queue_entry = queue_.front(); + if (queue_entry.time <= threshold) { + queue_.pop(); + } + } } void DoubleArray<Bytes>::truncate() { - refresh_impl(); - if (max_key_id() == MAP_MIN_KEY_ID) { + refresh_if_possible(); + if (max_key_id() < MAP_MIN_KEY_ID) { // Nothing to do. return; } - // Create an empty impl. - std::unique_ptr<Impl> new_impl(Impl::create(storage_, storage_node_id_)); + std::unique_ptr<Pool> new_pool(Pool::create(storage_, storage_node_id_)); + std::unique_ptr<Impl> new_impl; try { - pool_->truncate(); - new_impl->set_pool(pool_.get()); + new_impl.reset(Impl::create(storage_, storage_node_id_, new_pool.get())); } catch (...) { - Impl::unlink(storage_, new_impl->storage_node_id()); + Pool::unlink(storage_, new_pool->storage_node_id()); throw; } { - // Validate a new impl. + // Validate a new impl and a new pool. 
Lock lock(&header_->mutex); + header_->pool_storage_node_id = new_pool->storage_node_id(); header_->impl_storage_node_id = new_impl->storage_node_id(); + ++header_->pool_id; ++header_->impl_id; - old_impl_.swap(new_impl); - impl_.swap(old_impl_); + pool_.swap(new_pool); + impl_.swap(new_impl); + pool_id_ = header_->pool_id; impl_id_ = header_->impl_id; } - Impl::unlink(storage_, old_impl_->storage_node_id()); + Pool::unlink(storage_, new_pool->storage_node_id()); + Impl::unlink(storage_, new_impl->storage_node_id()); + try { + queue_.push(QueueEntry{ std::move(new_pool), std::move(new_impl), + clock_.now() }); + } catch (const std::exception &exception) { + GRNXX_ERROR() << "std::queue::push() failed"; + throw StandardError(exception); + } } bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query, int64_t *key_id, Key *key) { + refresh_if_possible(); return impl_->find_longest_prefix_match(query, key_id, key); } @@ -1379,17 +1426,18 @@ bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query, void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id, const MapOptions &) { storage_ = storage; - StorageNode storage_node = + StorageNode header_node = storage->create_node(storage_node_id, sizeof(Header)); - storage_node_id_ = storage_node.id(); + storage_node_id_ = header_node.id(); try { - header_ = static_cast<Header *>(storage_node.body()); + header_ = static_cast<Header *>(header_node.body()); *header_ = Header(); - impl_.reset(Impl::create(storage, storage_node_id_)); pool_.reset(Pool::create(storage, storage_node_id_)); - impl_->set_pool(pool_.get()); - header_->impl_storage_node_id = impl_->storage_node_id(); + impl_.reset(Impl::create(storage, storage_node_id_, pool_.get())); header_->pool_storage_node_id = pool_->storage_node_id(); + header_->impl_storage_node_id = impl_->storage_node_id(); + pool_id_ = ++header_->pool_id; + impl_id_ = ++header_->impl_id; } catch (...) 
{ storage->unlink_node(storage_node_id_); throw; @@ -1399,35 +1447,59 @@ void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id, void DoubleArray<Bytes>::open_map(Storage *storage, uint32_t storage_node_id) { storage_ = storage; storage_node_id_ = storage_node_id; - StorageNode storage_node = storage->open_node(storage_node_id_); - if (storage_node.size() < sizeof(Header)) { - GRNXX_ERROR() << "invalid format: size = " << storage_node.size() + StorageNode header_node = storage->open_node(storage_node_id_); + if (header_node.size() < sizeof(Header)) { + GRNXX_ERROR() << "invalid format: size = " << header_node.size() << ", header_size = " << sizeof(Header); throw LogicError(); } - header_ = static_cast<Header *>(storage_node.body()); + header_ = static_cast<Header *>(header_node.body()); if (!*header_) { GRNXX_ERROR() << "wrong format: expected = " << FORMAT_STRING << ", actual = " << header_->common_header.format(); throw LogicError(); } +} + +void DoubleArray<Bytes>::refresh_if_possible() { + if (impl_id_ != header_->impl_id) { + refresh(); + } +} + +void DoubleArray<Bytes>::refresh() { Lock lock(&header_->mutex); - impl_.reset(Impl::open(storage, header_->impl_storage_node_id)); - pool_.reset(Pool::open(storage, header_->pool_storage_node_id)); - impl_->set_pool(pool_.get()); - impl_id_ = header_->impl_id; + if (pool_id_ != header_->pool_id) { + refresh_pool(); + } + if (impl_id_ != header_->impl_id) { + refresh_impl(); + } +} + +void DoubleArray<Bytes>::refresh_pool() { + std::unique_ptr<Pool> new_pool( + Pool::open(storage_, header_->pool_storage_node_id)); + pool_.swap(new_pool); + pool_id_ = header_->pool_id; + try { + queue_.push(QueueEntry{ std::move(new_pool), nullptr, clock_.now() }); + } catch (const std::exception &exception) { + GRNXX_ERROR() << "std::queue::push() failed"; + throw StandardError(exception); + } } void DoubleArray<Bytes>::refresh_impl() { - if (impl_id_ != header_->impl_id) { - Lock lock(&header_->mutex); - if 
(impl_id_ != header_->impl_id) { - std::unique_ptr<Impl> new_impl( - Impl::open(storage_, header_->impl_storage_node_id)); - old_impl_.swap(new_impl); - impl_.swap(old_impl_); - impl_id_ = header_->impl_id; - } + std::unique_ptr<Impl> new_impl( + Impl::open(storage_, header_->impl_storage_node_id, pool_.get())); + impl_.swap(new_impl); + impl_id_ = header_->impl_id; + try { + queue_.push(QueueEntry{ nullptr, std::move(new_impl), clock_.now() }); + } catch (const std::exception &exception) { + GRNXX_ERROR() << "std::queue::push() failed"; + throw StandardError(exception); } } Modified: lib/grnxx/map/double_array.hpp (+23 -6) =================================================================== --- lib/grnxx/map/double_array.hpp 2013-08-21 13:59:44 +0900 (e7569e8) +++ lib/grnxx/map/double_array.hpp 2013-08-21 14:00:41 +0900 (1bad27b) @@ -21,11 +21,15 @@ #include "grnxx/features.hpp" #include <memory> +#include <queue> #include "grnxx/bytes.hpp" +#include "grnxx/duration.hpp" #include "grnxx/map.hpp" #include "grnxx/map_cursor.hpp" #include "grnxx/map_cursor_query.hpp" +#include "grnxx/periodic_clock.hpp" +#include "grnxx/time.hpp" #include "grnxx/types.hpp" namespace grnxx { @@ -34,11 +38,12 @@ class Storage; namespace map { -template <typename T> class KeyPool; +template <typename T> class Pool; -struct DoubleArrayHeader; class DoubleArrayImpl; +struct DoubleArrayHeader; + template <typename T> class DoubleArray { public: @@ -51,10 +56,16 @@ template <> class DoubleArray<Bytes> : public Map<Bytes> { using Header = DoubleArrayHeader; using Impl = DoubleArrayImpl; - using Pool = KeyPool<Bytes>; + using Pool = Pool<Bytes>; + + struct QueueEntry { + std::unique_ptr<Pool> pool; + std::unique_ptr<Impl> impl; + Time time; + }; public: - using Key = typename Map<Bytes>::Key; + using Key = typename Map<Bytes>::Key; using KeyArg = typename Map<Bytes>::KeyArg; using Cursor = typename Map<Bytes>::Cursor; @@ -81,6 +92,7 @@ class DoubleArray<Bytes> : public Map<Bytes> { bool 
replace(KeyArg src_key, KeyArg dest_key, int64_t *key_id = nullptr); void defrag(); + void sweep(Duration lifetime); void truncate(); @@ -102,15 +114,20 @@ class DoubleArray<Bytes> : public Map<Bytes> { Storage *storage_; uint32_t storage_node_id_; Header *header_; - std::unique_ptr<Impl> impl_; - std::unique_ptr<Impl> old_impl_; std::unique_ptr<Pool> pool_; + std::unique_ptr<Impl> impl_; + std::queue<QueueEntry> queue_; + uint64_t pool_id_; uint64_t impl_id_; + PeriodicClock clock_; void create_map(Storage *storage, uint32_t storage_node_id, const MapOptions &options); void open_map(Storage *storage, uint32_t storage_node_id); + inline void refresh_if_possible(); + void refresh(); + void refresh_pool(); void refresh_impl(); }; -------------- next part -------------- HTMLの添付ファイルを保管しました...Download