[Groonga-commit] groonga/grnxx [master] Fix bugs of grnxx::alpha::DoubleArray::insert().

Back to archive index

susumu.yata null+****@clear*****
Wed Jan 30 16:13:32 JST 2013


susumu.yata	2013-01-30 16:13:32 +0900 (Wed, 30 Jan 2013)

  New Revision: 2086f7d8fbf3a9443d327fe32fe308a17be03654
  https://github.com/groonga/grnxx/commit/2086f7d8fbf3a9443d327fe32fe308a17be03654

  Log:
    Fix bugs of grnxx::alpha::DoubleArray::insert().

  Modified files:
    lib/alpha/double_array.cpp
    lib/alpha/double_array.hpp

  Modified: lib/alpha/double_array.cpp (+52 -20)
===================================================================
--- lib/alpha/double_array.cpp    2013-01-21 22:10:20 +0900 (00a37e4)
+++ lib/alpha/double_array.cpp    2013-01-30 16:13:32 +0900 (a387280)
@@ -29,13 +29,15 @@ DoubleArrayOpen DOUBLE_ARRAY_OPEN;
 
 DoubleArrayHeader::DoubleArrayHeader()
   : nodes_block_id_(io::BLOCK_INVALID_ID),
+    siblings_block_id_(io::BLOCK_INVALID_ID),
     chunks_block_id_(io::BLOCK_INVALID_ID),
     entries_block_id_(io::BLOCK_INVALID_ID),
     keys_block_id_(io::BLOCK_INVALID_ID),
     root_node_id_(0),
     total_key_length_(0),
     next_key_id_(0),
-    max_key_id_(0),
+    next_key_pos_(0),
+    max_key_id_(-1),
     num_keys_(0),
     num_chunks_(0),
     num_phantoms_(0),
