susumu.yata
null+****@clear*****
Wed Jul 24 18:37:56 JST 2013
susumu.yata 2013-07-24 18:37:56 +0900 (Wed, 24 Jul 2013) New Revision: 87c8f75b896bd33d3263a3a6b5096d394d668a09 https://github.com/groonga/grnxx/commit/87c8f75b896bd33d3263a3a6b5096d394d668a09 Message: Update error handling of grnxx::map::DoubleArray. Modified files: lib/grnxx/map/double_array.cpp lib/grnxx/map/double_array.hpp Modified: lib/grnxx/map/double_array.cpp (+67 -143) =================================================================== --- lib/grnxx/map/double_array.cpp 2013-07-24 17:05:52 +0900 (1753519) +++ lib/grnxx/map/double_array.cpp 2013-07-24 18:37:56 +0900 (a0d2665) @@ -93,14 +93,14 @@ DoubleArrayHeader::operator bool() const { template <typename T> Map<T> *DoubleArray<T>::create(Storage *, uint32_t, const MapOptions &) { - GRNXX_ERROR() << "invalid combination"; - return nullptr; + GRNXX_ERROR() << "unsupported type"; + throw LogicError(); } template <typename T> Map<T> *DoubleArray<T>::open(Storage *, uint32_t) { - GRNXX_ERROR() << "invalid combination"; - return nullptr; + GRNXX_ERROR() << "unsupported type"; + throw LogicError(); } template class DoubleArray<int8_t>; @@ -143,7 +143,6 @@ DoubleArray<Bytes> *DoubleArray<Bytes>::open(Storage *storage, if (!map) { GRNXX_ERROR() << "new grnxx::map::DoubleArray failed"; throw MemoryError(); - return nullptr; } map->open_map(storage, storage_node_id); return map.release(); @@ -176,7 +175,7 @@ bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) { bool DoubleArray<Bytes>::unset(int64_t key_id) { Key key; if (!get(key_id, &key)) { - // Not found or error. + // Not found. return false; } return remove(key); @@ -185,7 +184,7 @@ bool DoubleArray<Bytes>::unset(int64_t key_id) { bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) { Key src_key; if (!get(key_id, &src_key)) { - // Not found or error. + // Not found. return false; } return replace(src_key, dest_key); @@ -237,24 +236,12 @@ bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) { bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) { Node *node; uint64_t key_pos; - if (find_leaf(key, &node, &key_pos) == DOUBLE_ARRAY_FAILED) { - // Error. - return false; - } - switch (insert_leaf(key, node, key_pos, &node)) { - case DOUBLE_ARRAY_FOUND: { - if (key_id) { - *key_id = node->key_id(); - } - return false; - } - case DOUBLE_ARRAY_INSERTED: { - break; - } - default: { - // Error. - return false; + find_leaf(key, &node, &key_pos); + if (!insert_leaf(key, node, key_pos, &node)) { + if (key_id) { + *key_id = node->key_id(); } + return false; } const int64_t next_key_id = pool_->add(key); node->set_key_id(next_key_id); @@ -267,8 +254,8 @@ bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) { bool DoubleArray<Bytes>::remove(KeyArg key) { Node *node; uint64_t key_pos; - if (find_leaf(key, &node, &key_pos) != DOUBLE_ARRAY_FOUND) { - // Not found or error. + if (!find_leaf(key, &node, &key_pos)) { + // Not found. return false; } Key stored_key; @@ -289,11 +276,11 @@ bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key, int64_t *key_id) { int64_t src_key_id; if (!find(src_key, &src_key_id)) { - // Not found or error. + // Not found. return false; } if (!replace_key(src_key_id, src_key, dest_key)) { - // Found or error. + // Found. return false; } if (key_id) { @@ -458,25 +445,15 @@ bool DoubleArray<Bytes>::replace_key(int64_t key_id, KeyArg src_key, KeyArg dest_key) { Node *dest_node; uint64_t key_pos; - if (find_leaf(dest_key, &dest_node, &key_pos) == DOUBLE_ARRAY_FAILED) { + find_leaf(dest_key, &dest_node, &key_pos); + if (!insert_leaf(dest_key, dest_node, key_pos, &dest_node)) { return false; } - switch (insert_leaf(dest_key, dest_node, key_pos, &dest_node)) { - case DOUBLE_ARRAY_FOUND: { - return false; - } - case DOUBLE_ARRAY_INSERTED: { - break; - } - default: { - // Error. - return false; - } - } Node *src_node; - if (find_leaf(src_key, &src_node, &key_pos) != DOUBLE_ARRAY_FOUND) { + if (!find_leaf(src_key, &src_node, &key_pos)) { // Critical error. - return false; + GRNXX_ERROR() << "not found: src_key = " << src_key; + throw LogicError(); } pool_->reset(key_id, dest_key); dest_node->set_key_id(key_id); @@ -484,47 +461,50 @@ bool DoubleArray<Bytes>::replace_key(int64_t key_id, KeyArg src_key, return true; } -DoubleArrayResult DoubleArray<Bytes>::find_leaf(KeyArg key, Node **leaf_node, - uint64_t *leaf_key_pos) { +bool DoubleArray<Bytes>::find_leaf(KeyArg key, Node **leaf_node, + uint64_t *leaf_key_pos) { Node *node = &nodes_->get_value(ROOT_NODE_ID); uint64_t key_pos; for (key_pos = 0; key_pos < key.size(); ++key_pos) { if (node->is_leaf()) { + // Found. *leaf_node = node; *leaf_key_pos = key_pos; - return DOUBLE_ARRAY_FOUND; + return true; } const uint64_t child_node_id = node->offset() ^ key[key_pos]; Node * const child_node = &nodes_->get_value(child_node_id); if (child_node->label() != key[key_pos]) { + // Not found. *leaf_node = node; *leaf_key_pos = key_pos; - return DOUBLE_ARRAY_NOT_FOUND; + return false; } node = child_node; } *leaf_node = node; *leaf_key_pos = key_pos; if (node->is_leaf()) { - return DOUBLE_ARRAY_FOUND; + // Found. + return true; } if (node->child() != NODE_TERMINAL_LABEL) { - return DOUBLE_ARRAY_NOT_FOUND; + // Not found. + return false; } const uint64_t node_id = node->offset() ^ NODE_TERMINAL_LABEL; node = &nodes_->get_value(node_id); *leaf_node = node; - return node->is_leaf() ? DOUBLE_ARRAY_FOUND : DOUBLE_ARRAY_NOT_FOUND; + return node->is_leaf(); } -DoubleArrayResult DoubleArray<Bytes>::insert_leaf(KeyArg key, Node *node, - uint64_t key_pos, - Node **leaf_node) { +bool DoubleArray<Bytes>::insert_leaf(KeyArg key, Node *node, + uint64_t key_pos, Node **leaf_node) { if (node->is_leaf()) { Key stored_key; if (!pool_->get(node->key_id(), &stored_key)) { - // Not found. - return DOUBLE_ARRAY_NOT_FOUND; + GRNXX_ERROR() << "not found: key = " << key << ", key_pos = " << key_pos; + throw LogicError(); } uint64_t i = key_pos; while ((i < key.size()) && (i < stored_key.size())) { @@ -534,54 +514,38 @@ DoubleArrayResult DoubleArray<Bytes>::insert_leaf(KeyArg key, Node *node, ++i; } if ((i == key.size()) && (i == stored_key.size())) { - return DOUBLE_ARRAY_FOUND; + return false; } while (key_pos < i) { - if (!insert_node(node, key[key_pos], &node)) { - return DOUBLE_ARRAY_FAILED; - } + node = insert_node(node, key[key_pos]); ++key_pos; } uint64_t labels[2]; labels[0] = (key_pos < stored_key.size()) ? stored_key[key_pos] : NODE_TERMINAL_LABEL; labels[1] = (key_pos < key.size()) ? key[key_pos] : NODE_TERMINAL_LABEL; - if (!separate(node, labels, leaf_node)) { - return DOUBLE_ARRAY_FAILED; - } - return DOUBLE_ARRAY_INSERTED; + *leaf_node = separate(node, labels); + return true; } else if (node->label() == NODE_TERMINAL_LABEL) { *leaf_node = node; - return DOUBLE_ARRAY_INSERTED; + return true; } else { const uint64_t label = (key_pos < key.size()) ? key[key_pos] : NODE_TERMINAL_LABEL; - if (!resolve(node, label)) { - return DOUBLE_ARRAY_FAILED; - } - if (!insert_node(node, label, leaf_node)) { - return DOUBLE_ARRAY_FAILED; - } - return DOUBLE_ARRAY_INSERTED; + resolve(node, label); + *leaf_node = insert_node(node, label); + return true; } } -bool DoubleArray<Bytes>::insert_node(Node *node, uint64_t label, - Node **dest_node) { +auto DoubleArray<Bytes>::insert_node(Node *node, uint64_t label) -> Node * { uint64_t offset = node->offset(); if (node->is_leaf() || (offset == NODE_INVALID_OFFSET)) { - if (!find_offset(&label, 1, &offset)) { - // Error. - return false; - } + offset = find_offset(&label, 1); } const uint64_t next_node_id = offset ^ label; uint8_t *next_sibling = &siblings_->get_value(next_node_id); Node * const next_node = reserve_node(next_node_id); - if (!next_node) { - // Error. - return false; - } next_node->set_label(label); if (node->is_leaf()) { next_node[offset - next_node_id].set_is_origin(true); @@ -619,29 +583,15 @@ bool DoubleArray<Bytes>::insert_node(Node *node, uint64_t label, } prev_node->set_has_sibling(); } - *dest_node = next_node; - return true; + return next_node; } -bool DoubleArray<Bytes>::separate(Node *node, uint64_t labels[2], - Node **dest_node) { - uint64_t offset; - if (!find_offset(labels, 2, &offset)) { - // Error. - return false; - } +auto DoubleArray<Bytes>::separate(Node *node, uint64_t labels[2]) -> Node * { + const uint64_t offset = find_offset(labels, 2); uint64_t node_ids[2] = { offset ^ labels[0], offset ^ labels[1] }; Node *nodes[2]; nodes[0] = reserve_node(node_ids[0]); - if (!nodes[0]) { - // Error. - return false; - } nodes[1] = reserve_node(node_ids[1]); - if (!nodes[1]) { - // Error. - return false; - } uint8_t * const sibling_block = &siblings_->get_value(offset & ~(BLOCK_SIZE - 1)); nodes[0]->set_label(labels[0]); @@ -659,19 +609,18 @@ bool DoubleArray<Bytes>::separate(Node *node, uint64_t labels[2], nodes[1]->set_has_sibling(); node->set_child(labels[1]); } - *dest_node = nodes[1]; - return true; + return nodes[1]; } -bool DoubleArray<Bytes>::resolve(Node *node, uint64_t label) { +void DoubleArray<Bytes>::resolve(Node *node, uint64_t label) { uint64_t offset = node->offset(); if (offset == NODE_INVALID_OFFSET) { - return true; + return; } uint64_t dest_node_id = offset ^ label; Node * const dest_node = &nodes_->get_value(dest_node_id); if (dest_node->is_phantom()) { - return true; + return; } Node * const node_block = dest_node - (dest_node_id % BLOCK_SIZE); uint8_t * const sibling_block = @@ -689,18 +638,11 @@ bool DoubleArray<Bytes>::resolve(Node *node, uint64_t label) { } } labels[num_labels] = label; - if (!find_offset(labels, num_labels + 1, &offset)) { - // Error. - return false; - } - if (!migrate_nodes(node, offset, labels, num_labels)) { - // Error. - return false; - } - return true; + offset = find_offset(labels, num_labels + 1); + migrate_nodes(node, offset, labels, num_labels); } -bool DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset, +void DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset, const uint64_t *labels, uint64_t num_labels) { const uint64_t src_offset = node->offset(); @@ -718,10 +660,6 @@ bool DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset, uint8_t * const src_sibling = &src_sibling_block[src_node_id % BLOCK_SIZE]; const uint64_t dest_node_id = dest_offset ^ labels[i]; Node * const dest_node = reserve_node(dest_node_id); - if (!dest_node) { - // Error. - return false; - } uint8_t * const dest_sibling = &dest_sibling_block[dest_node_id % BLOCK_SIZE]; Node dummy_node = *src_node; @@ -732,12 +670,10 @@ bool DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset, header_->num_zombies += num_labels; dest_node_block[dest_offset % BLOCK_SIZE].set_is_origin(true); node->set_offset(dest_offset); - return true; } -bool DoubleArray<Bytes>::find_offset(const uint64_t *labels, - uint64_t num_labels, - uint64_t *found_offset) { +uint64_t DoubleArray<Bytes>::find_offset(const uint64_t *labels, + uint64_t num_labels) { // Blocks are tested in descending order of level. // Generally, a lower level contains more phantom nodes. uint64_t level = bit_scan_reverse(num_labels) + 1; @@ -766,8 +702,7 @@ bool DoubleArray<Bytes>::find_offset(const uint64_t *labels, } if (i >= num_labels) { // Found. - *found_offset = (block_id * BLOCK_SIZE) | offset; - return true; + return (block_id * BLOCK_SIZE) | offset; } } node_id = node_block[node_id].next(); @@ -794,8 +729,7 @@ bool DoubleArray<Bytes>::find_offset(const uint64_t *labels, (block_id != latest_block_id)); } while ((block_count < BLOCK_MAX_COUNT) && (level-- != 0)); // Use a new block. - *found_offset = (header_->num_blocks * BLOCK_SIZE) ^ labels[0]; - return true; + return (header_->num_blocks * BLOCK_SIZE) ^ labels[0]; } auto DoubleArray<Bytes>::reserve_node(uint64_t node_id) -> Node * { @@ -806,10 +740,6 @@ auto DoubleArray<Bytes>::reserve_node(uint64_t node_id) -> Node * { } else { block = &blocks_->get_value(block_id); } - if (!block) { - // Error. - return nullptr; - } Node * const node = &nodes_->get_value(node_id); Node * const node_block = node - (node_id % BLOCK_SIZE); Node * const next_node = &node_block[node->next()]; @@ -835,8 +765,8 @@ auto DoubleArray<Bytes>::reserve_node(uint64_t node_id) -> Node * { auto DoubleArray<Bytes>::reserve_block(uint64_t block_id) -> Block * { if (block_id >= blocks_->size()) { GRNXX_ERROR() << "too many blocks: block_id = " << block_id - << ", blocks_size = " << blocks_->size(); - return nullptr; + << ", max_block_id = " << (blocks_->size() - 1); + throw LogicError(); } Block * const block = &blocks_->get_value(block_id); Node * const node = &nodes_->get_value(block_id * BLOCK_SIZE); @@ -851,18 +781,14 @@ auto DoubleArray<Bytes>::reserve_block(uint64_t block_id) -> Block * { return block; } -bool DoubleArray<Bytes>::update_block_level(uint64_t block_id, Block *block, +void DoubleArray<Bytes>::update_block_level(uint64_t block_id, Block *block, uint64_t level) { - // TODO: If set_block_level() fails, the block gets lost. - if (!unset_block_level(block_id, block) || - !set_block_level(block_id, block, level)) { - // Error. - return false; - } - return true; + // FIXME: If set_block_level() fails, the block gets lost. + unset_block_level(block_id, block); + set_block_level(block_id, block, level); } -bool DoubleArray<Bytes>::set_block_level(uint64_t block_id, Block *block, +void DoubleArray<Bytes>::set_block_level(uint64_t block_id, Block *block, uint64_t level) { if (header_->latest_blocks[level] == BLOCK_INVALID_ID) { // The block becomes the only one member of the level group. @@ -882,10 +808,9 @@ bool DoubleArray<Bytes>::set_block_level(uint64_t block_id, Block *block, } block->set_level(level); block->set_failure_count(0); - return true; } -bool DoubleArray<Bytes>::unset_block_level(uint64_t block_id, Block *block) { +void DoubleArray<Bytes>::unset_block_level(uint64_t block_id, Block *block) { const uint64_t level = block->level(); const uint64_t next_block_id = block->next(); const uint64_t prev_block_id = block->prev(); @@ -902,7 +827,6 @@ bool DoubleArray<Bytes>::unset_block_level(uint64_t block_id, Block *block) { header_->latest_blocks[level] = next_block_id; } } - return true; } } // namespace map Modified: lib/grnxx/map/double_array.hpp (+10 -20) =================================================================== --- lib/grnxx/map/double_array.hpp 2013-07-24 17:05:52 +0900 (1deb0b2) +++ lib/grnxx/map/double_array.hpp 2013-07-24 18:37:56 +0900 (65dfec1) @@ -39,13 +39,6 @@ namespace map { template <typename T> class KeyPool; -enum DoubleArrayResult { - DOUBLE_ARRAY_FOUND, - DOUBLE_ARRAY_NOT_FOUND, - DOUBLE_ARRAY_INSERTED, - DOUBLE_ARRAY_FAILED -}; - struct DoubleArrayHeader; template <typename T> @@ -128,27 +121,24 @@ class DoubleArray<Bytes> : public Map<Bytes> { bool replace_key(int64_t key_id, KeyArg src_key, KeyArg dest_key); - DoubleArrayResult find_leaf(KeyArg key, Node **leaf_node, - uint64_t *leaf_key_pos); - DoubleArrayResult insert_leaf(KeyArg key, Node *node, uint64_t key_pos, - Node **leaf_node); + bool find_leaf(KeyArg key, Node **leaf_node, uint64_t *leaf_key_pos); + bool insert_leaf(KeyArg key, Node *node, uint64_t key_pos, Node **leaf_node); - bool insert_node(Node *node, uint64_t label, Node **dest_node); - bool separate(Node *node, uint64_t labels[2], Node **dest_node); + Node *insert_node(Node *node, uint64_t label); + Node *separate(Node *node, uint64_t labels[2]); - bool resolve(Node *node, uint64_t label); - bool migrate_nodes(Node *node, uint64_t dest_offset, + void resolve(Node *node, uint64_t label); + void migrate_nodes(Node *node, uint64_t dest_offset, const uint64_t *labels, uint64_t num_labels); - bool find_offset(const uint64_t *labels, uint64_t num_labels, - uint64_t *found_offset); + uint64_t find_offset(const uint64_t *labels, uint64_t num_labels); Node *reserve_node(uint64_t node_id); Block *reserve_block(uint64_t block_id); - bool update_block_level(uint64_t block_id, Block *block, uint64_t level); - bool set_block_level(uint64_t block_id, Block *block, uint64_t level); - bool unset_block_level(uint64_t block_id, Block *block); + void update_block_level(uint64_t block_id, Block *block, uint64_t level); + void set_block_level(uint64_t block_id, Block *block, uint64_t level); + void unset_block_level(uint64_t block_id, Block *block); }; } // namespace map -------------- next part -------------- HTML����������������������������...Download