[Groonga-commit] pgroonga/pgroonga at fcb0f5a [master] Support applying WAL

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Oct 27 00:50:12 JST 2016


Kouhei Sutou	2016-10-27 00:50:12 +0900 (Thu, 27 Oct 2016)

  New Revision: fcb0f5a2d0973572e42b147a97e689f79052b5cf
  https://github.com/pgroonga/pgroonga/commit/fcb0f5a2d0973572e42b147a97e689f79052b5cf

  Message:
    Support applying WAL

  Modified files:
    src/pgrn_xlog.c
    src/pgrn_xlog.h
    src/pgroonga.c

  Modified: src/pgrn_xlog.c (+300 -0)
===================================================================
--- src/pgrn_xlog.c    2016-10-27 00:47:15 +0900 (37f927a)
+++ src/pgrn_xlog.c    2016-10-27 00:50:12 +0900 (9d9bc30)
@@ -3,6 +3,7 @@
 #include "pgrn_compatible.h"
 
 #include "pgrn_global.h"
+#include "pgrn_groonga.h"
 #include "pgrn_xlog.h"
 
 static bool PGrnXLogEnabled = false;
@@ -37,6 +38,7 @@ PGrnXLogDisable(void)
 
 #ifdef PGRN_SUPPORT_XLOG
 static grn_ctx *ctx = &PGrnContext;
+static struct PGrnBuffers *buffers = &PGrnBuffers;
 #endif
 
 #ifdef PGRN_SUPPORT_XLOG
@@ -82,6 +84,75 @@ struct PGrnXLogData_
 #endif
 };
 
+#define PGRN_XLOG_STATUES_TABLE_NAME "XLogStatuses"
+#define PGRN_XLOG_STATUES_TABLE_NAME_SIZE strlen(PGRN_XLOG_STATUES_TABLE_NAME)
+#define PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME "current"
+
+static void
+PGrnXLogEnsureStatusesTable(void)
+{
+#ifdef PGRN_SUPPORT_XLOG
+	grn_obj *xlogStatuses;
+
+	xlogStatuses = grn_ctx_get(ctx,
+							   PGRN_XLOG_STATUES_TABLE_NAME,
+							   PGRN_XLOG_STATUES_TABLE_NAME_SIZE);
+	if (xlogStatuses)
+		return;
+
+	xlogStatuses = PGrnCreateTable(PGRN_XLOG_STATUES_TABLE_NAME,
+								   GRN_OBJ_TABLE_HASH_KEY,
+								   grn_ctx_at(ctx, GRN_DB_UINT32));
+	PGrnCreateColumn(xlogStatuses,
+					 PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME,
+					 GRN_OBJ_COLUMN_SCALAR,
+					 grn_ctx_at(ctx, GRN_DB_UINT64));
+#endif
+}
+
+#ifdef PGRN_SUPPORT_XLOG
+static uint64_t
+PGrnXLogPackPosition(BlockNumber block, OffsetNumber offset)
+{
+	return (((uint64_t)block) << 32) + (uint64_t)offset;
+}
+
+static void
+PGrnXLogUnpackPosition(uint64_t position,
+					   BlockNumber *block,
+					   OffsetNumber *offset)
+{
+	*block = (BlockNumber)(position >> 32);
+	*offset = (OffsetNumber)(position & ((1 << 16) - 1));
+}
+
+static void
+PGrnXLogUpdateStatus(Relation index,
+					 BlockNumber block,
+					 OffsetNumber offset)
+{
+	grn_obj *statusesTable;
+	grn_obj *currentColumn;
+	uint32_t oid;
+	grn_id id;
+	uint64_t positionRaw;
+	grn_obj *position = &(buffers->general);
+
+	PGrnXLogEnsureStatusesTable();
+
+	statusesTable = PGrnLookup(PGRN_XLOG_STATUES_TABLE_NAME, ERROR);
+	currentColumn = PGrnLookupColumn(statusesTable,
+									 PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME,
+									 ERROR);
+	oid = RelationGetRelid(index);
+	id = grn_table_add(ctx, statusesTable, &oid, sizeof(uint32_t), NULL);
+	positionRaw = PGrnXLogPackPosition(block, offset);
+	grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+	GRN_UINT64_SET(ctx, position, positionRaw);
+	grn_obj_set_value(ctx, currentColumn, id, position, GRN_OBJ_SET);
+}
+#endif
+
 #define PGRN_XLOG_META_PAGE_BLOCK_NUMBER 0
 
 #ifdef PGRN_SUPPORT_XLOG
@@ -189,6 +260,9 @@ PGrnXLogPageWriter(void *userData,
 				   buffer,
 				   length);
 			data->current.pageSpecial->current += length;