@@ -57,9 +59,13 @@ DoubleArrayKey::DoubleArrayKey(uint64_t id, const void *address,
 
 DoubleArrayImpl::~DoubleArrayImpl() {
   if (!initialized_) try {
+    // Allocated blocks are unlinked if initialization failed.
     if (header_->nodes_block_id() != io::BLOCK_INVALID_ID) {
       nodes_.unlink(pool_, header_->nodes_block_id());
     }
+    if (header_->siblings_block_id() != io::BLOCK_INVALID_ID) {
+      siblings_.unlink(pool_, header_->siblings_block_id());
+    }
     if (header_->chunks_block_id() != io::BLOCK_INVALID_ID) {
       chunks_.unlink(pool_, header_->chunks_block_id());
     }
@@ -105,6 +111,7 @@ bool DoubleArrayImpl::search(const uint8_t *ptr, uint64_t length,
     return false;
   }
 
+  // Note that nodes_[node_id] might be updated by other threads/processes.
   const DoubleArrayNode node = nodes_[node_id];
   if (!node.is_leaf()) {
     return false;
@@ -123,7 +130,8 @@ bool DoubleArrayImpl::search(const uint8_t *ptr, uint64_t length,
 
 bool DoubleArrayImpl::insert(const uint8_t *ptr, uint64_t length,
                              uint64_t *key_pos) {
-  // TODO
+  // TODO: Exclusive access control is required.
+
 //  GRN_DAT_THROW_IF(STATUS_ERROR, (status_flags() & CHANGING_MASK) != 0);
 //  StatusFlagManager status_flag_manager(header_, INSERTING_FLAG);
 
@@ -140,11 +148,14 @@ bool DoubleArrayImpl::insert(const uint8_t *ptr, uint64_t length,
     return false;
   }
 
-  const uint64_t new_key_id = header_->next_key_id();
+  const int64_t new_key_id = header_->next_key_id();
   const uint64_t new_key_pos = append_key(ptr, length, new_key_id);
 
   header_->set_total_key_length(header_->total_key_length() + length);
   header_->set_num_keys(header_->num_keys() + 1);
+
+  // TODO: The first key ID should be fixed to 0 or 1.
+  //       Currently, 0 is used.
   if (new_key_id > header_->max_key_id()) {
     header_->set_max_key_id(new_key_id);
     header_->set_next_key_id(new_key_id + 1);
@@ -166,6 +177,7 @@ DoubleArrayImpl::DoubleArrayImpl()
     header_(nullptr),
     recycler_(nullptr),
     nodes_(),
+    siblings_(),
     chunks_(),
     entries_(),
     keys_(),
@@ -184,6 +196,8 @@ void DoubleArrayImpl::create_double_array(io::Pool pool) {
 
   nodes_.create(pool);
   header_->set_nodes_block_id(nodes_.block_id());
+  siblings_.create(pool);
+  header_->set_siblings_block_id(siblings_.block_id());
   chunks_.create(pool);
   header_->set_chunks_block_id(chunks_.block_id());
   entries_.create(pool);
@@ -211,6 +225,7 @@ void DoubleArrayImpl::open_double_array(io::Pool pool, uint32_t block_id) {
   recycler_ = pool_.mutable_recycler();
 
   nodes_.open(pool_, header_->nodes_block_id());
+  siblings_.open(pool_, header_->siblings_block_id());
   chunks_.open(pool_, header_->chunks_block_id());
   entries_.open(pool_, header_->entries_block_id());
   keys_.open(pool_, header_->keys_block_id());
@@ -246,8 +261,8 @@ bool DoubleArrayImpl::search_leaf(const uint8_t *ptr, uint64_t length,
 bool DoubleArrayImpl::insert_leaf(const uint8_t *ptr, uint64_t length,
                                   uint64_t &node_id, uint64_t query_pos) {
   const DoubleArrayNode node = nodes_[node_id];
-  if (node.is_terminal()) {
-    return true;
+  if (node.label() == DOUBLE_ARRAY_TERMINAL_LABEL) {
+    return false;
   } else if (node.is_leaf()) {
     const DoubleArrayKey &key = get_key(node.key_pos());
     uint64_t i = query_pos;
@@ -275,9 +290,11 @@ bool DoubleArrayImpl::insert_leaf(const uint8_t *ptr, uint64_t length,
     const uint16_t label = (query_pos < length) ?
         static_cast<uint16_t>(ptr[query_pos]) : DOUBLE_ARRAY_TERMINAL_LABEL;
     if ((node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) ||
-        nodes_[node.offset() ^ label].is_phantom()) {
+        !nodes_[node.offset() ^ label].is_phantom()) {
+      // The offset of this node must be updated.
       resolve(node_id, label);
     }
+    // The new node will be the leaf node associated with the query.
     node_id = insert_node(node_id, label);
     return true;
   }
@@ -300,11 +317,12 @@ uint64_t DoubleArrayImpl::insert_node(uint64_t node_id, uint16_t label) {
 
   nodes_[next].set_label(label);
   if (node.is_leaf()) {
-//    GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset());
+//    GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_origin());
     nodes_[offset].set_is_origin(true);
     nodes_[next].set_key(node.key_pos(), node.key_length());
+    // TODO: Must be update at once.
   } else if (node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) {
-//    GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset());
+//    GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_origin());
     nodes_[offset].set_is_origin(true);
 //  } else {
 //    GRN_DAT_DEBUG_THROW_IF(!nodes_[offset].is_origin());
@@ -318,22 +336,32 @@ uint64_t DoubleArrayImpl::insert_node(uint64_t node_id, uint16_t label) {
   } else if ((label == DOUBLE_ARRAY_TERMINAL_LABEL) ||
              ((child_label != DOUBLE_ARRAY_TERMINAL_LABEL) &&
               (label < child_label))) {
+    // The next node becomes the first child.
 //    GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).is_phantom());
 //    GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).label() != child_label);
-    nodes_[next].set_sibling(child_label);
+    siblings_[next] = child_label;
+    nodes_[next].set_has_sibling(true);
+if (label == 0) {
+  GRNXX_ERROR();
+  GRNXX_THROW();
+}
     nodes_[node_id].set_child(label);
   } else {
     uint64_t prev = offset ^ child_label;
 //    GRN_DAT_DEBUG_THROW_IF(nodes_[prev).label() != child_label);
-    uint16_t sibling_label = nodes_[prev].sibling();
+    uint16_t sibling_label = nodes_[prev].has_sibling() ?
+        siblings_[prev] : DOUBLE_ARRAY_INVALID_LABEL;
     while (label > sibling_label) {
       prev = offset ^ sibling_label;
 //      GRN_DAT_DEBUG_THROW_IF(nodes_[prev].label() != sibling_label);
-      sibling_label = nodes_[prev].sibling();
+      sibling_label = nodes_[prev].has_sibling() ?
+          siblings_[prev] : DOUBLE_ARRAY_INVALID_LABEL;
     }
 //    GRN_DAT_DEBUG_THROW_IF(label == sibling_label);
-    nodes_[next].set_sibling(nodes_[prev].sibling());
-    nodes_[prev].set_sibling(label);
+    siblings_[next] = siblings_[prev];
+    siblings_[prev] = label;
+    nodes_[next].set_has_sibling(nodes_[prev].has_sibling());
+    nodes_[prev].set_has_sibling(true);
   }
   return next;
 }
@@ -366,8 +394,7 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length,
 //  GRN_DAT_DEBUG_THROW_IF(i > length);
 
   const DoubleArrayNode node = nodes_[node_id];
-  const uint64_t key_pos = node.key_pos();
-  const DoubleArrayKey &key = get_key(key_pos);
+  const DoubleArrayKey &key = get_key(node.key_pos());
 
   uint16_t labels[2];
   labels[0] = (i < node.key_length()) ?
@@ -380,10 +407,10 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length,
 
   uint64_t next = offset ^ labels[0];
   reserve_node(next);
-//  GRN_DAT_DEBUG_THROW_IF(nodes_[offset).is_offset());
+//  GRN_DAT_DEBUG_THROW_IF(nodes_[offset).is_origin());
 
   nodes_[next].set_label(labels[0]);
-  nodes_[next].set_key(key_pos, length);
+  nodes_[next].set_key(node.key_pos(), node.key_length());
 
   next = offset ^ labels[1];
   reserve_node(next);
@@ -396,11 +423,13 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length,
   if ((labels[0] == DOUBLE_ARRAY_TERMINAL_LABEL) ||
       ((labels[1] != DOUBLE_ARRAY_TERMINAL_LABEL) &&
        (labels[0] < labels[1]))) {
+    siblings_[offset ^ labels[0]] = labels[1];
+    nodes_[offset ^ labels[0]].set_has_sibling(true);
     nodes_[node_id].set_child(labels[0]);
-    nodes_[offset ^ labels[0]].set_sibling(labels[1]);
   } else {
+    siblings_[offset ^ labels[1]] = labels[0];
+    nodes_[offset ^ labels[1]].set_has_sibling(true);
     nodes_[node_id].set_child(labels[1]);
-    nodes_[offset ^ labels[1]].set_sibling(labels[0]);
   }
   return next;
 }
@@ -421,7 +450,8 @@ void DoubleArrayImpl::resolve(uint64_t node_id, uint16_t label) {
     while (next_label != DOUBLE_ARRAY_INVALID_LABEL) {
 //      GRN_DAT_DEBUG_THROW_IF(next_label > MAX_LABEL);
       labels[num_labels++] = next_label;
-      next_label = nodes_[offset ^ next_label].sibling();
+      next_label = nodes_[offset ^ next_label].has_sibling() ?
+          siblings_[offset ^ next_label] : DOUBLE_ARRAY_INVALID_LABEL;
     }
 //    GRN_DAT_DEBUG_THROW_IF(num_labels == 0);
 
@@ -462,6 +492,7 @@ void DoubleArrayImpl::migrate_nodes(uint64_t node_id, uint64_t dest_offset,
     DoubleArrayNode dest_node = nodes_[src_node_id];
     dest_node.set_is_origin(nodes_[dest_node_id].is_origin());
     nodes_[dest_node_id] = dest_node;
+    siblings_[dest_node_id] = siblings_[src_node_id];
   }
   header_->set_num_zombies(header_->num_zombies() + num_labels);
 
@@ -607,6 +638,7 @@ void DoubleArrayImpl::reserve_chunk(uint64_t chunk_id) {
     node.set_prev((i - 1) & DOUBLE_ARRAY_CHUNK_MASK);
     node.set_next((i + 1) & DOUBLE_ARRAY_CHUNK_MASK);
     nodes_[i] = node;
+    siblings_[i] = '\0';
   }
 
   // The level of the new chunk is 0.

  Modified: lib/alpha/double_array.hpp (+99 -57)
===================================================================
--- lib/alpha/double_array.hpp    2013-01-21 22:10:20 +0900 (05d48d9)
+++ lib/alpha/double_array.hpp    2013-01-30 16:13:32 +0900 (1847316)
@@ -23,17 +23,18 @@
 namespace grnxx {
 namespace alpha {
 
-// FIXME: To be removed when moved to grnxx::db.
+// FIXME: To be removed in future.
 using namespace grnxx::db;
 
 extern struct DoubleArrayCreate {} DOUBLE_ARRAY_CREATE;
 extern struct DoubleArrayOpen {} DOUBLE_ARRAY_OPEN;
 
-constexpr uint64_t DOUBLE_ARRAY_INVALID_ID     = 0xFFFFFFFFFFULL;
+constexpr uint64_t DOUBLE_ARRAY_MAX_ID         = (uint64_t(1) << 40) - 2;
+constexpr uint64_t DOUBLE_ARRAY_INVALID_ID     = DOUBLE_ARRAY_MAX_ID + 1;
 constexpr uint64_t DOUBLE_ARRAY_INVALID_OFFSET = 0;
 
 constexpr uint16_t DOUBLE_ARRAY_TERMINAL_LABEL  = 0x100;
-constexpr uint16_t DOUBLE_ARRAY_MAX_LABEL       = 0x100;
+constexpr uint16_t DOUBLE_ARRAY_MAX_LABEL       = DOUBLE_ARRAY_TERMINAL_LABEL;
 constexpr uint16_t DOUBLE_ARRAY_INVALID_LABEL   = 0x1FF;
 
 constexpr uint64_t DOUBLE_ARRAY_CHUNK_SIZE      = 0x200;
@@ -43,10 +44,10 @@ constexpr uint64_t DOUBLE_ARRAY_CHUNK_MASK      = 0x1FF;
 // can find a good offset in that chunk. The chunk level rises when
 // find_offset() fails in that chunk many times. DOUBLE_ARRAY_MAX_FAILURE_COUNT
 // is the threshold. Also, in order to limit the time cost, find_offset() scans
-// at most DOUBLE_ARRAY_MAX_CHUNk_COUNT chunks.
+// at most DOUBLE_ARRAY_MAX_CHUNK_COUNT chunks.
 // Larger parameters bring more chances of finding good offsets but it leads to
 // more node renumberings, which are costly operations, and thus results in
-// a degradation of space/time efficiencies.
+// degradation of space/time efficiencies.
 constexpr uint64_t DOUBLE_ARRAY_MAX_FAILURE_COUNT  = 4;
 constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_COUNT    = 16;
 constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_LEVEL    = 5;
@@ -56,6 +57,7 @@ constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_LEVEL    = 5;
 // the linked list is empty and there exists no leader.
 constexpr uint64_t DOUBLE_ARRAY_INVALID_LEADER     = 0x7FFFFFFF;
 
+// The memory allocation unit size for keys.
 constexpr uint64_t DOUBLE_ARRAY_KEYS_PAGE_SIZE     =
     VECTOR_DEFAULT_PAGE_SIZE;
 
@@ -156,6 +158,9 @@ class DoubleArrayHeader {
   uint32_t nodes_block_id() const {
     return nodes_block_id_;
   }
+  uint32_t siblings_block_id() const {
+    return siblings_block_id_;
+  }
   uint32_t chunks_block_id() const {
     return chunks_block_id_;
   }
@@ -177,7 +182,7 @@ class DoubleArrayHeader {
   uint64_t next_key_pos() const {
     return next_key_pos_;
   }
-  uint64_t max_key_id() const {
+  int64_t max_key_id() const {
     return max_key_id_;
   }
   uint64_t num_keys() const {
@@ -202,6 +207,9 @@ class DoubleArrayHeader {
   void set_nodes_block_id(uint32_t value) {
     nodes_block_id_ = value;
   }
+  void set_siblings_block_id(uint32_t value) {
+    siblings_block_id_ = value;
+  }
   void set_chunks_block_id(uint32_t value) {
     chunks_block_id_ = value;
   }
@@ -223,7 +231,7 @@ class DoubleArrayHeader {
   void set_next_key_pos(uint64_t value) {
     next_key_pos_ = value;
   }
-  void set_max_key_id(uint64_t value) {
+  void set_max_key_id(int64_t value) {
     max_key_id_ = value;
   }
   void set_num_keys(uint64_t value) {
@@ -248,6 +256,7 @@ class DoubleArrayHeader {
 
  private:
   uint32_t nodes_block_id_;
+  uint32_t siblings_block_id_;
   uint32_t chunks_block_id_;
   uint32_t entries_block_id_;
   uint32_t keys_block_id_;
@@ -255,7 +264,7 @@ class DoubleArrayHeader {
   uint64_t total_key_length_;
   uint64_t next_key_id_;
   uint64_t next_key_pos_;
-  uint64_t max_key_id_;
+  int64_t max_key_id_;
   uint64_t num_keys_;
   uint64_t num_chunks_;
   uint64_t num_phantoms_;
@@ -266,6 +275,8 @@ class DoubleArrayHeader {
 
 class DoubleArrayNode {
  public:
+  DoubleArrayNode() : qword_(0) {}
+
   // The ID of this node is used as an offset (true) or not (false).
   bool is_origin() const {
     // 1 bit.
@@ -274,46 +285,48 @@ class DoubleArrayNode {
   // This node is valid (false) or not (true).
   bool is_phantom() const {
     // 1 bit.
-    return (qword_ & IS_PHANTOM_FLAG) && (~qword_ & IS_LEAF_FLAG);
+    return qword_ & IS_PHANTOM_FLAG;
   }
   // This node is associated with a key (true) or not (false).
   bool is_leaf() const {
     // 1 bit.
     return qword_ & IS_LEAF_FLAG;
   }
-  // This node is a leaf node with a valid label (false) or not (true).
-  bool is_terminal() const {
+  // This node has an elder sibling (true) or not (false).
+  bool has_sibling() const {
     // 1 bit.
-    return (qword_ & (IS_PHANTOM_FLAG | IS_LEAF_FLAG)) ==
-        (IS_PHANTOM_FLAG | IS_LEAF_FLAG);
+    return qword_ & HAS_SIBLING_FLAG;
   }
 
   void set_is_origin(bool value) {
     if (value) {
-      qword_ &= ~IS_ORIGIN_FLAG;
-    } else {
       qword_ |= IS_ORIGIN_FLAG;
+    } else {
+      qword_ &= ~IS_ORIGIN_FLAG;
     }
   }
   void set_is_phantom(bool value) {
     if (value) {
-      qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG);
+      qword_ |= IS_PHANTOM_FLAG;
     } else {
-      qword_ = (qword_ | IS_PHANTOM_FLAG) & ~IS_LEAF_FLAG;
+      qword_ = (qword_ & IS_ORIGIN_FLAG) |
+          (DOUBLE_ARRAY_INVALID_OFFSET << OFFSET_SHIFT) |
+          (uint64_t(DOUBLE_ARRAY_INVALID_LABEL) << CHILD_SHIFT) |
+          DOUBLE_ARRAY_INVALID_LABEL;
     }
   }
   void set_is_leaf(bool value) {
     if (value) {
-      qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG);
+      qword_ |= IS_LEAF_FLAG;
     } else {
-      qword_ = (qword_ | IS_LEAF_FLAG) & ~IS_PHANTOM_FLAG;
+      qword_ &= ~IS_LEAF_FLAG;
     }
   }
-  void set_is_terminal(bool value) {
+  void set_has_sibling(bool value) {
     if (value) {
-      qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG);
+      qword_ |= HAS_SIBLING_FLAG;
     } else {
-      qword_ |= IS_PHANTOM_FLAG | IS_LEAF_FLAG;
+      qword_ &= ~HAS_SIBLING_FLAG;
     }
   }
 
@@ -340,18 +353,18 @@ class DoubleArrayNode {
   // A non-phantom node stores its label.
   // A phantom node returns an invalid label with IS_PHANTOM_FLAG.
   uint64_t label() const {
-    // 8 bits.
+    // 9 bits.
     return qword_ & (IS_PHANTOM_FLAG | LABEL_MASK);
   }
 
-  void set_label(uint8_t value) {
+  void set_label(uint16_t value) {
     qword_ = (qword_ & ~LABEL_MASK) | value;
   }
 
   // A leaf node stores the start position and the length of the associated
   // key.
   uint64_t key_pos() const {
-    // 40 bits.
+    // 39 bits.
     return (qword_ >> KEY_POS_SHIFT) & KEY_POS_MASK;
   }
   uint64_t key_length() const {
@@ -362,62 +375,65 @@ class DoubleArrayNode {
   void set_key(uint64_t key_pos, uint64_t key_length) {
     qword_ = (qword_ & ~((KEY_POS_MASK << KEY_POS_SHIFT) |
                          (KEY_LENGTH_MASK << KEY_LENGTH_SHIFT))) |
-             (key_pos << KEY_POS_SHIFT) | (key_length << KEY_LENGTH_SHIFT);
+             (key_pos << KEY_POS_SHIFT) |
+             (key_length << KEY_LENGTH_SHIFT) |
+             IS_LEAF_FLAG;
   }
 
   // A non-phantom and non-leaf node stores the offset to its children,
   // the label of its next sibling, and the label of its first child.
   uint64_t offset() const {
-    // 36 bits.
-    return (qword_ >> 8) & ((uint64_t(1) << 36) - 1);
+    // 42 bits.
+    return (qword_ >> OFFSET_SHIFT) & OFFSET_MASK;
   }
   uint16_t child() const {
     // 9 bits.
-    return static_cast<uint8_t>(qword_ >> 44);
-  }
-  uint16_t sibling() const {
-    // 9 bits.
-    return static_cast<uint8_t>(qword_ >> 52);
+    return static_cast<uint16_t>((qword_ >> CHILD_SHIFT) & CHILD_MASK);
   }
 
   void set_offset(uint64_t value) {
-    qword_ = (qword_ & ~(OFFSET_MASK << OFFSET_SHIFT)) |
-             (value << OFFSET_SHIFT);
+    if (qword_ & IS_LEAF_FLAG) {
+      qword_ = ((qword_ & ~IS_LEAF_FLAG) & ~(OFFSET_MASK << OFFSET_SHIFT)) |
+               (value << OFFSET_SHIFT) |
+               (uint64_t(DOUBLE_ARRAY_INVALID_LABEL) << CHILD_SHIFT);
+    } else {
+      qword_ = (qword_ & ~(OFFSET_MASK << OFFSET_SHIFT)) |
+               (value << OFFSET_SHIFT);
+    }
   }
-  void set_child(uint8_t value) {
+  void set_child(uint16_t value) {
     qword_ = (qword_ & ~(CHILD_MASK << CHILD_SHIFT)) |
              (static_cast<uint64_t>(value) << CHILD_SHIFT);
   }
-  void set_sibling(uint8_t value) {
-    qword_ = (qword_ & ~(SIBLING_MASK << SIBLING_SHIFT)) |
-             (static_cast<uint64_t>(value) << SIBLING_SHIFT);
-  }
 
  private:
   uint64_t qword_;
 
-  static constexpr uint64_t IS_ORIGIN_FLAG    = uint64_t(1) << 63;
-  static constexpr uint64_t IS_PHANTOM_FLAG   = uint64_t(1) << 62;
-  static constexpr uint64_t IS_LEAF_FLAG      = uint64_t(1) << 61;
+  // 60 - 63.
+  static constexpr uint64_t IS_ORIGIN_FLAG   = uint64_t(1) << 63;
+  static constexpr uint64_t IS_PHANTOM_FLAG  = uint64_t(1) << 62;
+  static constexpr uint64_t IS_LEAF_FLAG     = uint64_t(1) << 61;
+  static constexpr uint64_t HAS_SIBLING_FLAG = uint64_t(1) << 60;
 
-  static constexpr uint64_t NEXT_MASK  = 0x1FF;
+  // 0 - 17.
+  static constexpr uint64_t NEXT_MASK  = (uint64_t(1) << 9) - 1;
   static constexpr uint8_t  NEXT_SHIFT = 0;
-  static constexpr uint64_t PREV_MASK  = 0x1FF;
+  static constexpr uint64_t PREV_MASK  = (uint64_t(1) << 9) - 1;
   static constexpr uint8_t  PREV_SHIFT = 9;
 
-  static constexpr uint64_t LABEL_MASK = 0xFF;
+  // 0 - 8.
+  static constexpr uint64_t LABEL_MASK = (uint64_t(1) << 9) - 1;
 
-  static constexpr uint64_t KEY_POS_MASK     = (uint64_t(1) << 41) - 1;
-  static constexpr uint8_t  KEY_POS_SHIFT    = 8;
+  // 9 - 59
+  static constexpr uint64_t KEY_POS_MASK     = (uint64_t(1) << 39) - 1;
+  static constexpr uint8_t  KEY_POS_SHIFT    = 9;
   static constexpr uint64_t KEY_LENGTH_MASK  = (uint64_t(1) << 12) - 1;
-  static constexpr uint8_t  KEY_LENGTH_SHIFT = 49;
+  static constexpr uint8_t  KEY_LENGTH_SHIFT = 48;
 
-  static constexpr uint64_t OFFSET_MASK   = (uint64_t(1) << 35) - 1;
-  static constexpr uint8_t  OFFSET_SHIFT  = 8;
+  static constexpr uint64_t OFFSET_MASK   = (uint64_t(1) << 42) - 1;
+  static constexpr uint8_t  OFFSET_SHIFT  = 9;
   static constexpr uint64_t CHILD_MASK    = (uint64_t(1) << 9) - 1;
-  static constexpr uint8_t  CHILD_SHIFT   = 43;
-  static constexpr uint64_t SIBLING_MASK  = (uint64_t(1) << 9) - 1;
-  static constexpr uint8_t  SIBLING_SHIFT = 52;
+  static constexpr uint8_t  CHILD_SHIFT   = 51;
 };
 
 class DoubleArrayChunk {
@@ -570,7 +586,7 @@ class DoubleArrayKey {
   uint8_t buf_[3];
 };
 
-// TODO
+// FIXME
 class DoubleArrayImpl {
  public:
   ~DoubleArrayImpl();
@@ -581,8 +597,6 @@ class DoubleArrayImpl {
 
   bool search(const uint8_t *ptr, uint64_t length,
               uint64_t *key_pos = nullptr);
-
-  // TODO
   bool insert(const uint8_t *ptr, uint64_t length,
               uint64_t *key_pos = nullptr);
 
@@ -611,6 +625,7 @@ class DoubleArrayImpl {
   DoubleArrayHeader *header_;
   Recycler *recycler_;
   Vector<DoubleArrayNode> nodes_;
+  Vector<uint8_t> siblings_;
   Vector<DoubleArrayChunk> chunks_;
   Vector<DoubleArrayEntry> entries_;
   Vector<uint32_t> keys_;
@@ -669,6 +684,33 @@ class DoubleArray {
     *this = DoubleArray();
   }
 
+  bool search(const void *ptr, uint64_t length,
+              uint64_t *key_id = nullptr) {
+    if (key_id) {
+      uint64_t key_pos;
+      if (!impl_->search(static_cast<const uint8_t *>(ptr), length, &key_pos)) {
+        return false;
+      }
+      *key_id = impl_->get_key(key_pos).id();
+      return true;
+    } else {
+      return impl_->search(static_cast<const uint8_t *>(ptr), length, nullptr);
+    }
+  }
+  bool insert(const void *ptr, uint64_t length,
+              uint64_t *key_id = nullptr) {
+    if (key_id) {
+      uint64_t key_pos;
+      if (!impl_->insert(static_cast<const uint8_t *>(ptr), length, &key_pos)) {
+        return false;
+      }
+      *key_id = impl_->get_key(key_pos).id();
+      return true;
+    } else {
+      return impl_->insert(static_cast<const uint8_t *>(ptr), length, nullptr);
+    }
+  }
+
   uint32_t block_id() const {
     return impl_->block_id();
   }
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index