Kouhei Sutou
null+****@clear*****
Thu Oct 27 00:50:12 JST 2016
Kouhei Sutou 2016-10-27 00:50:12 +0900 (Thu, 27 Oct 2016) New Revision: fcb0f5a2d0973572e42b147a97e689f79052b5cf https://github.com/pgroonga/pgroonga/commit/fcb0f5a2d0973572e42b147a97e689f79052b5cf Message: Support applying WAL Modified files: src/pgrn_xlog.c src/pgrn_xlog.h src/pgroonga.c Modified: src/pgrn_xlog.c (+300 -0) =================================================================== --- src/pgrn_xlog.c 2016-10-27 00:47:15 +0900 (37f927a) +++ src/pgrn_xlog.c 2016-10-27 00:50:12 +0900 (9d9bc30) @@ -3,6 +3,7 @@ #include "pgrn_compatible.h" #include "pgrn_global.h" +#include "pgrn_groonga.h" #include "pgrn_xlog.h" static bool PGrnXLogEnabled = false; @@ -37,6 +38,7 @@ PGrnXLogDisable(void) #ifdef PGRN_SUPPORT_XLOG static grn_ctx *ctx = &PGrnContext; +static struct PGrnBuffers *buffers = &PGrnBuffers; #endif #ifdef PGRN_SUPPORT_XLOG @@ -82,6 +84,75 @@ struct PGrnXLogData_ #endif }; +#define PGRN_XLOG_STATUES_TABLE_NAME "XLogStatuses" +#define PGRN_XLOG_STATUES_TABLE_NAME_SIZE strlen(PGRN_XLOG_STATUES_TABLE_NAME) +#define PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME "current" + +static void +PGrnXLogEnsureStatusesTable(void) +{ +#ifdef PGRN_SUPPORT_XLOG + grn_obj *xlogStatuses; + + xlogStatuses = grn_ctx_get(ctx, + PGRN_XLOG_STATUES_TABLE_NAME, + PGRN_XLOG_STATUES_TABLE_NAME_SIZE); + if (xlogStatuses) + return; + + xlogStatuses = PGrnCreateTable(PGRN_XLOG_STATUES_TABLE_NAME, + GRN_OBJ_TABLE_HASH_KEY, + grn_ctx_at(ctx, GRN_DB_UINT32)); + PGrnCreateColumn(xlogStatuses, + PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME, + GRN_OBJ_COLUMN_SCALAR, + grn_ctx_at(ctx, GRN_DB_UINT64)); +#endif +} + +#ifdef PGRN_SUPPORT_XLOG +static uint64_t +PGrnXLogPackPosition(BlockNumber block, OffsetNumber offset) +{ + return (((uint64_t)block) << 32) + (uint64_t)offset; +} + +static void +PGrnXLogUnpackPosition(uint64_t position, + BlockNumber *block, + OffsetNumber *offset) +{ + *block = (BlockNumber)(position >> 32); + *offset = (OffsetNumber)(position & ((1 << 16) - 1)); +} + +static void +PGrnXLogUpdateStatus(Relation index, + BlockNumber block, + OffsetNumber offset) +{ + grn_obj *statusesTable; + grn_obj *currentColumn; + uint32_t oid; + grn_id id; + uint64_t positionRaw; + grn_obj *position = &(buffers->general); + + PGrnXLogEnsureStatusesTable(); + + statusesTable = PGrnLookup(PGRN_XLOG_STATUES_TABLE_NAME, ERROR); + currentColumn = PGrnLookupColumn(statusesTable, + PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME, + ERROR); + oid = RelationGetRelid(index); + id = grn_table_add(ctx, statusesTable, &oid, sizeof(uint32_t), NULL); + positionRaw = PGrnXLogPackPosition(block, offset); + grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + GRN_UINT64_SET(ctx, position, positionRaw); + grn_obj_set_value(ctx, currentColumn, id, position, GRN_OBJ_SET); +} +#endif + #define PGRN_XLOG_META_PAGE_BLOCK_NUMBER 0 #ifdef PGRN_SUPPORT_XLOG @@ -189,6 +260,9 @@ PGrnXLogPageWriter(void *userData, buffer, length); data->current.pageSpecial->current += length; + PGrnXLogUpdateStatus(data->index, + BufferGetBlockNumber(data->current.buffer), + data->current.pageSpecial->current); written += length; } else @@ -203,6 +277,9 @@ PGrnXLogPageWriter(void *userData, buffer, writableSize); data->current.pageSpecial->current += writableSize; + PGrnXLogUpdateStatus(data->index, + BufferGetBlockNumber(data->current.buffer), + data->current.pageSpecial->current); written += writableSize; length -= writableSize; buffer += writableSize; @@ -423,3 +500,226 @@ PGrnXLogInsertColumn(PGrnXLogData *data, PGrnXLogInsertColumnFinish(data); #endif } + +#ifdef PGRN_SUPPORT_XLOG +typedef struct { + Relation index; + grn_obj *statusesTable; + grn_obj *currentColumn; + grn_id statusID; + struct { + BlockNumber block; + OffsetNumber offset; + } current; + grn_obj *sources; +} PGrnXLogApplyData; + +static bool +PGrnXLogApplyNeeded(PGrnXLogApplyData *data) +{ + BlockNumber currentBlock; + OffsetNumber currentOffset; + BlockNumber nBlocks; + + { + grn_obj *position = &(buffers->general); + grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + grn_obj_get_value(ctx, data->currentColumn, data->statusID, position); + PGrnXLogUnpackPosition(GRN_UINT64_VALUE(position), + ¤tBlock, + ¤tOffset); + } + + nBlocks = RelationGetNumberOfBlocks(data->index); + if (currentBlock >= nBlocks) + { + return false; + } + else if (currentBlock == (nBlocks - 1)) + { + Buffer buffer; + Page page; + PGrnPageSpecial *pageSpecial; + bool needToApply; + + buffer = ReadBuffer(data->index, currentBlock); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + pageSpecial = (PGrnPageSpecial *)PageGetSpecialPointer(page); + needToApply = (pageSpecial->current > currentOffset); + UnlockReleaseBuffer(buffer); + return needToApply; + } else { + return true; + } +} + +static void +PGrnXLogApplyObject(PGrnXLogApplyData *data, msgpack_object *object) +{ + grn_id id; + uint32_t i, nColumns; + + if (object->type != MSGPACK_OBJECT_MAP) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: XLog: apply: record must be map: <%#x>", + object->type))); + } + + id = grn_table_add(ctx, data->sources, NULL, 0, NULL); + nColumns = object->via.map.size; + for (i = 0; i < nColumns; i++) + { + msgpack_object *key; + msgpack_object *value; + grn_obj *column; + + key = &(object->via.map.ptr[i].key); + value = &(object->via.map.ptr[i].val); + + if (key->type != MSGPACK_OBJECT_STR) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: XLog: apply: key must be map: <%#x>", + key->type))); + } + + column = PGrnLookupColumnWithSize(data->sources, + key->via.str.ptr, + key->via.str.size, + ERROR); + switch (value->type) + { + case MSGPACK_OBJECT_BOOLEAN: + grn_obj_reinit(ctx, &(buffers->general), GRN_DB_BOOL, 0); + GRN_BOOL_SET(ctx, &(buffers->general), value->via.boolean); + break; + case MSGPACK_OBJECT_POSITIVE_INTEGER: + grn_obj_reinit(ctx, &(buffers->general), GRN_DB_UINT64, 0); + GRN_UINT64_SET(ctx, &(buffers->general), value->via.u64); + break; + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + grn_obj_reinit(ctx, &(buffers->general), GRN_DB_INT64, 0); + GRN_INT64_SET(ctx, &(buffers->general), value->via.i64); + break; + case MSGPACK_OBJECT_FLOAT: + grn_obj_reinit(ctx, &(buffers->general), GRN_DB_FLOAT, 0); + GRN_FLOAT_SET(ctx, &(buffers->general), value->via.f64); + break; + case MSGPACK_OBJECT_STR: + grn_obj_reinit(ctx, &(buffers->general), GRN_DB_TEXT, 0); + GRN_TEXT_SET(ctx, &(buffers->general), + value->via.str.ptr, + value->via.str.size); + break; +/* + case MSGPACK_OBJECT_ARRAY: + break; + case MSGPACK_OBJECT_MAP: + break; + case MSGPACK_OBJECT_BIN: + break; + case MSGPACK_OBJECT_EXT: + break; +*/ + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: XLog: apply: unexpected value type: <%#x>", + value->type))); + break; + } + grn_obj_set_value(ctx, column, id, &(buffers->general), GRN_OBJ_SET); + } +} + +static void +PGrnXLogApplyConsume(PGrnXLogApplyData *data) +{ + BlockNumber i, nBlocks; + msgpack_unpacker unpacker; + msgpack_unpacked unpacked; + BlockNumber lastBlock = data->current.block; + OffsetNumber lastOffset = data->current.offset; + + data->sources = PGrnLookupSourcesTable(data->index, ERROR); + + msgpack_unpacker_init(&unpacker, PGRN_PAGE_DATA_SIZE); + msgpack_unpacked_init(&unpacked); + nBlocks = RelationGetNumberOfBlocks(data->index); + for (i = data->current.block; i < nBlocks; i++) + { + Buffer buffer; + Page page; + PGrnPageSpecial *pageSpecial; + size_t dataSize; + + buffer = ReadBuffer(data->index, i); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + pageSpecial = (PGrnPageSpecial *)PageGetSpecialPointer(page); + dataSize = pageSpecial->current - data->current.offset; + msgpack_unpacker_reserve_buffer(&unpacker, dataSize); + memcpy(msgpack_unpacker_buffer(&unpacker), + pageSpecial->data + SizeOfPageHeaderData + data->current.offset, + dataSize); + UnlockReleaseBuffer(buffer); + data->current.offset = 0; + + msgpack_unpacker_buffer_consumed(&unpacker, dataSize); + while (msgpack_unpacker_next(&unpacker, &unpacked) == + MSGPACK_UNPACK_SUCCESS) + { + PGrnXLogApplyObject(data, &unpacked.data); + } + + lastBlock = i; + lastOffset = pageSpecial->current; + } + msgpack_unpacked_destroy(&unpacked); + msgpack_unpacker_destroy(&unpacker); + + PGrnXLogUpdateStatus(data->index, lastBlock, lastOffset); +} +#endif + +void +PGrnXLogApply(Relation index) +{ +#ifdef PGRN_SUPPORT_XLOG + PGrnXLogApplyData data; + uint32_t oid; + + PGrnXLogEnsureStatusesTable(); + + data.index = index; + data.statusesTable = PGrnLookup(PGRN_XLOG_STATUES_TABLE_NAME, ERROR); + data.currentColumn = PGrnLookupColumn(data.statusesTable, + PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME, + ERROR); + oid = RelationGetRelid(index); + data.statusID = grn_table_add(ctx, + data.statusesTable, + &oid, + sizeof(uint32_t), + NULL); + if (!PGrnXLogApplyNeeded(&data)) + return; + + LockRelation(index, RowExclusiveLock); + { + grn_obj *position = &(buffers->general); + + grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + grn_obj_get_value(ctx, data.currentColumn, data.statusID, position); + PGrnXLogUnpackPosition(GRN_UINT64_VALUE(position), + &(data.current.block), + &(data.current.offset)); + } + PGrnXLogApplyConsume(&data); + UnlockRelation(index, RowExclusiveLock); +#endif +} Modified: src/pgrn_xlog.h (+2 -0) =================================================================== --- src/pgrn_xlog.h 2016-10-27 00:47:15 +0900 (8f185c3) +++ src/pgrn_xlog.h 2016-10-27 00:50:12 +0900 (9b07dd6) @@ -22,3 +22,5 @@ void PGrnXLogInsertColumnFinish(PGrnXLogData *data); void PGrnXLogInsertColumn(PGrnXLogData *data, const char *name, grn_obj *value); + +void PGrnXLogApply(Relation index); Modified: src/pgroonga.c (+1 -0) =================================================================== --- src/pgroonga.c 2016-10-27 00:47:15 +0900 (88fc3b1) +++ src/pgroonga.c 2016-10-27 00:50:12 +0900 (61ec415) @@ -3987,6 +3987,7 @@ PGrnCostEstimateUpdateSelectivity(IndexPath *path) ListCell *cell; index = RelationIdGetRelation(indexInfo->indexoid); + PGrnXLogApply(index); sourcesTable = PGrnLookupSourcesTable(index, ERROR); foreach(cell, path->indexquals) -------------- next part -------------- HTML����������������������������...Download