[Groonga-commit] groonga/grnxx at 04ec27b [master] Update grnxx::map::DoubleArray to use grnxx::map::Pool.

Back to archive index

susumu.yata null+****@clear*****
Wed Aug 21 14:00:41 JST 2013


susumu.yata	2013-08-21 14:00:41 +0900 (Wed, 21 Aug 2013)

  New Revision: 04ec27b9e1decfc26daded38e2370f4d9716423b
  https://github.com/groonga/grnxx/commit/04ec27b9e1decfc26daded38e2370f4d9716423b

  Message:
    Update grnxx::map::DoubleArray to use grnxx::map::Pool.

  Modified files:
    lib/grnxx/map/double_array.cpp
    lib/grnxx/map/double_array.hpp

  Modified: lib/grnxx/map/double_array.cpp (+149 -77)
===================================================================
--- lib/grnxx/map/double_array.cpp    2013-08-21 13:59:44 +0900 (c333292)
+++ lib/grnxx/map/double_array.cpp    2013-08-21 14:00:41 +0900 (2b48100)
@@ -27,7 +27,7 @@
 #include "grnxx/logger.hpp"
 #include "grnxx/map/common_header.hpp"
 #include "grnxx/map/helper.hpp"
-#include "grnxx/map/key_pool.hpp"
+#include "grnxx/map/pool.hpp"
 #include "grnxx/mutex.hpp"
 #include "grnxx/storage.hpp"
 
@@ -333,7 +333,7 @@ class DoubleArrayImpl {
   using NodeArray    = Array<Node,     65536, 8192>;  // 42-bit
   using SiblingArray = Array<uint8_t, 262144, 4096>;  // 42-bit
   using BlockArray   = Array<Block,     8192, 1024>;  // 33-bit
-  using Pool         = KeyPool<Bytes>;
+  using Pool         = Pool<Bytes>;
 
   static constexpr uint64_t NODE_ARRAY_SIZE    = 1ULL << 42;
   static constexpr uint64_t SIBLING_ARRAY_SIZE = 1ULL << 42;
@@ -347,19 +347,17 @@ class DoubleArrayImpl {
   DoubleArrayImpl();
   ~DoubleArrayImpl();
 
-  static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id);
+  static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id,
+                                 Pool *pool);
   static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id,
                                  DoubleArrayImpl *src_impl);
-  static DoubleArrayImpl *open(Storage *storage, uint32_t storage_node_id);
+  static DoubleArrayImpl *open(Storage *storage, uint32_t storage_node_id,
+                               Pool *pool);
 
   static void unlink(Storage *storage, uint32_t storage_node_id) {
     storage->unlink_node(storage_node_id);
   }
 
-  void set_pool(Pool *pool) {
-    pool_ = pool;
-  }
-
   uint32_t storage_node_id() const {
     return storage_node_id_;
   }
@@ -403,10 +401,10 @@ class DoubleArrayImpl {
   std::unique_ptr<BlockArray> blocks_;
   Pool *pool_;
 
-  void create_impl(Storage *storage, uint32_t storage_node_id);
+  void create_impl(Storage *storage, uint32_t storage_node_id, Pool *pool);
   void create_impl(Storage *storage, uint32_t storage_node_id,
                    DoubleArrayImpl *src_impl);
-  void open_impl(Storage *storage, uint32_t storage_node_id);
+  void open_impl(Storage *storage, uint32_t storage_node_id, Pool *pool);
 
   void defrag(DoubleArrayImpl *src_impl, uint64_t src, uint64_t dest);
 
@@ -444,13 +442,14 @@ DoubleArrayImpl::DoubleArrayImpl()
 DoubleArrayImpl::~DoubleArrayImpl() {}
 
 DoubleArrayImpl *DoubleArrayImpl::create(Storage *storage,
-                                         uint32_t storage_node_id) {
+                                         uint32_t storage_node_id,
+                                         Pool *pool) {
   std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl);
   if (!impl) {
     GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed";
     throw MemoryError();
   }
-  impl->create_impl(storage, storage_node_id);
+  impl->create_impl(storage, storage_node_id, pool);
   return impl.release();
 }
 
@@ -467,13 +466,14 @@ DoubleArrayImpl *DoubleArrayImpl::create(Storage *storage,
 }
 
 DoubleArrayImpl *DoubleArrayImpl::open(Storage *storage,
-                                       uint32_t storage_node_id) {
+                                       uint32_t storage_node_id,
+                                       Pool *pool) {
   std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl);
   if (!impl) {
     GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed";
     throw MemoryError();
   }
