susumu.yata
null+****@clear*****
Wed Jan 30 16:13:32 JST 2013
susumu.yata 2013-01-30 16:13:32 +0900 (Wed, 30 Jan 2013) New Revision: 2086f7d8fbf3a9443d327fe32fe308a17be03654 https://github.com/groonga/grnxx/commit/2086f7d8fbf3a9443d327fe32fe308a17be03654 Log: Fix bugs of grnxx::alpha::DoubleArray::insert(). Modified files: lib/alpha/double_array.cpp lib/alpha/double_array.hpp Modified: lib/alpha/double_array.cpp (+52 -20) =================================================================== --- lib/alpha/double_array.cpp 2013-01-21 22:10:20 +0900 (00a37e4) +++ lib/alpha/double_array.cpp 2013-01-30 16:13:32 +0900 (a387280) @@ -29,13 +29,15 @@ DoubleArrayOpen DOUBLE_ARRAY_OPEN; DoubleArrayHeader::DoubleArrayHeader() : nodes_block_id_(io::BLOCK_INVALID_ID), + siblings_block_id_(io::BLOCK_INVALID_ID), chunks_block_id_(io::BLOCK_INVALID_ID), entries_block_id_(io::BLOCK_INVALID_ID), keys_block_id_(io::BLOCK_INVALID_ID), root_node_id_(0), total_key_length_(0), next_key_id_(0), - max_key_id_(0), + next_key_pos_(0), + max_key_id_(-1), num_keys_(0), num_chunks_(0), num_phantoms_(0), @@ -57,9 +59,13 @@ DoubleArrayKey::DoubleArrayKey(uint64_t id, const void *address, DoubleArrayImpl::~DoubleArrayImpl() { if (!initialized_) try { + // Allocated blocks are unlinked if initialization failed. if (header_->nodes_block_id() != io::BLOCK_INVALID_ID) { nodes_.unlink(pool_, header_->nodes_block_id()); } + if (header_->siblings_block_id() != io::BLOCK_INVALID_ID) { + siblings_.unlink(pool_, header_->siblings_block_id()); + } if (header_->chunks_block_id() != io::BLOCK_INVALID_ID) { chunks_.unlink(pool_, header_->chunks_block_id()); } @@ -105,6 +111,7 @@ bool DoubleArrayImpl::search(const uint8_t *ptr, uint64_t length, return false; } + // Note that nodes_[node_id] might be updated by other threads/processes. const DoubleArrayNode node = nodes_[node_id]; if (!node.is_leaf()) { return false; @@ -123,7 +130,8 @@ bool DoubleArrayImpl::search(const uint8_t *ptr, uint64_t length, bool DoubleArrayImpl::insert(const uint8_t *ptr, uint64_t length, uint64_t *key_pos) { - // TODO + // TODO: Exclusive access control is required. + // GRN_DAT_THROW_IF(STATUS_ERROR, (status_flags() & CHANGING_MASK) != 0); // StatusFlagManager status_flag_manager(header_, INSERTING_FLAG); @@ -140,11 +148,14 @@ bool DoubleArrayImpl::insert(const uint8_t *ptr, uint64_t length, return false; } - const uint64_t new_key_id = header_->next_key_id(); + const int64_t new_key_id = header_->next_key_id(); const uint64_t new_key_pos = append_key(ptr, length, new_key_id); header_->set_total_key_length(header_->total_key_length() + length); header_->set_num_keys(header_->num_keys() + 1); + + // TODO: The first key ID should be fixed to 0 or 1. + // Currently, 0 is used. if (new_key_id > header_->max_key_id()) { header_->set_max_key_id(new_key_id); header_->set_next_key_id(new_key_id + 1); @@ -166,6 +177,7 @@ DoubleArrayImpl::DoubleArrayImpl() header_(nullptr), recycler_(nullptr), nodes_(), + siblings_(), chunks_(), entries_(), keys_(), @@ -184,6 +196,8 @@ void DoubleArrayImpl::create_double_array(io::Pool pool) { nodes_.create(pool); header_->set_nodes_block_id(nodes_.block_id()); + siblings_.create(pool); + header_->set_siblings_block_id(siblings_.block_id()); chunks_.create(pool); header_->set_chunks_block_id(chunks_.block_id()); entries_.create(pool); @@ -211,6 +225,7 @@ void DoubleArrayImpl::open_double_array(io::Pool pool, uint32_t block_id) { recycler_ = pool_.mutable_recycler(); nodes_.open(pool_, header_->nodes_block_id()); + siblings_.open(pool_, header_->siblings_block_id()); chunks_.open(pool_, header_->chunks_block_id()); entries_.open(pool_, header_->entries_block_id()); keys_.open(pool_, header_->keys_block_id()); @@ -246,8 +261,8 @@ bool DoubleArrayImpl::search_leaf(const uint8_t *ptr, uint64_t length, bool DoubleArrayImpl::insert_leaf(const uint8_t *ptr, uint64_t length, uint64_t &node_id, uint64_t query_pos) { const DoubleArrayNode node = nodes_[node_id]; - if (node.is_terminal()) { - return true; + if (node.label() == DOUBLE_ARRAY_TERMINAL_LABEL) { + return false; } else if (node.is_leaf()) { const DoubleArrayKey &key = get_key(node.key_pos()); uint64_t i = query_pos; @@ -275,9 +290,11 @@ bool DoubleArrayImpl::insert_leaf(const uint8_t *ptr, uint64_t length, const uint16_t label = (query_pos < length) ? static_cast<uint16_t>(ptr[query_pos]) : DOUBLE_ARRAY_TERMINAL_LABEL; if ((node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) || - nodes_[node.offset() ^ label].is_phantom()) { + !nodes_[node.offset() ^ label].is_phantom()) { + // The offset of this node must be updated. resolve(node_id, label); } + // The new node will be the leaf node associated with the query. node_id = insert_node(node_id, label); return true; } @@ -300,11 +317,12 @@ uint64_t DoubleArrayImpl::insert_node(uint64_t node_id, uint16_t label) { nodes_[next].set_label(label); if (node.is_leaf()) { -// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_origin()); nodes_[offset].set_is_origin(true); nodes_[next].set_key(node.key_pos(), node.key_length()); + // TODO: Must be update at once. } else if (node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) { -// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_origin()); nodes_[offset].set_is_origin(true); // } else { // GRN_DAT_DEBUG_THROW_IF(!nodes_[offset].is_origin()); @@ -318,22 +336,32 @@ uint64_t DoubleArrayImpl::insert_node(uint64_t node_id, uint16_t label) { } else if ((label == DOUBLE_ARRAY_TERMINAL_LABEL) || ((child_label != DOUBLE_ARRAY_TERMINAL_LABEL) && (label < child_label))) { + // The next node becomes the first child. // GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).is_phantom()); // GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).label() != child_label); - nodes_[next].set_sibling(child_label); + siblings_[next] = child_label; + nodes_[next].set_has_sibling(true); +if (label == 0) { + GRNXX_ERROR(); + GRNXX_THROW(); +} nodes_[node_id].set_child(label); } else { uint64_t prev = offset ^ child_label; // GRN_DAT_DEBUG_THROW_IF(nodes_[prev).label() != child_label); - uint16_t sibling_label = nodes_[prev].sibling(); + uint16_t sibling_label = nodes_[prev].has_sibling() ? + siblings_[prev] : DOUBLE_ARRAY_INVALID_LABEL; while (label > sibling_label) { prev = offset ^ sibling_label; // GRN_DAT_DEBUG_THROW_IF(nodes_[prev].label() != sibling_label); - sibling_label = nodes_[prev].sibling(); + sibling_label = nodes_[prev].has_sibling() ? + siblings_[prev] : DOUBLE_ARRAY_INVALID_LABEL; } // GRN_DAT_DEBUG_THROW_IF(label == sibling_label); - nodes_[next].set_sibling(nodes_[prev].sibling()); - nodes_[prev].set_sibling(label); + siblings_[next] = siblings_[prev]; + siblings_[prev] = label; + nodes_[next].set_has_sibling(nodes_[prev].has_sibling()); + nodes_[prev].set_has_sibling(true); } return next; } @@ -366,8 +394,7 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length, // GRN_DAT_DEBUG_THROW_IF(i > length); const DoubleArrayNode node = nodes_[node_id]; - const uint64_t key_pos = node.key_pos(); - const DoubleArrayKey &key = get_key(key_pos); + const DoubleArrayKey &key = get_key(node.key_pos()); uint16_t labels[2]; labels[0] = (i < node.key_length()) ? @@ -380,10 +407,10 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length, uint64_t next = offset ^ labels[0]; reserve_node(next); -// GRN_DAT_DEBUG_THROW_IF(nodes_[offset).is_offset()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset).is_origin()); nodes_[next].set_label(labels[0]); - nodes_[next].set_key(key_pos, length); + nodes_[next].set_key(node.key_pos(), node.key_length()); next = offset ^ labels[1]; reserve_node(next); @@ -396,11 +423,13 @@ uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length, if ((labels[0] == DOUBLE_ARRAY_TERMINAL_LABEL) || ((labels[1] != DOUBLE_ARRAY_TERMINAL_LABEL) && (labels[0] < labels[1]))) { + siblings_[offset ^ labels[0]] = labels[1]; + nodes_[offset ^ labels[0]].set_has_sibling(true); nodes_[node_id].set_child(labels[0]); - nodes_[offset ^ labels[0]].set_sibling(labels[1]); } else { + siblings_[offset ^ labels[1]] = labels[0]; + nodes_[offset ^ labels[1]].set_has_sibling(true); nodes_[node_id].set_child(labels[1]); - nodes_[offset ^ labels[1]].set_sibling(labels[0]); } return next; } @@ -421,7 +450,8 @@ void DoubleArrayImpl::resolve(uint64_t node_id, uint16_t label) { while (next_label != DOUBLE_ARRAY_INVALID_LABEL) { // GRN_DAT_DEBUG_THROW_IF(next_label > MAX_LABEL); labels[num_labels++] = next_label; - next_label = nodes_[offset ^ next_label].sibling(); + next_label = nodes_[offset ^ next_label].has_sibling() ? + siblings_[offset ^ next_label] : DOUBLE_ARRAY_INVALID_LABEL; } // GRN_DAT_DEBUG_THROW_IF(num_labels == 0); @@ -462,6 +492,7 @@ void DoubleArrayImpl::migrate_nodes(uint64_t node_id, uint64_t dest_offset, DoubleArrayNode dest_node = nodes_[src_node_id]; dest_node.set_is_origin(nodes_[dest_node_id].is_origin()); nodes_[dest_node_id] = dest_node; + siblings_[dest_node_id] = siblings_[src_node_id]; } header_->set_num_zombies(header_->num_zombies() + num_labels); @@ -607,6 +638,7 @@ void DoubleArrayImpl::reserve_chunk(uint64_t chunk_id) { node.set_prev((i - 1) & DOUBLE_ARRAY_CHUNK_MASK); node.set_next((i + 1) & DOUBLE_ARRAY_CHUNK_MASK); nodes_[i] = node; + siblings_[i] = '\0'; } // The level of the new chunk is 0. Modified: lib/alpha/double_array.hpp (+99 -57) =================================================================== --- lib/alpha/double_array.hpp 2013-01-21 22:10:20 +0900 (05d48d9) +++ lib/alpha/double_array.hpp 2013-01-30 16:13:32 +0900 (1847316) @@ -23,17 +23,18 @@ namespace grnxx { namespace alpha { -// FIXME: To be removed when moved to grnxx::db. +// FIXME: To be removed in future. using namespace grnxx::db; extern struct DoubleArrayCreate {} DOUBLE_ARRAY_CREATE; extern struct DoubleArrayOpen {} DOUBLE_ARRAY_OPEN; -constexpr uint64_t DOUBLE_ARRAY_INVALID_ID = 0xFFFFFFFFFFULL; +constexpr uint64_t DOUBLE_ARRAY_MAX_ID = (uint64_t(1) << 40) - 2; +constexpr uint64_t DOUBLE_ARRAY_INVALID_ID = DOUBLE_ARRAY_MAX_ID + 1; constexpr uint64_t DOUBLE_ARRAY_INVALID_OFFSET = 0; constexpr uint16_t DOUBLE_ARRAY_TERMINAL_LABEL = 0x100; -constexpr uint16_t DOUBLE_ARRAY_MAX_LABEL = 0x100; +constexpr uint16_t DOUBLE_ARRAY_MAX_LABEL = DOUBLE_ARRAY_TERMINAL_LABEL; constexpr uint16_t DOUBLE_ARRAY_INVALID_LABEL = 0x1FF; constexpr uint64_t DOUBLE_ARRAY_CHUNK_SIZE = 0x200; @@ -43,10 +44,10 @@ constexpr uint64_t DOUBLE_ARRAY_CHUNK_MASK = 0x1FF; // can find a good offset in that chunk. The chunk level rises when // find_offset() fails in that chunk many times. DOUBLE_ARRAY_MAX_FAILURE_COUNT // is the threshold. Also, in order to limit the time cost, find_offset() scans -// at most DOUBLE_ARRAY_MAX_CHUNk_COUNT chunks. +// at most DOUBLE_ARRAY_MAX_CHUNK_COUNT chunks. // Larger parameters bring more chances of finding good offsets but it leads to // more node renumberings, which are costly operations, and thus results in -// a degradation of space/time efficiencies. +// degradation of space/time efficiencies. constexpr uint64_t DOUBLE_ARRAY_MAX_FAILURE_COUNT = 4; constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_COUNT = 16; constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_LEVEL = 5; @@ -56,6 +57,7 @@ constexpr uint64_t DOUBLE_ARRAY_MAX_CHUNK_LEVEL = 5; // the linked list is empty and there exists no leader. constexpr uint64_t DOUBLE_ARRAY_INVALID_LEADER = 0x7FFFFFFF; +// The memory allocation unit size for keys. constexpr uint64_t DOUBLE_ARRAY_KEYS_PAGE_SIZE = VECTOR_DEFAULT_PAGE_SIZE; @@ -156,6 +158,9 @@ class DoubleArrayHeader { uint32_t nodes_block_id() const { return nodes_block_id_; } + uint32_t siblings_block_id() const { + return siblings_block_id_; + } uint32_t chunks_block_id() const { return chunks_block_id_; } @@ -177,7 +182,7 @@ class DoubleArrayHeader { uint64_t next_key_pos() const { return next_key_pos_; } - uint64_t max_key_id() const { + int64_t max_key_id() const { return max_key_id_; } uint64_t num_keys() const { @@ -202,6 +207,9 @@ class DoubleArrayHeader { void set_nodes_block_id(uint32_t value) { nodes_block_id_ = value; } + void set_siblings_block_id(uint32_t value) { + siblings_block_id_ = value; + } void set_chunks_block_id(uint32_t value) { chunks_block_id_ = value; } @@ -223,7 +231,7 @@ class DoubleArrayHeader { void set_next_key_pos(uint64_t value) { next_key_pos_ = value; } - void set_max_key_id(uint64_t value) { + void set_max_key_id(int64_t value) { max_key_id_ = value; } void set_num_keys(uint64_t value) { @@ -248,6 +256,7 @@ class DoubleArrayHeader { private: uint32_t nodes_block_id_; + uint32_t siblings_block_id_; uint32_t chunks_block_id_; uint32_t entries_block_id_; uint32_t keys_block_id_; @@ -255,7 +264,7 @@ class DoubleArrayHeader { uint64_t total_key_length_; uint64_t next_key_id_; uint64_t next_key_pos_; - uint64_t max_key_id_; + int64_t max_key_id_; uint64_t num_keys_; uint64_t num_chunks_; uint64_t num_phantoms_; @@ -266,6 +275,8 @@ class DoubleArrayHeader { class DoubleArrayNode { public: + DoubleArrayNode() : qword_(0) {} + // The ID of this node is used as an offset (true) or not (false). bool is_origin() const { // 1 bit. @@ -274,46 +285,48 @@ class DoubleArrayNode { // This node is valid (false) or not (true). bool is_phantom() const { // 1 bit. - return (qword_ & IS_PHANTOM_FLAG) && (~qword_ & IS_LEAF_FLAG); + return qword_ & IS_PHANTOM_FLAG; } // This node is associated with a key (true) or not (false). bool is_leaf() const { // 1 bit. return qword_ & IS_LEAF_FLAG; } - // This node is a leaf node with a valid label (false) or not (true). - bool is_terminal() const { + // This node has an elder sibling (true) or not (false). + bool has_sibling() const { // 1 bit. - return (qword_ & (IS_PHANTOM_FLAG | IS_LEAF_FLAG)) == - (IS_PHANTOM_FLAG | IS_LEAF_FLAG); + return qword_ & HAS_SIBLING_FLAG; } void set_is_origin(bool value) { if (value) { - qword_ &= ~IS_ORIGIN_FLAG; - } else { qword_ |= IS_ORIGIN_FLAG; + } else { + qword_ &= ~IS_ORIGIN_FLAG; } } void set_is_phantom(bool value) { if (value) { - qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); + qword_ |= IS_PHANTOM_FLAG; } else { - qword_ = (qword_ | IS_PHANTOM_FLAG) & ~IS_LEAF_FLAG; + qword_ = (qword_ & IS_ORIGIN_FLAG) | + (DOUBLE_ARRAY_INVALID_OFFSET << OFFSET_SHIFT) | + (uint64_t(DOUBLE_ARRAY_INVALID_LABEL) << CHILD_SHIFT) | + DOUBLE_ARRAY_INVALID_LABEL; } } void set_is_leaf(bool value) { if (value) { - qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); + qword_ |= IS_LEAF_FLAG; } else { - qword_ = (qword_ | IS_LEAF_FLAG) & ~IS_PHANTOM_FLAG; + qword_ &= ~IS_LEAF_FLAG; } } - void set_is_terminal(bool value) { + void set_has_sibling(bool value) { if (value) { - qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); + qword_ |= HAS_SIBLING_FLAG; } else { - qword_ |= IS_PHANTOM_FLAG | IS_LEAF_FLAG; + qword_ &= ~HAS_SIBLING_FLAG; } } @@ -340,18 +353,18 @@ class DoubleArrayNode { // A non-phantom node stores its label. // A phantom node returns an invalid label with IS_PHANTOM_FLAG. uint64_t label() const { - // 8 bits. + // 9 bits. return qword_ & (IS_PHANTOM_FLAG | LABEL_MASK); } - void set_label(uint8_t value) { + void set_label(uint16_t value) { qword_ = (qword_ & ~LABEL_MASK) | value; } // A leaf node stores the start position and the length of the associated // key. uint64_t key_pos() const { - // 40 bits. + // 39 bits. return (qword_ >> KEY_POS_SHIFT) & KEY_POS_MASK; } uint64_t key_length() const { @@ -362,62 +375,65 @@ class DoubleArrayNode { void set_key(uint64_t key_pos, uint64_t key_length) { qword_ = (qword_ & ~((KEY_POS_MASK << KEY_POS_SHIFT) | (KEY_LENGTH_MASK << KEY_LENGTH_SHIFT))) | - (key_pos << KEY_POS_SHIFT) | (key_length << KEY_LENGTH_SHIFT); + (key_pos << KEY_POS_SHIFT) | + (key_length << KEY_LENGTH_SHIFT) | + IS_LEAF_FLAG; } // A non-phantom and non-leaf node stores the offset to its children, // the label of its next sibling, and the label of its first child. uint64_t offset() const { - // 36 bits. - return (qword_ >> 8) & ((uint64_t(1) << 36) - 1); + // 42 bits. + return (qword_ >> OFFSET_SHIFT) & OFFSET_MASK; } uint16_t child() const { // 9 bits. - return static_cast<uint8_t>(qword_ >> 44); - } - uint16_t sibling() const { - // 9 bits. - return static_cast<uint8_t>(qword_ >> 52); + return static_cast<uint16_t>((qword_ >> CHILD_SHIFT) & CHILD_MASK); } void set_offset(uint64_t value) { - qword_ = (qword_ & ~(OFFSET_MASK << OFFSET_SHIFT)) | - (value << OFFSET_SHIFT); + if (qword_ & IS_LEAF_FLAG) { + qword_ = ((qword_ & ~IS_LEAF_FLAG) & ~(OFFSET_MASK << OFFSET_SHIFT)) | + (value << OFFSET_SHIFT) | + (uint64_t(DOUBLE_ARRAY_INVALID_LABEL) << CHILD_SHIFT); + } else { + qword_ = (qword_ & ~(OFFSET_MASK << OFFSET_SHIFT)) | + (value << OFFSET_SHIFT); + } } - void set_child(uint8_t value) { + void set_child(uint16_t value) { qword_ = (qword_ & ~(CHILD_MASK << CHILD_SHIFT)) | (static_cast<uint64_t>(value) << CHILD_SHIFT); } - void set_sibling(uint8_t value) { - qword_ = (qword_ & ~(SIBLING_MASK << SIBLING_SHIFT)) | - (static_cast<uint64_t>(value) << SIBLING_SHIFT); - } private: uint64_t qword_; - static constexpr uint64_t IS_ORIGIN_FLAG = uint64_t(1) << 63; - static constexpr uint64_t IS_PHANTOM_FLAG = uint64_t(1) << 62; - static constexpr uint64_t IS_LEAF_FLAG = uint64_t(1) << 61; + // 60 - 63. + static constexpr uint64_t IS_ORIGIN_FLAG = uint64_t(1) << 63; + static constexpr uint64_t IS_PHANTOM_FLAG = uint64_t(1) << 62; + static constexpr uint64_t IS_LEAF_FLAG = uint64_t(1) << 61; + static constexpr uint64_t HAS_SIBLING_FLAG = uint64_t(1) << 60; - static constexpr uint64_t NEXT_MASK = 0x1FF; + // 0 - 17. + static constexpr uint64_t NEXT_MASK = (uint64_t(1) << 9) - 1; static constexpr uint8_t NEXT_SHIFT = 0; - static constexpr uint64_t PREV_MASK = 0x1FF; + static constexpr uint64_t PREV_MASK = (uint64_t(1) << 9) - 1; static constexpr uint8_t PREV_SHIFT = 9; - static constexpr uint64_t LABEL_MASK = 0xFF; + // 0 - 8. + static constexpr uint64_t LABEL_MASK = (uint64_t(1) << 9) - 1; - static constexpr uint64_t KEY_POS_MASK = (uint64_t(1) << 41) - 1; - static constexpr uint8_t KEY_POS_SHIFT = 8; + // 9 - 59 + static constexpr uint64_t KEY_POS_MASK = (uint64_t(1) << 39) - 1; + static constexpr uint8_t KEY_POS_SHIFT = 9; static constexpr uint64_t KEY_LENGTH_MASK = (uint64_t(1) << 12) - 1; - static constexpr uint8_t KEY_LENGTH_SHIFT = 49; + static constexpr uint8_t KEY_LENGTH_SHIFT = 48; - static constexpr uint64_t OFFSET_MASK = (uint64_t(1) << 35) - 1; - static constexpr uint8_t OFFSET_SHIFT = 8; + static constexpr uint64_t OFFSET_MASK = (uint64_t(1) << 42) - 1; + static constexpr uint8_t OFFSET_SHIFT = 9; static constexpr uint64_t CHILD_MASK = (uint64_t(1) << 9) - 1; - static constexpr uint8_t CHILD_SHIFT = 43; - static constexpr uint64_t SIBLING_MASK = (uint64_t(1) << 9) - 1; - static constexpr uint8_t SIBLING_SHIFT = 52; + static constexpr uint8_t CHILD_SHIFT = 51; }; class DoubleArrayChunk { @@ -570,7 +586,7 @@ class DoubleArrayKey { uint8_t buf_[3]; }; -// TODO +// FIXME class DoubleArrayImpl { public: ~DoubleArrayImpl(); @@ -581,8 +597,6 @@ class DoubleArrayImpl { bool search(const uint8_t *ptr, uint64_t length, uint64_t *key_pos = nullptr); - - // TODO bool insert(const uint8_t *ptr, uint64_t length, uint64_t *key_pos = nullptr); @@ -611,6 +625,7 @@ class DoubleArrayImpl { DoubleArrayHeader *header_; Recycler *recycler_; Vector<DoubleArrayNode> nodes_; + Vector<uint8_t> siblings_; Vector<DoubleArrayChunk> chunks_; Vector<DoubleArrayEntry> entries_; Vector<uint32_t> keys_; @@ -669,6 +684,33 @@ class DoubleArray { *this = DoubleArray(); } + bool search(const void *ptr, uint64_t length, + uint64_t *key_id = nullptr) { + if (key_id) { + uint64_t key_pos; + if (!impl_->search(static_cast<const uint8_t *>(ptr), length, &key_pos)) { + return false; + } + *key_id = impl_->get_key(key_pos).id(); + return true; + } else { + return impl_->search(static_cast<const uint8_t *>(ptr), length, nullptr); + } + } + bool insert(const void *ptr, uint64_t length, + uint64_t *key_id = nullptr) { + if (key_id) { + uint64_t key_pos; + if (!impl_->insert(static_cast<const uint8_t *>(ptr), length, &key_pos)) { + return false; + } + *key_id = impl_->get_key(key_pos).id(); + return true; + } else { + return impl_->insert(static_cast<const uint8_t *>(ptr), length, nullptr); + } + } + uint32_t block_id() const { return impl_->block_id(); } -------------- next part -------------- HTML����������������������������...Download