+			PGrnXLogUpdateStatus(data->index,
+								 BufferGetBlockNumber(data->current.buffer),
+								 data->current.pageSpecial->current);
 			written += length;
 		}
 		else
@@ -203,6 +277,9 @@ PGrnXLogPageWriter(void *userData,
 				   buffer,
 				   writableSize);
 			data->current.pageSpecial->current += writableSize;
+			PGrnXLogUpdateStatus(data->index,
+								 BufferGetBlockNumber(data->current.buffer),
+								 data->current.pageSpecial->current);
 			written += writableSize;
 			length -= writableSize;
 			buffer += writableSize;
@@ -423,3 +500,226 @@ PGrnXLogInsertColumn(PGrnXLogData *data,
 	PGrnXLogInsertColumnFinish(data);
 #endif
 }
+
+#ifdef PGRN_SUPPORT_XLOG
+typedef struct {
+	Relation index;
+	grn_obj *statusesTable;
+	grn_obj *currentColumn;
+	grn_id statusID;
+	struct {
+		BlockNumber block;
+		OffsetNumber offset;
+	} current;
+	grn_obj *sources;
+} PGrnXLogApplyData;
+
+static bool
+PGrnXLogApplyNeeded(PGrnXLogApplyData *data)
+{
+	BlockNumber currentBlock;
+	OffsetNumber currentOffset;
+	BlockNumber nBlocks;
+
+	{
+		grn_obj *position = &(buffers->general);
+		grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+		grn_obj_get_value(ctx, data->currentColumn, data->statusID, position);
+		PGrnXLogUnpackPosition(GRN_UINT64_VALUE(position),
+							   &currentBlock,
+							   &currentOffset);
+	}
+
+	nBlocks = RelationGetNumberOfBlocks(data->index);
+	if (currentBlock >= nBlocks)
+	{
+		return false;
+	}
+	else if (currentBlock == (nBlocks - 1))
+	{
+		Buffer buffer;
+		Page page;
+		PGrnPageSpecial *pageSpecial;
+		bool needToApply;
+
+		buffer = ReadBuffer(data->index, currentBlock);
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+		pageSpecial = (PGrnPageSpecial *)PageGetSpecialPointer(page);
+		needToApply = (pageSpecial->current > currentOffset);
+		UnlockReleaseBuffer(buffer);
+		return needToApply;
+	} else {
+		return true;
+	}
+}
+
+static void
+PGrnXLogApplyObject(PGrnXLogApplyData *data, msgpack_object *object)
+{
+	grn_id id;
+	uint32_t i, nColumns;
+
+	if (object->type != MSGPACK_OBJECT_MAP)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pgroonga: XLog: apply: record must be map: <%#x>",
+						object->type)));
+	}
+
+	id = grn_table_add(ctx, data->sources, NULL, 0, NULL);
+	nColumns = object->via.map.size;
+	for (i = 0; i < nColumns; i++)
+	{
+		msgpack_object *key;
+		msgpack_object *value;
+		grn_obj *column;
+
+		key = &(object->via.map.ptr[i].key);
+		value = &(object->via.map.ptr[i].val);
+
+		if (key->type != MSGPACK_OBJECT_STR)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("pgroonga: XLog: apply: key must be map: <%#x>",
+							key->type)));
+		}
+
+		column = PGrnLookupColumnWithSize(data->sources,
+										  key->via.str.ptr,
+										  key->via.str.size,
+										  ERROR);
+		switch (value->type)
+		{
+		case MSGPACK_OBJECT_BOOLEAN:
+			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_BOOL, 0);
+			GRN_BOOL_SET(ctx, &(buffers->general), value->via.boolean);
+			break;
+		case MSGPACK_OBJECT_POSITIVE_INTEGER:
+			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_UINT64, 0);
+			GRN_UINT64_SET(ctx, &(buffers->general), value->via.u64);
+			break;
+		case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_INT64, 0);
+			GRN_INT64_SET(ctx, &(buffers->general), value->via.i64);
+			break;
+		case MSGPACK_OBJECT_FLOAT:
+			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_FLOAT, 0);
+			GRN_FLOAT_SET(ctx, &(buffers->general), value->via.f64);
+			break;
+		case MSGPACK_OBJECT_STR:
+			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_TEXT, 0);
+			GRN_TEXT_SET(ctx, &(buffers->general),
+						 value->via.str.ptr,
+						 value->via.str.size);
+			break;
+/*
+		case MSGPACK_OBJECT_ARRAY:
+			break;
+		case MSGPACK_OBJECT_MAP:
+			break;
+		case MSGPACK_OBJECT_BIN:
+			break;
+		case MSGPACK_OBJECT_EXT:
+			break;
+*/
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("pgroonga: XLog: apply: unexpected value type: <%#x>",
+							value->type)));
+			break;
+		}
+		grn_obj_set_value(ctx, column, id, &(buffers->general), GRN_OBJ_SET);
+	}
+}
+
+static void
+PGrnXLogApplyConsume(PGrnXLogApplyData *data)
+{
+	BlockNumber i, nBlocks;
+	msgpack_unpacker unpacker;
+	msgpack_unpacked unpacked;
+	BlockNumber lastBlock = data->current.block;
+	OffsetNumber lastOffset = data->current.offset;
+
+	data->sources = PGrnLookupSourcesTable(data->index, ERROR);
+
+	msgpack_unpacker_init(&unpacker, PGRN_PAGE_DATA_SIZE);
+	msgpack_unpacked_init(&unpacked);
+	nBlocks = RelationGetNumberOfBlocks(data->index);
+	for (i = data->current.block; i < nBlocks; i++)
+	{
+		Buffer buffer;
+		Page page;
+		PGrnPageSpecial *pageSpecial;
+		size_t dataSize;
+
+		buffer = ReadBuffer(data->index, i);
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+		pageSpecial = (PGrnPageSpecial *)PageGetSpecialPointer(page);
+		dataSize = pageSpecial->current - data->current.offset;
+		msgpack_unpacker_reserve_buffer(&unpacker, dataSize);
+		memcpy(msgpack_unpacker_buffer(&unpacker),
+			   pageSpecial->data + SizeOfPageHeaderData + data->current.offset,
+			   dataSize);
+		UnlockReleaseBuffer(buffer);
+		data->current.offset = 0;
+
+		msgpack_unpacker_buffer_consumed(&unpacker, dataSize);
+		while (msgpack_unpacker_next(&unpacker, &unpacked) ==
+			   MSGPACK_UNPACK_SUCCESS)
+		{
+			PGrnXLogApplyObject(data, &unpacked.data);
+		}
+
+		lastBlock = i;
+		lastOffset = pageSpecial->current;
+	}
+	msgpack_unpacked_destroy(&unpacked);
+	msgpack_unpacker_destroy(&unpacker);
+
+	PGrnXLogUpdateStatus(data->index, lastBlock, lastOffset);
+}
+#endif
+
+void
+PGrnXLogApply(Relation index)
+{
+#ifdef PGRN_SUPPORT_XLOG
+	PGrnXLogApplyData data;
+	uint32_t oid;
+
+	PGrnXLogEnsureStatusesTable();
+
+	data.index = index;
+	data.statusesTable = PGrnLookup(PGRN_XLOG_STATUES_TABLE_NAME, ERROR);
+	data.currentColumn = PGrnLookupColumn(data.statusesTable,
+										  PGRN_XLOG_STATUES_CURRENT_COLUMN_NAME,
+										  ERROR);
+	oid = RelationGetRelid(index);
+	data.statusID = grn_table_add(ctx,
+								  data.statusesTable,
+								  &oid,
+								  sizeof(uint32_t),
+								  NULL);
+	if (!PGrnXLogApplyNeeded(&data))
+		return;
+
+	LockRelation(index, RowExclusiveLock);
+	{
+		grn_obj *position = &(buffers->general);
+
+		grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+		grn_obj_get_value(ctx, data.currentColumn, data.statusID, position);
+		PGrnXLogUnpackPosition(GRN_UINT64_VALUE(position),
+							   &(data.current.block),
+							   &(data.current.offset));
+	}
+	PGrnXLogApplyConsume(&data);
+	UnlockRelation(index, RowExclusiveLock);
+#endif
+}

  Modified: src/pgrn_xlog.h (+2 -0)
===================================================================
--- src/pgrn_xlog.h    2016-10-27 00:47:15 +0900 (8f185c3)
+++ src/pgrn_xlog.h    2016-10-27 00:50:12 +0900 (9b07dd6)
@@ -22,3 +22,5 @@ void PGrnXLogInsertColumnFinish(PGrnXLogData *data);
 void PGrnXLogInsertColumn(PGrnXLogData *data,
 						  const char *name,
 						  grn_obj *value);
+
+void PGrnXLogApply(Relation index);

  Modified: src/pgroonga.c (+1 -0)
===================================================================
--- src/pgroonga.c    2016-10-27 00:47:15 +0900 (88fc3b1)
+++ src/pgroonga.c    2016-10-27 00:50:12 +0900 (61ec415)
@@ -3987,6 +3987,7 @@ PGrnCostEstimateUpdateSelectivity(IndexPath *path)
 	ListCell *cell;
 
 	index = RelationIdGetRelation(indexInfo->indexoid);
+	PGrnXLogApply(index);
 	sourcesTable = PGrnLookupSourcesTable(index, ERROR);
 
 	foreach(cell, path->indexquals)
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index