[Groonga-commit] groonga/grnxx at a8b5462 [master] Implement grnxx::map::BytesStore.

Back to archive index

susumu.yata null+****@clear*****
Wed May 22 07:23:54 JST 2013


susumu.yata	2013-05-22 07:23:54 +0900 (Wed, 22 May 2013)

  New Revision: a8b5462f71b46cff792e1af5228c8b26dff6c054
  https://github.com/groonga/grnxx/commit/a8b5462f71b46cff792e1af5228c8b26dff6c054

  Message:
    Implement grnxx::map::BytesStore.

  Modified files:
    lib/grnxx/map/bytes_store.cpp
    lib/grnxx/map/bytes_store.hpp

  Modified: lib/grnxx/map/bytes_store.cpp (+504 -1)
===================================================================
--- lib/grnxx/map/bytes_store.cpp    2013-05-21 13:05:27 +0900 (a608f19)
+++ lib/grnxx/map/bytes_store.cpp    2013-05-22 07:23:54 +0900 (07d4e54)
@@ -17,16 +17,519 @@
 */
 #include "grnxx/map/bytes_store.hpp"
 
+#include <cstring>
 #include <memory>
 #include <new>
 
 #include "grnxx/logger.hpp"
 #include "grnxx/storage.hpp"
+#include "grnxx/string_builder.hpp"
+#include "grnxx/time/periodic_clock.hpp"
+#include "grnxx/time/time.hpp"
 
 namespace grnxx {
 namespace map {
+namespace {
 
-// TODO
+constexpr uint32_t BYTES_STORE_MAX_SIZE     = 4096;
+
+constexpr uint8_t  BYTES_STORE_OFFSET_SHIFT = 13;
+constexpr uint64_t BYTES_STORE_SIZE_MASK    =
+    (1ULL << BYTES_STORE_OFFSET_SHIFT) - 1;
+
+static_assert(BYTES_STORE_MAX_SIZE <= BYTES_STORE_SIZE_MASK,
+              "BYTES_STORE_MAX_SIZE > BYTES_STORE_SIZE_MASK");
+
+constexpr uint32_t BYTES_STORE_PAGE_SIZE            = 1U << 20;
+constexpr uint32_t BYTES_STORE_TABLE_SIZE           = 1U << 14;
+constexpr uint32_t BYTES_STORE_SECONDARY_TABLE_SIZE = 1U << 14;
+
+constexpr uint32_t BYTES_STORE_INVALID_PAGE_ID      =
+    BYTES_STORE_TABLE_SIZE * BYTES_STORE_SECONDARY_TABLE_SIZE;
+
+enum BytesStorePageStatus : uint32_t {
+  // The next byte sequence will be added to the page.
+  BYTES_STORE_PAGE_ACTIVE = 0,
+  // The page is in use.
+  BYTES_STORE_PAGE_IN_USE = 1,
+  // The page is empty but not ready-to-use.
+  BYTES_STORE_PAGE_EMPTY  = 2,
+  // The page is empty and ready-to-use.
+  BYTES_STORE_PAGE_IDLE   = 3
+};
+
+StringBuilder &operator<<(StringBuilder &builder,
+                          BytesStorePageStatus status) {
+  switch (status) {
+    case BYTES_STORE_PAGE_ACTIVE: {
+      return builder << "BYTES_STORE_PAGE_ACTIVE";
+    }
+    case BYTES_STORE_PAGE_IN_USE: {
+      return builder << "BYTES_STORE_PAGE_IN_USE";
+    }
+    case BYTES_STORE_PAGE_EMPTY: {
+      return builder << "BYTES_STORE_PAGE_EMPTY";
+    }
+    case BYTES_STORE_PAGE_IDLE: {
+      return builder << "BYTES_STORE_PAGE_IDLE";
+    }
+    default: {
+      return builder << "n/a";
+    }
+  }
+}
+
+struct BytesStoreHeader {
+  uint64_t next_offset;
+  uint32_t max_page_id;
+  uint32_t latest_empty_page_id;
+  uint32_t latest_idle_page_id;
+  uint32_t pages_storage_node_id;
+  uint32_t page_headers_storage_node_id;
+  uint32_t reserved;
+
+  BytesStoreHeader();
+};
+
+BytesStoreHeader::BytesStoreHeader()
+    : next_offset(0),
+      max_page_id(0),
+      latest_empty_page_id(BYTES_STORE_INVALID_PAGE_ID),
+      latest_idle_page_id(BYTES_STORE_INVALID_PAGE_ID),
+      pages_storage_node_id(STORAGE_INVALID_NODE_ID),
+      page_headers_storage_node_id(STORAGE_INVALID_NODE_ID),
+      reserved(0) {}
+
+struct BytesStorePageHeader {
+  // ACTIVE, IN_USE, EMPTY, and IDLE.
+  BytesStorePageStatus status;
+  union {
+    // ACTIVE and IN_USE.
+    uint32_t size_in_use;
+    // EMPTY and IDLE.
+    uint32_t next_page_id;
+  };
+  // ACTIVE, IN_USE, EMPTY, and IDLE.
+  Time modified_time;
+
+  BytesStorePageHeader();
+};
+
+BytesStorePageHeader::BytesStorePageHeader()
+    : status(BYTES_STORE_PAGE_ACTIVE),
+      size_in_use(0),
+      modified_time(0) {}
+
+class BytesStoreImpl : public BytesStore {
+  using BytesArray = Array<uint8_t, BYTES_STORE_PAGE_SIZE,
+                                    BYTES_STORE_TABLE_SIZE,
+                                    BYTES_STORE_SECONDARY_TABLE_SIZE>;
+  using PageHeaderArray = Array<BytesStorePageHeader,
+                                BYTES_STORE_TABLE_SIZE,
+                                BYTES_STORE_SECONDARY_TABLE_SIZE,
+                                1>;
+
+ public:
+  using BytesArg = typename Traits<Bytes>::ArgumentType;
+
+  BytesStoreImpl();
+  virtual ~BytesStoreImpl();
+
+  static BytesStoreImpl *create(Storage *storage, uint32_t storage_node_id);
+  static BytesStoreImpl *open(Storage *storage, uint32_t storage_node_id);
+
+  uint32_t storage_node_id() const;
+
+  bool get(uint64_t bytes_id, Bytes *bytes);
+  bool unset(uint64_t bytes_id);
+  bool add(BytesArg bytes, uint64_t *bytes_id);
+
+  bool sweep(Duration lifetime);
+
+ private:
+  Storage *storage_;
+  uint32_t storage_node_id_;
+  BytesStoreHeader *header_;
+  BytesArray pages_;
+  PageHeaderArray page_headers_;
+  PeriodicClock clock_;
+
+  bool create_store(Storage *storage, uint32_t storage_node_id);
+  bool open_store(Storage *storage, uint32_t storage_node_id);
+
+  bool reserve_active_page(uint32_t *page_id,
+                           BytesStorePageHeader **page_header);
+  bool make_page_empty(uint32_t page_id, BytesStorePageHeader *page_header);
+  bool make_page_idle(uint32_t page_id, BytesStorePageHeader *page_header);
+
+  static uint64_t get_bytes_id(uint64_t offset, uint32_t size) {
+    return (offset << BYTES_STORE_OFFSET_SHIFT) | size;
+  }
+  static uint64_t get_offset(uint64_t bytes_id) {
+    return bytes_id >> BYTES_STORE_OFFSET_SHIFT;
+  }
+  static uint32_t get_size(uint64_t bytes_id) {
+    return static_cast<uint32_t>(bytes_id & BYTES_STORE_SIZE_MASK);
+  }
+  static uint32_t get_page_id(uint64_t offset) {
+    return static_cast<uint32_t>(offset / BYTES_STORE_PAGE_SIZE);
+  }
+  static uint32_t get_offset_in_page(uint64_t offset) {
+    return static_cast<uint32_t>(offset % BYTES_STORE_PAGE_SIZE);
+  }
+};
+
+BytesStoreImpl::BytesStoreImpl()
+    : storage_(nullptr),
+      storage_node_id_(STORAGE_INVALID_NODE_ID),
+      header_(nullptr),
+      pages_(),
+      page_headers_() {}
+
+BytesStoreImpl::~BytesStoreImpl() {}
+
+BytesStoreImpl *BytesStoreImpl::create(Storage *storage,
+                                       uint32_t storage_node_id) {
+  if (!storage) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    return nullptr;
+  }
+  std::unique_ptr<BytesStoreImpl> store(new (std::nothrow) BytesStoreImpl);
+  if (!store) {
+    GRNXX_ERROR() << "new grnxx::map::BytesStoreImpl failed";
+    return nullptr;
+  }
+  if (!store->create_store(storage, storage_node_id)) {
+    return nullptr;
+  }
+  return store.release();
+}
+
+BytesStoreImpl *BytesStoreImpl::open(Storage *storage,
+                                     uint32_t storage_node_id) {
+  if (!storage) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    return nullptr;
+  }
+  std::unique_ptr<BytesStoreImpl> store(new (std::nothrow) BytesStoreImpl);
+  if (!store) {
+    GRNXX_ERROR() << "new grnxx::map::BytesStoreImpl failed";
+    return nullptr;
+  }
+  if (!store->open_store(storage, storage_node_id)) {
+    return nullptr;
+  }
+  return store.release();
+}
+
+uint32_t BytesStoreImpl::storage_node_id() const {
+  return storage_node_id_;
+}
+
+bool BytesStoreImpl::get(uint64_t bytes_id, Bytes *bytes) {
+  const uint64_t offset = get_offset(bytes_id);
+  const uint32_t size = get_size(bytes_id);
+  const uint32_t page_id = get_page_id(offset);
+  if ((size > BYTES_STORE_MAX_SIZE) || (page_id > header_->max_page_id)) {
+    GRNXX_ERROR() << "invalid argument: offset = " << offset
+                  << ", size = " << size
+                  << ", page_id = " << page_id
+                  << ", max_size = " << BYTES_STORE_MAX_SIZE
+                  << ", max_page_id = " << header_->max_page_id;
+    return false;
+  }
+  const uint8_t * const page = pages_.get_page(page_id);
+  if (!page) {
+    return false;
+  }
+  if (bytes) {
+    const uint32_t offset_in_page = get_offset_in_page(offset);
+    *bytes = Bytes(&page[offset_in_page], size);
+  }
+  return true;
+}
+
+bool BytesStoreImpl::unset(uint64_t bytes_id) {
+  const uint64_t offset = get_offset(bytes_id);
+  const uint32_t size = get_size(bytes_id);
+  const uint32_t page_id = get_page_id(offset);
+  if ((size > BYTES_STORE_MAX_SIZE) || (page_id > header_->max_page_id)) {
+    GRNXX_ERROR() << "invalid argument: offset = " << offset
+                  << ", size = " << size
+                  << ", page_id = " << page_id
+                  << ", max_size = " << BYTES_STORE_MAX_SIZE
+                  << ", max_page_id = " << header_->max_page_id;
+    return false;
+  }
+  BytesStorePageHeader * const page_header = page_headers_.get_value(page_id);
+  if (!page_header) {
+    return false;
+  }
+  if ((page_header->status != BYTES_STORE_PAGE_ACTIVE) &&
+      (page_header->status != BYTES_STORE_PAGE_IN_USE)) {
+    GRNXX_ERROR() << "invalid argument: page_id = " << page_id
+                  << ", status = " << page_header->status;
+    return false;
+  }
+  if (size > page_header->size_in_use) {
+    GRNXX_ERROR() << "invalid argument: size = " << size
+                  << ", size_in_use = " << page_header->size_in_use;
+    return false;
+  }
+  if ((page_header->status == BYTES_STORE_PAGE_ACTIVE) ||
+      (size < page_header->size_in_use)) {
+    // This operation does not change the page status.
+    page_header->size_in_use -= size;
+  } else {
+    // This operation makes the page EMPTY.
+    if (!make_page_empty(page_id, page_header)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool BytesStoreImpl::add(BytesArg bytes, uint64_t *bytes_id) {
+  if (bytes.size() > BYTES_STORE_MAX_SIZE) {
+    GRNXX_ERROR() << "invalid argument: size = " << bytes.size();
+    return false;
+  }
+  uint64_t offset = header_->next_offset;
+  uint32_t size = static_cast<uint32_t>(bytes.size());
+  uint32_t page_id = get_page_id(offset);
+  BytesStorePageHeader *page_header = page_headers_.get_value(page_id);
+  if (!page_header) {
+    return false;
+  }
+  uint32_t offset_in_page = get_offset_in_page(offset);
+  const uint32_t size_left = BYTES_STORE_PAGE_SIZE - offset_in_page;
+  if (size >= size_left) {
+    uint32_t next_page_id;
+    BytesStorePageHeader *next_page_header;
+    if (!reserve_active_page(&next_page_id, &next_page_header)) {
+      return false;
+    }
+    if (size > size_left) {
+      // Skip the remaining space of the previous ACTIVE page.
+      if (page_header->size_in_use == 0) {
+        // Change the page status from ACTIVE to EMPTY.
+        if (!make_page_empty(page_id, page_header)) {
+          return false;
+        }
+      } else {
+        // Change the page status from ACTIVE to IN_USE.
+        page_header->status = BYTES_STORE_PAGE_IN_USE;
+        page_header->modified_time = clock_.now();
+      }
+      // Use the new ACTIVE page.
+      header_->next_offset = next_page_id * pages_.page_size();
+      offset = header_->next_offset;
+      page_id = next_page_id;
+      page_header = next_page_header;
+      offset_in_page = get_offset_in_page(offset);
+    } else {
+      // Use the previous ACTIVE page.
+      page_header->status = BYTES_STORE_PAGE_IN_USE;
+      page_header->modified_time = clock_.now();
+      header_->next_offset = next_page_id * pages_.page_size();
+    }
+  }
+  uint8_t * const page = pages_.get_page(page_id);
+  if (!page) {
+    return false;
+  }
+  std::memcpy(page + offset_in_page, bytes.ptr(), size);
+  *bytes_id = get_bytes_id(offset, size);
+  page_header->size_in_use += size;
+  if (offset == header_->next_offset) {
+    header_->next_offset += size;
+  }
+  return true;
+}
+
+bool BytesStoreImpl::sweep(Duration lifetime) {
+  if (header_->latest_empty_page_id == BYTES_STORE_INVALID_PAGE_ID) {
+    // Nothing to do.
+    return true;
+  }
+  BytesStorePageHeader * const latest_empty_page_header =
+      page_headers_.get_value(header_->latest_empty_page_id);
+  if (!latest_empty_page_header) {
+    return false;
+  }
+  const Time threshold = clock_.now() - lifetime;
+  do {
+    const uint32_t oldest_empty_page_id =
+        latest_empty_page_header->next_page_id;
+    BytesStorePageHeader * const oldest_empty_page_header =
+        page_headers_.get_value(oldest_empty_page_id);
+    if (!oldest_empty_page_header) {
+      return false;
+    }
+    if (oldest_empty_page_header->status != BYTES_STORE_PAGE_EMPTY) {
+      GRNXX_ERROR() << "status conflict: status = "
+                    << oldest_empty_page_header->status;
+      return false;
+    }
+    if (oldest_empty_page_header->modified_time > threshold) {
+      // The remaining empty pages are not ready.
+      return true;
+    }
+    const uint32_t next_oldest_empty_page_id =
+        oldest_empty_page_header->next_page_id;
+    if (!make_page_idle(oldest_empty_page_id, oldest_empty_page_header)) {
+      return false;
+    }
+    if (oldest_empty_page_header != latest_empty_page_header) {
+      latest_empty_page_header->next_page_id = next_oldest_empty_page_id;
+    } else {
+      header_->latest_empty_page_id = BYTES_STORE_INVALID_PAGE_ID;
+    }
+  } while (header_->latest_empty_page_id != BYTES_STORE_INVALID_PAGE_ID);
+  return true;
+}
+
+bool BytesStoreImpl::create_store(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  StorageNode storage_node =
+      storage->create_node(storage_node_id, sizeof(BytesStoreHeader));
+  if (!storage_node) {
+    return false;
+  }
+  storage_node_id_ = storage_node.id();
+  header_ = static_cast<BytesStoreHeader *>(storage_node.body());
+  *header_ = BytesStoreHeader();
+  if (!pages_.create(storage, storage_node_id_) ||
+      !page_headers_.create(storage, storage_node_id_)) {
+    storage->unlink_node(storage_node_id_);
+    return false;
+  }
+  header_->pages_storage_node_id = pages_.storage_node_id();
+  header_->page_headers_storage_node_id = page_headers_.storage_node_id();
+  return true;
+}
+
+bool BytesStoreImpl::open_store(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  StorageNode storage_node = storage->open_node(storage_node_id);
+  if (!storage_node) {
+    return false;
+  }
+  storage_node_id_ = storage_node.id();
+  header_ = static_cast<BytesStoreHeader *>(storage_node.body());
+  if (!pages_.open(storage, header_->pages_storage_node_id) ||
+      !page_headers_.open(storage, header_->page_headers_storage_node_id)) {
+    return false;
+  }
+  return true;
+}
+
+bool BytesStoreImpl::reserve_active_page(uint32_t *page_id,
+                                         BytesStorePageHeader **page_header) {
+  BytesStorePageHeader *latest_idle_page_header = nullptr;
+  uint32_t next_page_id;
+  if (header_->latest_idle_page_id != BYTES_STORE_INVALID_PAGE_ID) {
+    // Use the oldest IDLE page.
+    latest_idle_page_header =
+        page_headers_.get_value(header_->latest_idle_page_id);
+    if (!latest_idle_page_header) {
+      return false;
+    }
+    next_page_id = latest_idle_page_header->next_page_id;
+  } else {
+    // Create a new page.
+    next_page_id = header_->max_page_id + 1;
+  }
+  BytesStorePageHeader * const next_page_header =
+      page_headers_.get_value(next_page_id);
+  if (!next_page_header) {
+    return false;
+  }
+  if (latest_idle_page_header) {
+    if (next_page_id != header_->latest_idle_page_id) {
+      latest_idle_page_header->next_page_id = next_page_header->next_page_id;
+    } else {
+      header_->latest_idle_page_id = BYTES_STORE_INVALID_PAGE_ID;
+    }
+  } else {
+    ++header_->max_page_id;
+  }
+  *next_page_header = BytesStorePageHeader();
+  next_page_header->modified_time = clock_.now();
+  *page_id = next_page_id;
+  *page_header = next_page_header;
+  return true;
+}
+
+bool BytesStoreImpl::make_page_empty(uint32_t page_id,
+                                     BytesStorePageHeader *page_header) {
+  BytesStorePageHeader *latest_empty_page_header = nullptr;
+  if (header_->latest_empty_page_id != BYTES_STORE_INVALID_PAGE_ID) {
+    latest_empty_page_header =
+        page_headers_.get_value(header_->latest_empty_page_id);
+    if (!latest_empty_page_header) {
+      return false;
+    }
+  }
+  page_header->status = BYTES_STORE_PAGE_EMPTY;
+  if (latest_empty_page_header) {
+    page_header->next_page_id = latest_empty_page_header->next_page_id;
+    latest_empty_page_header->next_page_id = page_id;
+  } else {
+    page_header->next_page_id = page_id;
+  }
+  page_header->modified_time = clock_.now();
+  header_->latest_empty_page_id = page_id;
+  return true;
+}
+
+bool BytesStoreImpl::make_page_idle(uint32_t page_id,
+                                    BytesStorePageHeader *page_header) {
+  BytesStorePageHeader *latest_idle_page_header = nullptr;
+  if (header_->latest_idle_page_id != BYTES_STORE_INVALID_PAGE_ID) {
+    BytesStorePageHeader * const latest_idle_page_header =
+        page_headers_.get_value(header_->latest_idle_page_id);
+    if (!latest_idle_page_header) {
+      return false;
+    }
+  }
+  page_header->status = BYTES_STORE_PAGE_IDLE;
+  if (latest_idle_page_header) {
+    page_header->next_page_id = latest_idle_page_header->next_page_id;
+    latest_idle_page_header->next_page_id = page_id;
+  } else {
+    page_header->next_page_id = page_id;
+  }
+  page_header->modified_time = clock_.now();
+  header_->latest_idle_page_id = page_id;
+  return true;
+}
+
+}  // namespace
+
+BytesStore::BytesStore() {}
+BytesStore::~BytesStore() {}
+
+BytesStore *BytesStore::create(Storage *storage, uint32_t storage_node_id) {
+  return BytesStoreImpl::create(storage, storage_node_id);
+}
+
+BytesStore *BytesStore::open(Storage *storage, uint32_t storage_node_id) {
+  return BytesStoreImpl::open(storage, storage_node_id);
+}
+
+bool BytesStore::unlink(Storage *storage, uint32_t storage_node_id) {
+  if (!storage) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    return nullptr;
+  }
+  std::unique_ptr<BytesStore> store(
+      BytesStore::open(storage, storage_node_id));
+  if (!store) {
+    return false;
+  }
+  return storage->unlink_node(storage_node_id);
+}
 
 }  // namespace map
 }  // namespace grnxx

  Modified: lib/grnxx/map/bytes_store.hpp (+8 -5)