-  impl->open_impl(storage, storage_node_id);
+  impl->open_impl(storage, storage_node_id, pool);
   return impl.release();
 }
 
@@ -482,12 +482,19 @@ bool DoubleArrayImpl::get(int64_t key_id, Key *key) {
     // Out of range.
     return false;
   }
-  return pool_->get(key_id, key);
+  if (key) {
+    return pool_->get(key_id, key);
+  }
+  return pool_->get_bit(key_id);
 }
 
 bool DoubleArrayImpl::unset(int64_t key_id) {
+  if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) {
+    // Out of range.
+    return false;
+  }
   Key key;
-  if (!get(key_id, &key)) {
+  if (!pool_->get(key_id, &key)) {
     // Not found.
     return false;
   }
@@ -495,8 +502,12 @@ bool DoubleArrayImpl::unset(int64_t key_id) {
 }
 
 bool DoubleArrayImpl::reset(int64_t key_id, KeyArg dest_key) {
+  if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) {
+    // Out of range.
+    return false;
+  }
   Key src_key;
-  if (!get(key_id, &src_key)) {
+  if (!pool_->get(key_id, &src_key)) {
     // Not found.
     return false;
   }
@@ -586,7 +597,7 @@ bool DoubleArrayImpl::remove(KeyArg key) {
 }
 
 bool DoubleArrayImpl::replace(KeyArg src_key, KeyArg dest_key,
-                                 int64_t *key_id) {
+                              int64_t *key_id) {
   int64_t src_key_id;
   if (!find(src_key, &src_key_id)) {
     // Not found.
@@ -603,8 +614,7 @@ bool DoubleArrayImpl::replace(KeyArg src_key, KeyArg dest_key,
 }
 
 bool DoubleArrayImpl::find_longest_prefix_match(KeyArg query,
-                                                   int64_t *key_id,
-                                                   Key *key) {
+                                                int64_t *key_id, Key *key) {
   bool found = false;
   uint64_t node_id = ROOT_NODE_ID;
   Node node = nodes_->get(node_id);
@@ -672,7 +682,8 @@ bool DoubleArrayImpl::find_longest_prefix_match(KeyArg query,
   return found;
 }
 
-void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) {
+void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id,
+                                  Pool *pool) {
   storage_ = storage;
   StorageNode storage_node =
       storage->create_node(storage_node_id, sizeof(Header));
@@ -680,6 +691,7 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) {
   try {
     header_ = static_cast<Header *>(storage_node.body());
     *header_ = Header();
+    pool_ = pool;
     nodes_.reset(NodeArray::create(storage, storage_node_id_,
                                    NODE_ARRAY_SIZE));
     siblings_.reset(SiblingArray::create(storage, storage_node_id_,
@@ -699,10 +711,9 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id) {
 
 void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id,
                                   DoubleArrayImpl *src_impl) {
-  create_impl(storage, storage_node_id);
+  create_impl(storage, storage_node_id, src_impl->pool_);
   try {
     // Build a double-array from "src_impl".
-    pool_ = src_impl->pool_;
     defrag(src_impl, ROOT_NODE_ID, ROOT_NODE_ID);
   } catch (...) {
     storage->unlink_node(storage_node_id_);
@@ -710,7 +721,8 @@ void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id,
   }
 }
 
-void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id) {
+void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id,
+                                Pool *pool) {
   storage_ = storage;
   storage_node_id_ = storage_node_id;
   StorageNode storage_node = storage->open_node(storage_node_id_);
@@ -720,6 +732,7 @@ void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id) {
     throw LogicError();
   }
   header_ = static_cast<Header *>(storage_node.body());
+  pool_ = pool;
   nodes_.reset(NodeArray::open(storage, header_->nodes_storage_node_id));
   siblings_.reset(
       SiblingArray::open(storage, header_->siblings_storage_node_id));
@@ -1174,9 +1187,10 @@ void DoubleArrayImpl::unset_block_level(uint64_t block_id, Block *block) {
 
 struct DoubleArrayHeader {
   CommonHeader common_header;
+  uint64_t pool_id;
   uint64_t impl_id;
-  uint32_t impl_storage_node_id;
   uint32_t pool_storage_node_id;
+  uint32_t impl_storage_node_id;
   Mutex mutex;
 
   // Initialize the member variables.
@@ -1188,9 +1202,10 @@ struct DoubleArrayHeader {
 
 DoubleArrayHeader::DoubleArrayHeader()
     : common_header(FORMAT_STRING, MAP_DOUBLE_ARRAY),
+      pool_id(0),
       impl_id(0),
-      impl_storage_node_id(STORAGE_INVALID_NODE_ID),
       pool_storage_node_id(STORAGE_INVALID_NODE_ID),
+      impl_storage_node_id(STORAGE_INVALID_NODE_ID),
       mutex() {}
 
 DoubleArrayHeader::operator bool() const {
@@ -1224,10 +1239,12 @@ DoubleArray<Bytes>::DoubleArray()
     : storage_(nullptr),
       storage_node_id_(STORAGE_INVALID_NODE_ID),
       header_(nullptr),
-      impl_(),
-      old_impl_(),
       pool_(),
-      impl_id_(0) {}
+      impl_(),
+      queue_(),
+      pool_id_(0),
+      impl_id_(0),
+      clock_() {}
 
 DoubleArray<Bytes>::~DoubleArray() {}
 
@@ -1263,53 +1280,58 @@ MapType DoubleArray<Bytes>::type() const {
 }
 
 int64_t DoubleArray<Bytes>::max_key_id() {
-  return pool_->max_key_id();
+  refresh_if_possible();
+  return impl_->max_key_id();
 }
 
 uint64_t DoubleArray<Bytes>::num_keys() {
-  return pool_->num_keys();
+  refresh_if_possible();
+  return impl_->num_keys();
 }
 
 bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) {
-  if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) {
-    // Out of range.
-    return false;
-  }
-  return pool_->get(key_id, key);
+  refresh_if_possible();
+  return impl_->get(key_id, key);
 }
 
 bool DoubleArray<Bytes>::unset(int64_t key_id) {
+  refresh_if_possible();
   return impl_->unset(key_id);
 }
 
 bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) {
+  refresh_if_possible();
   return impl_->reset(key_id, dest_key);
 }
 
 bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) {
+  refresh_if_possible();
   return impl_->find(key, key_id);
 }
 
 bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) {
+  refresh_if_possible();
   return impl_->add(key, key_id);
 }
 
 bool DoubleArray<Bytes>::remove(KeyArg key) {
+  refresh_if_possible();
   return impl_->remove(key);
 }
 
 bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key,
                                  int64_t *key_id) {
+  refresh_if_possible();
   return impl_->replace(src_key, dest_key, key_id);
 }
 
 void DoubleArray<Bytes>::defrag() {
-  refresh_impl();
-  if (max_key_id() == MAP_MIN_KEY_ID) {
+  refresh_if_possible();
+  if (max_key_id() < MAP_MIN_KEY_ID) {
     // Nothing to do.
     return;
   }
-  // Create a new impl.
+  pool_->defrag();
   std::unique_ptr<Impl> new_impl(
       Impl::create(storage_, storage_node_id_, impl_.get()));
   {
@@ -1317,44 +1339,69 @@ void DoubleArray<Bytes>::defrag() {
     Lock lock(&header_->mutex);
     header_->impl_storage_node_id = new_impl->storage_node_id();
     ++header_->impl_id;
-    old_impl_.swap(new_impl);
-    impl_.swap(old_impl_);
+    impl_.swap(new_impl);
     impl_id_ = header_->impl_id;
   }
-  Impl::unlink(storage_, old_impl_->storage_node_id());
-  pool_->defrag(0.5);
+  Impl::unlink(storage_, new_impl->storage_node_id());
+  try {
+    queue_.push(QueueEntry{ nullptr, std::move(new_impl), clock_.now() });
+  } catch (const std::exception &exception) {
+    GRNXX_ERROR() << "std::queue::push() failed";
+    throw StandardError(exception);
+  }
+}
+
+void DoubleArray<Bytes>::sweep(Duration lifetime) {
+  const Time threshold = clock_.now() - lifetime;
+  while (!queue_.empty()) {
+    QueueEntry &queue_entry = queue_.front();
+    if (queue_entry.time <= threshold) {
+      queue_.pop();
+    }
+  }
 }
 
 void DoubleArray<Bytes>::truncate() {
-  refresh_impl();
-  if (max_key_id() == MAP_MIN_KEY_ID) {
+  refresh_if_possible();
+  if (max_key_id() < MAP_MIN_KEY_ID) {
     // Nothing to do.
     return;
   }
-  // Create an empty impl.
-  std::unique_ptr<Impl> new_impl(Impl::create(storage_, storage_node_id_));
+  std::unique_ptr<Pool> new_pool(Pool::create(storage_, storage_node_id_));
+  std::unique_ptr<Impl> new_impl;
   try {
-    pool_->truncate();
-    new_impl->set_pool(pool_.get());
+    new_impl.reset(Impl::create(storage_, storage_node_id_, new_pool.get()));
   } catch (...) {
-    Impl::unlink(storage_, new_impl->storage_node_id());
+    Pool::unlink(storage_, new_pool->storage_node_id());
     throw;
   }
   {
-    // Validate a new impl.
+    // Validate a new impl and a new pool.
     Lock lock(&header_->mutex);
+    header_->pool_storage_node_id = new_pool->storage_node_id();
     header_->impl_storage_node_id = new_impl->storage_node_id();
+    ++header_->pool_id;
     ++header_->impl_id;
-    old_impl_.swap(new_impl);
-    impl_.swap(old_impl_);
+    pool_.swap(new_pool);
+    impl_.swap(new_impl);
+    pool_id_ = header_->pool_id;
     impl_id_ = header_->impl_id;
   }
-  Impl::unlink(storage_, old_impl_->storage_node_id());
+  Pool::unlink(storage_, new_pool->storage_node_id());
+  Impl::unlink(storage_, new_impl->storage_node_id());
+  try {
+    queue_.push(QueueEntry{ std::move(new_pool), std::move(new_impl),
+                            clock_.now() });
+  } catch (const std::exception &exception) {
+    GRNXX_ERROR() << "std::queue::push() failed";
+    throw StandardError(exception);
+  }
 }
 
 bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query,
                                                    int64_t *key_id,
                                                    Key *key) {
+  refresh_if_possible();
   return impl_->find_longest_prefix_match(query, key_id, key);
 }
 
@@ -1379,17 +1426,18 @@ bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query,
 void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
                                     const MapOptions &) {
   storage_ = storage;
-  StorageNode storage_node =
+  StorageNode header_node =
       storage->create_node(storage_node_id, sizeof(Header));
-  storage_node_id_ = storage_node.id();
+  storage_node_id_ = header_node.id();
   try {
-    header_ = static_cast<Header *>(storage_node.body());
+    header_ = static_cast<Header *>(header_node.body());
     *header_ = Header();
-    impl_.reset(Impl::create(storage, storage_node_id_));
     pool_.reset(Pool::create(storage, storage_node_id_));
-    impl_->set_pool(pool_.get());
-    header_->impl_storage_node_id = impl_->storage_node_id();
+    impl_.reset(Impl::create(storage, storage_node_id_, pool_.get()));
     header_->pool_storage_node_id = pool_->storage_node_id();
+    header_->impl_storage_node_id = impl_->storage_node_id();
+    pool_id_ = ++header_->pool_id;
+    impl_id_ = ++header_->impl_id;
   } catch (...) {
     storage->unlink_node(storage_node_id_);
     throw;
@@ -1399,35 +1447,59 @@ void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
 void DoubleArray<Bytes>::open_map(Storage *storage, uint32_t storage_node_id) {
   storage_ = storage;
   storage_node_id_ = storage_node_id;
-  StorageNode storage_node = storage->open_node(storage_node_id_);
-  if (storage_node.size() < sizeof(Header)) {
-    GRNXX_ERROR() << "invalid format: size = " << storage_node.size()
+  StorageNode header_node = storage->open_node(storage_node_id_);
+  if (header_node.size() < sizeof(Header)) {
+    GRNXX_ERROR() << "invalid format: size = " << header_node.size()
                   << ", header_size = " << sizeof(Header);
     throw LogicError();
   }
-  header_ = static_cast<Header *>(storage_node.body());
+  header_ = static_cast<Header *>(header_node.body());
   if (!*header_) {
     GRNXX_ERROR() << "wrong format: expected = " << FORMAT_STRING
                   << ", actual = " << header_->common_header.format();
     throw LogicError();
   }
+}
+
+void DoubleArray<Bytes>::refresh_if_possible() {
+  if (impl_id_ != header_->impl_id) {
+    refresh();
+  }
+}
+
+void DoubleArray<Bytes>::refresh() {
   Lock lock(&header_->mutex);
-  impl_.reset(Impl::open(storage, header_->impl_storage_node_id));
-  pool_.reset(Pool::open(storage, header_->pool_storage_node_id));
-  impl_->set_pool(pool_.get());
-  impl_id_ = header_->impl_id;
+  if (pool_id_ != header_->pool_id) {
+    refresh_pool();
+  }
+  if (impl_id_ != header_->impl_id) {
+    refresh_impl();
+  }
+}
+
+void DoubleArray<Bytes>::refresh_pool() {
+  std::unique_ptr<Pool> new_pool(
+      Pool::open(storage_, header_->pool_storage_node_id));
+  pool_.swap(new_pool);
+  pool_id_ = header_->pool_id;
+  try {
+    queue_.push(QueueEntry{ std::move(new_pool), nullptr, clock_.now() });
+  } catch (const std::exception &exception) {
+    GRNXX_ERROR() << "std::queue::push() failed";
+    throw StandardError(exception);
+  }
 }
 
 void DoubleArray<Bytes>::refresh_impl() {
-  if (impl_id_ != header_->impl_id) {
-    Lock lock(&header_->mutex);
-    if (impl_id_ != header_->impl_id) {
-      std::unique_ptr<Impl> new_impl(
-          Impl::open(storage_, header_->impl_storage_node_id));
-      old_impl_.swap(new_impl);
-      impl_.swap(old_impl_);
-      impl_id_ = header_->impl_id;
-    }
+  std::unique_ptr<Impl> new_impl(
+      Impl::open(storage_, header_->impl_storage_node_id, pool_.get()));
+  impl_.swap(new_impl);
+  impl_id_ = header_->impl_id;
+  try {
+    queue_.push(QueueEntry{ nullptr, std::move(new_impl), clock_.now() });
+  } catch (const std::exception &exception) {
+    GRNXX_ERROR() << "std::queue::push() failed";
+    throw StandardError(exception);
   }
 }
 

  Modified: lib/grnxx/map/double_array.hpp (+23 -6)
===================================================================
--- lib/grnxx/map/double_array.hpp    2013-08-21 13:59:44 +0900 (e7569e8)
+++ lib/grnxx/map/double_array.hpp    2013-08-21 14:00:41 +0900 (1bad27b)
@@ -21,11 +21,15 @@
 #include "grnxx/features.hpp"
 
 #include <memory>
+#include <queue>
 
 #include "grnxx/bytes.hpp"
+#include "grnxx/duration.hpp"
 #include "grnxx/map.hpp"
 #include "grnxx/map_cursor.hpp"
 #include "grnxx/map_cursor_query.hpp"
+#include "grnxx/periodic_clock.hpp"
+#include "grnxx/time.hpp"
 #include "grnxx/types.hpp"
 
 namespace grnxx {
@@ -34,11 +38,12 @@ class Storage;
 
 namespace map {
 
-template <typename T> class KeyPool;
+template <typename T> class Pool;
 
-struct DoubleArrayHeader;
 class DoubleArrayImpl;
 
+struct DoubleArrayHeader;
+
 template <typename T>
 class DoubleArray {
  public:
@@ -51,10 +56,16 @@ template <>
 class DoubleArray<Bytes> : public Map<Bytes> {
   using Header = DoubleArrayHeader;
   using Impl   = DoubleArrayImpl;
-  using Pool   = KeyPool<Bytes>;
+  using Pool   = Pool<Bytes>;
+
+  struct QueueEntry {
+    std::unique_ptr<Pool> pool;
+    std::unique_ptr<Impl> impl;
+    Time time;
+  };
 
  public:
-  using Key = typename Map<Bytes>::Key;
+  using Key    = typename Map<Bytes>::Key;
   using KeyArg = typename Map<Bytes>::KeyArg;
   using Cursor = typename Map<Bytes>::Cursor;
 
@@ -81,6 +92,7 @@ class DoubleArray<Bytes> : public Map<Bytes> {
   bool replace(KeyArg src_key, KeyArg dest_key, int64_t *key_id = nullptr);
 
   void defrag();
+  void sweep(Duration lifetime);
 
   void truncate();
 
@@ -102,15 +114,20 @@ class DoubleArray<Bytes> : public Map<Bytes> {
   Storage *storage_;
   uint32_t storage_node_id_;
   Header *header_;
-  std::unique_ptr<Impl> impl_;
-  std::unique_ptr<Impl> old_impl_;
   std::unique_ptr<Pool> pool_;
+  std::unique_ptr<Impl> impl_;
+  std::queue<QueueEntry> queue_;
+  uint64_t pool_id_;
   uint64_t impl_id_;
+  PeriodicClock clock_;
 
   void create_map(Storage *storage, uint32_t storage_node_id,
                   const MapOptions &options);
   void open_map(Storage *storage, uint32_t storage_node_id);
 
+  inline void refresh_if_possible();
+  void refresh();
+  void refresh_pool();
   void refresh_impl();
 };
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index