susumu.yata
null+****@clear*****
Thu Jul 18 17:17:25 JST 2013
susumu.yata 2013-07-18 17:17:25 +0900 (Thu, 18 Jul 2013) New Revision: b83b1f40b60cb5833eebc949535a815263594376 https://github.com/groonga/grnxx/commit/b83b1f40b60cb5833eebc949535a815263594376 Message: Add grnxx::map::BytesPool as a new version of grnxx::map::BytesStore. Added files: lib/grnxx/map/bytes_pool.cpp lib/grnxx/map/bytes_pool.hpp Modified files: lib/grnxx/map/Makefile.am Modified: lib/grnxx/map/Makefile.am (+2 -0) =================================================================== --- lib/grnxx/map/Makefile.am 2013-07-17 16:43:28 +0900 (220adbc) +++ lib/grnxx/map/Makefile.am 2013-07-18 17:17:25 +0900 (fa885f9) @@ -17,6 +17,7 @@ libgrnxx_map_la_LDFLAGS = @AM_LTLDFLAGS@ libgrnxx_map_la_SOURCES = \ array_map.cpp \ bytes_array.cpp \ + bytes_pool.cpp \ bytes_store.cpp \ cursor_impl.cpp \ double_array.cpp \ @@ -29,6 +30,7 @@ libgrnxx_map_includedir = ${includedir}/grnxx/map libgrnxx_map_include_HEADERS = \ array_map.hpp \ bytes_array.hpp \ + bytes_pool.hpp \ bytes_store.hpp \ cursor_impl.hpp \ double_array.hpp \ Added: lib/grnxx/map/bytes_pool.cpp (+342 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/map/bytes_pool.cpp 2013-07-18 17:17:25 +0900 (151f9e9) @@ -0,0 +1,342 @@ +/* + Copyright (C) 2013 Brazil, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. 
+ + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ +#include "grnxx/map/bytes_pool.hpp" + +#include <cstring> +#include <new> + +#include "grnxx/exception.hpp" +#include "grnxx/logger.hpp" +#include "grnxx/storage.hpp" +#include "grnxx/string_builder.hpp" + +namespace grnxx { +namespace map { +namespace { + +constexpr uint64_t POOL_SIZE = 1ULL << 48; + +constexpr uint32_t MAX_PAGE_ID = (POOL_SIZE / BytesPool::page_size()) - 1; +constexpr uint32_t INVALID_PAGE_ID = MAX_PAGE_ID + 1; + +} // namespace + +struct BytesPoolHeader { + uint64_t next_offset; + uint32_t max_page_id; + uint32_t latest_empty_page_id; + uint32_t latest_idle_page_id; + uint32_t pool_storage_node_id; + uint32_t page_headers_storage_node_id; + uint32_t reserved; + + BytesPoolHeader(); +}; + +BytesPoolHeader::BytesPoolHeader() + : next_offset(0), + max_page_id(0), + latest_empty_page_id(INVALID_PAGE_ID), + latest_idle_page_id(INVALID_PAGE_ID), + pool_storage_node_id(STORAGE_INVALID_NODE_ID), + page_headers_storage_node_id(STORAGE_INVALID_NODE_ID), + reserved(0) {} + +StringBuilder &operator<<(StringBuilder &builder, BytesPoolPageStatus status) { + switch (status) { + case BYTES_POOL_PAGE_ACTIVE: { + return builder << "BYTES_POOL_PAGE_ACTIVE"; + } + case BYTES_POOL_PAGE_IN_USE: { + return builder << "BYTES_POOL_PAGE_IN_USE"; + } + case BYTES_POOL_PAGE_EMPTY: { + return builder << "BYTES_POOL_PAGE_EMPTY"; + } + case BYTES_POOL_PAGE_IDLE: { + return builder << "BYTES_POOL_PAGE_IDLE"; + } + default: { + return builder << "n/a"; + } + } +} + +BytesPoolPageHeader::BytesPoolPageHeader() + : status(BYTES_POOL_PAGE_ACTIVE), + size_in_use(0), + modified_time(0) {} + +BytesPool::BytesPool() + : storage_(nullptr), + storage_node_id_(STORAGE_INVALID_NODE_ID), + header_(nullptr), + pool_(), + page_headers_() {} + +BytesPool 
*BytesPool::create(Storage *storage, uint32_t storage_node_id) { + if (!storage) { + GRNXX_ERROR() << "invalid argument: storage == nullptr"; + throw LogicError(); + } + std::unique_ptr<BytesPool> pool(new (std::nothrow) BytesPool); + if (!pool) { + GRNXX_ERROR() << "new grnxx::map::BytesPool failed"; + throw MemoryError(); + } + pool->create_pool(storage, storage_node_id); + return pool.release(); +} + +BytesPool *BytesPool::open(Storage *storage, uint32_t storage_node_id) { + if (!storage) { + GRNXX_ERROR() << "invalid argument: storage == nullptr"; + throw LogicError(); + } + std::unique_ptr<BytesPool> pool(new (std::nothrow) BytesPool); + if (!pool) { + GRNXX_ERROR() << "new grnxx::map::BytesPool failed"; + throw MemoryError(); + } + pool->open_pool(storage, storage_node_id); + return pool.release(); +} + +void BytesPool::unlink(Storage *storage, uint32_t storage_node_id) { + std::unique_ptr<BytesPool> pool(BytesPool::open(storage, storage_node_id)); + storage->unlink_node(storage_node_id); +} + +void BytesPool::unset(uint64_t value_id) { + const uint64_t offset = get_offset(value_id); + const uint32_t size = get_size(value_id); + const uint32_t page_id = get_page_id(offset); + if ((size > MAX_VALUE_SIZE) || (page_id > header_->max_page_id)) { + GRNXX_ERROR() << "invalid argument: offset = " << offset + << ", size = " << size + << ", page_id = " << page_id + << ", max_size = " << MAX_VALUE_SIZE + << ", max_page_id = " << header_->max_page_id; + throw LogicError(); + } + BytesPoolPageHeader * const page_header = + &page_headers_->get_value(page_id); + if ((page_header->status != BYTES_POOL_PAGE_ACTIVE) && + (page_header->status != BYTES_POOL_PAGE_IN_USE)) { + GRNXX_ERROR() << "wrong page: page_id = " << page_id + << ", status = " << page_header->status; + throw LogicError(); + } + if (size > page_header->size_in_use) { + GRNXX_ERROR() << "wrong page: size = " << size + << ", size_in_use = " << page_header->size_in_use; + throw LogicError(); + } + if 
((page_header->status == BYTES_POOL_PAGE_ACTIVE) || + (size < page_header->size_in_use)) { + // This operation does not change the page status. + page_header->size_in_use -= size; + } else { + // This operation makes the page EMPTY. + make_page_empty(page_id, page_header); + } +} + +uint64_t BytesPool::add(ValueArg value) { + if (value.size() > MAX_VALUE_SIZE) { + GRNXX_ERROR() << "invalid argument: size = " << value.size() + << ", max_size = " << MAX_VALUE_SIZE; + throw LogicError(); + } + uint64_t offset = header_->next_offset; + uint32_t size = static_cast<uint32_t>(value.size()); + uint32_t page_id = get_page_id(offset); + BytesPoolPageHeader *page_header = &page_headers_->get_value(page_id); + uint32_t offset_in_page = get_offset_in_page(offset); + const uint32_t size_left = POOL_PAGE_SIZE - offset_in_page; + if (size >= size_left) { + uint32_t next_page_id; + BytesPoolPageHeader *next_page_header = reserve_active_page(&next_page_id); + if (size > size_left) { + // Skip the remaining space of the previous ACTIVE page. + if (page_header->size_in_use == 0) { + // Change the page status from ACTIVE to EMPTY. + make_page_empty(page_id, page_header); + } else { + // Change the page status from ACTIVE to IN_USE. + page_header->status = BYTES_POOL_PAGE_IN_USE; + page_header->modified_time = clock_.now(); + } + // Use the new ACTIVE page. + header_->next_offset = next_page_id * POOL_PAGE_SIZE; + offset = header_->next_offset; + page_id = next_page_id; + page_header = next_page_header; + } else { + // Use the previous ACTIVE page. 
+ page_header->status = BYTES_POOL_PAGE_IN_USE; + page_header->modified_time = clock_.now(); + header_->next_offset = next_page_id * POOL_PAGE_SIZE; + } + } + uint8_t * const value_buf = &pool_->get_value(offset); + std::memcpy(value_buf, value.data(), size); + page_header->size_in_use += size; + if (offset == header_->next_offset) { + header_->next_offset += size; + } + return get_value_id(offset, size); +} + +bool BytesPool::sweep(Duration lifetime) { + if (header_->latest_empty_page_id == INVALID_PAGE_ID) { + // Nothing to do. + return true; + } + BytesPoolPageHeader * const latest_empty_page_header = + &page_headers_->get_value(header_->latest_empty_page_id); + const Time threshold = clock_.now() - lifetime; + do { + const uint32_t oldest_empty_page_id = + latest_empty_page_header->next_page_id; + BytesPoolPageHeader * const oldest_empty_page_header = + &page_headers_->get_value(oldest_empty_page_id); + if (oldest_empty_page_header->status != BYTES_POOL_PAGE_EMPTY) { + GRNXX_ERROR() << "status conflict: status = " + << oldest_empty_page_header->status; + throw LogicError(); + } + if (oldest_empty_page_header->modified_time > threshold) { + // The remaining empty pages are not ready. 
+ return true; + } + const uint32_t next_oldest_empty_page_id = + oldest_empty_page_header->next_page_id; + make_page_idle(oldest_empty_page_id, oldest_empty_page_header); + if (oldest_empty_page_header != latest_empty_page_header) { + latest_empty_page_header->next_page_id = next_oldest_empty_page_id; + } else { + header_->latest_empty_page_id = INVALID_PAGE_ID; + } + } while (header_->latest_empty_page_id != INVALID_PAGE_ID); + return true; +} + +BytesPool::~BytesPool() {} + +void BytesPool::create_pool(Storage *storage, uint32_t storage_node_id) { + storage_ = storage; + StorageNode storage_node = + storage->create_node(storage_node_id, sizeof(BytesPoolHeader)); + storage_node_id_ = storage_node.id(); + try { + header_ = static_cast<BytesPoolHeader *>(storage_node.body()); + *header_ = BytesPoolHeader(); + pool_.reset(Pool::create(storage, storage_node_id_, POOL_SIZE)); + page_headers_.reset(PageHeaderArray::create(storage, storage_node_id, + MAX_PAGE_ID + 1)); + header_->pool_storage_node_id = pool_->storage_node_id(); + header_->page_headers_storage_node_id = page_headers_->storage_node_id(); + } catch (...) { + storage->unlink_node(storage_node_id_); + throw; + } +} + +void BytesPool::open_pool(Storage *storage, uint32_t storage_node_id) { + storage_ = storage; + StorageNode storage_node = storage->open_node(storage_node_id); + storage_node_id_ = storage_node.id(); + header_ = static_cast<BytesPoolHeader *>(storage_node.body()); + pool_.reset(Pool::open(storage, header_->pool_storage_node_id)); + page_headers_.reset( + PageHeaderArray::open(storage, header_->page_headers_storage_node_id)); +} + +BytesPoolPageHeader *BytesPool::reserve_active_page(uint32_t *page_id) { + BytesPoolPageHeader *latest_idle_page_header = nullptr; + uint32_t next_page_id; + if (header_->latest_idle_page_id != INVALID_PAGE_ID) { + // Use the oldest IDLE page. 
+ latest_idle_page_header = + &page_headers_->get_value(header_->latest_idle_page_id); + next_page_id = latest_idle_page_header->next_page_id; + } else { + // Create a new page. + next_page_id = header_->max_page_id + 1; + if (next_page_id > MAX_PAGE_ID) { + GRNXX_ERROR() << "too many pages: next_page_id = " << next_page_id + << ", max_page_id = " << MAX_PAGE_ID; + throw LogicError(); + } + } + BytesPoolPageHeader * const next_page_header = + &page_headers_->get_value(next_page_id); + if (latest_idle_page_header) { + if (next_page_id != header_->latest_idle_page_id) { + latest_idle_page_header->next_page_id = next_page_header->next_page_id; + } else { + header_->latest_idle_page_id = INVALID_PAGE_ID; + } + } else { + ++header_->max_page_id; + } + *next_page_header = BytesPoolPageHeader(); + next_page_header->modified_time = clock_.now(); + *page_id = next_page_id; + return next_page_header; +} + +void BytesPool::make_page_empty(uint32_t page_id, + BytesPoolPageHeader *page_header) { + BytesPoolPageHeader *latest_empty_page_header = nullptr; + if (header_->latest_empty_page_id != INVALID_PAGE_ID) { + latest_empty_page_header = + &page_headers_->get_value(header_->latest_empty_page_id); + } + page_header->status = BYTES_POOL_PAGE_EMPTY; + if (latest_empty_page_header) { + page_header->next_page_id = latest_empty_page_header->next_page_id; + latest_empty_page_header->next_page_id = page_id; + } else { + page_header->next_page_id = page_id; + } + page_header->modified_time = clock_.now(); + header_->latest_empty_page_id = page_id; +} + +void BytesPool::make_page_idle(uint32_t page_id, + BytesPoolPageHeader *page_header) { + BytesPoolPageHeader *latest_idle_page_header = nullptr; + if (header_->latest_idle_page_id != INVALID_PAGE_ID) { + latest_idle_page_header = + &page_headers_->get_value(header_->latest_idle_page_id); + } + page_header->status = BYTES_POOL_PAGE_IDLE; + if (latest_idle_page_header) { + page_header->next_page_id = latest_idle_page_header->next_page_id; 
+ latest_idle_page_header->next_page_id = page_id; + } else { + page_header->next_page_id = page_id; + } + page_header->modified_time = clock_.now(); + header_->latest_idle_page_id = page_id; +} + +} // namespace map +} // namespace grnxx Added: lib/grnxx/map/bytes_pool.hpp (+163 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/map/bytes_pool.hpp 2013-07-18 17:17:25 +0900 (3defb45) @@ -0,0 +1,163 @@ +/* + Copyright (C) 2013 Brazil, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ +#ifndef GRNXX_MAP_BYTES_POOL_HPP +#define GRNXX_MAP_BYTES_POOL_HPP + +#include "grnxx/features.hpp" + +#include <memory> + +#include "grnxx/array.hpp" +#include "grnxx/bytes.hpp" +#include "grnxx/duration.hpp" +#include "grnxx/periodic_clock.hpp" +#include "grnxx/time.hpp" +#include "grnxx/traits.hpp" +#include "grnxx/types.hpp" + +namespace grnxx { + +class Storage; + +namespace map { + +constexpr uint64_t BYTES_POOL_VALUE_ID_MASK = (1ULL << 61) - 1; +constexpr uint64_t BYTES_POOL_MAX_VALUE_ID = BYTES_POOL_VALUE_ID_MASK; + +struct BytesPoolHeader; + +enum BytesPoolPageStatus : uint32_t { + // The next byte sequence will be added to this page. + BYTES_POOL_PAGE_ACTIVE = 0, + // This page is in use. 
+ BYTES_POOL_PAGE_IN_USE = 1, + // This page is empty but not ready-to-use. + BYTES_POOL_PAGE_EMPTY = 2, + // This page is empty and ready-to-use. + BYTES_POOL_PAGE_IDLE = 3 +}; + +struct BytesPoolPageHeader { + // ACTIVE, IN_USE, EMPTY, and IDLE. + BytesPoolPageStatus status; + union { + // ACTIVE and IN_USE. + uint32_t size_in_use; + // EMPTY and IDLE. + uint32_t next_page_id; + }; + // ACTIVE, IN_USE, EMPTY, and IDLE. + Time modified_time; + + // Initialize member variables. + BytesPoolPageHeader(); +}; + +class BytesPool { + static constexpr uint32_t POOL_PAGE_SIZE = 1U << 20; + static constexpr uint32_t POOL_TABLE_SIZE = 1U << 14; + + static constexpr uint32_t MAX_VALUE_SIZE = 4096; + + // The number of bits allocated for representing a value size. + static constexpr uint8_t VALUE_ID_SIZE_BITS = 13; + static constexpr uint64_t VALUE_ID_SIZE_MASK = + (1ULL << VALUE_ID_SIZE_BITS) - 1; + + using Pool = Array<uint8_t, POOL_PAGE_SIZE, POOL_TABLE_SIZE>; + using PageHeaderArray = Array<BytesPoolPageHeader, POOL_TABLE_SIZE>; + + public: + using Value = typename Traits<Bytes>::Type; + using ValueArg = typename Traits<Bytes>::ArgumentType; + + ~BytesPool(); + + // Create a pool. + static BytesPool *create(Storage *storage, uint32_t storage_node_id); + // Open a pool. + static BytesPool *open(Storage *storage, uint32_t storage_node_id); + + // Unlink a pool. + static void unlink(Storage *storage, uint32_t storage_node_id); + + // Return the storage node ID. + uint32_t storage_node_id() const { + return storage_node_id_; + } + + // Return the page size. + static constexpr uint64_t page_size() { + return POOL_PAGE_SIZE; + } + + // Get a byte sequence. + Value get(uint64_t value_id) { + const uint64_t offset = get_offset(value_id); + const uint32_t size = get_size(value_id); + return Value(&pool_->get_value(offset), size); + } + // Remove a byte sequence. + void unset(uint64_t value_id); + // Add a byte sequence and return its ID. 
+ uint64_t add(ValueArg value); + + // Sweep empty pages whose modified time <= (now - lifetime). + bool sweep(Duration lifetime); + + private: + Storage *storage_; + uint32_t storage_node_id_; + BytesPoolHeader *header_; + std::unique_ptr<Pool> pool_; + std::unique_ptr<PageHeaderArray> page_headers_; + PeriodicClock clock_; + + BytesPool(); + + void create_pool(Storage *storage, uint32_t storage_node_id); + void open_pool(Storage *storage, uint32_t storage_node_id); + + // Reserve a page. + BytesPoolPageHeader *reserve_active_page(uint32_t *page_id); + // Make a page empty. + void make_page_empty(uint32_t page_id, BytesPoolPageHeader *page_header); + // Make a page idle. + void make_page_idle(uint32_t page_id, BytesPoolPageHeader *page_header); + + static uint64_t get_value_id(uint64_t offset, uint32_t size) { + return (offset * (VALUE_ID_SIZE_MASK + 1)) | size; + } + static uint64_t get_offset(uint64_t value_id) { + return value_id / (VALUE_ID_SIZE_MASK + 1); + } + static uint32_t get_size(uint64_t value_id) { + return static_cast<uint32_t>(value_id & VALUE_ID_SIZE_MASK); + } + static uint32_t get_page_id(uint64_t offset) { + return static_cast<uint32_t>(offset / POOL_PAGE_SIZE); + } + static uint32_t get_offset_in_page(uint64_t offset) { + return static_cast<uint32_t>(offset % POOL_PAGE_SIZE); + } +}; + +} // namespace map +} // namespace grnxx + +#endif // GRNXX_MAP_BYTES_POOL_HPP -------------- next part -------------- HTMLの添付ファイルを保管しました... Download