===================================================================
--- lib/grnxx/map/bytes_store.hpp    2013-05-21 13:05:27 +0900 (9086f0f)
+++ lib/grnxx/map/bytes_store.hpp    2013-05-22 07:23:54 +0900 (b4bf86f)
@@ -20,7 +20,9 @@
 
 #include "grnxx/features.hpp"
 
+#include "grnxx/array.hpp"
 #include "grnxx/bytes.hpp"
+#include "grnxx/time/duration.hpp"
 #include "grnxx/traits.hpp"
 #include "grnxx/types.hpp"
 
@@ -30,10 +32,6 @@ class Storage;
 
 namespace map {
 
-constexpr uint64_t BYTES_STORE_ID_MASK = (1ULL << 60) - 1;
-constexpr uint64_t BYTES_STORE_MIN_ID  = 0;
-constexpr uint64_t BYTES_STORE_MAX_ID  = BYTES_STORE_ID_MASK;
-
 class BytesStore {
  public:
   using BytesArg = typename Traits<Bytes>::ArgumentType;
@@ -49,13 +47,18 @@ class BytesStore {
   // Unlink a store.
   static bool unlink(Storage *storage, uint32_t storage_node_id);
 
-  // TODO: get() should be inlined?
+  // Return the storage node ID.
+  virtual uint32_t storage_node_id() const = 0;
+
   // Get a stored byte sequence.
   virtual bool get(uint64_t bytes_id, Bytes *bytes) = 0;
   // Remove a stored byte sequence.
   virtual bool unset(uint64_t bytes_id) = 0;
   // Add a byte sequence.
   virtual bool add(BytesArg bytes, uint64_t *bytes_id) = 0;
+
+  // Sweep empty pages whose modified time < (now - lifetime).
+  virtual bool sweep(Duration lifetime) = 0;
 };
 
 }  // namespace map
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index