[Groonga-commit] pgroonga/pgroonga at 2b944dc [master] Support schema replication

Back to archive index

Kouhei Sutou null+****@clear*****
Fri Oct 28 14:42:57 JST 2016


Kouhei Sutou	2016-10-28 14:42:57 +0900 (Fri, 28 Oct 2016)

  New Revision: 2b944dcc70b3b8972aec9aa65419667ffaf581f4
  https://github.com/pgroonga/pgroonga/commit/2b944dcc70b3b8972aec9aa65419667ffaf581f4

  Message:
    Support schema replication

  Modified files:
    src/pgrn_global.c
    src/pgrn_global.h
    src/pgrn_wal.c
    src/pgrn_wal.h

  Modified: src/pgrn_global.c (+4 -0)
===================================================================
--- src/pgrn_global.c    2016-10-28 14:42:48 +0900 (7a1b7f4)
+++ src/pgrn_global.c    2016-10-28 14:42:57 +0900 (4d26dbb)
@@ -17,6 +17,8 @@ PGrnInitializeBuffers(void)
 	GRN_UINT64_INIT(&(PGrnBuffers.ctid), 0);
 	GRN_FLOAT_INIT(&(PGrnBuffers.score), 0);
 	GRN_RECORD_INIT(&(PGrnBuffers.sourceIDs), GRN_OBJ_VECTOR, GRN_ID_NIL);
+	GRN_UINT64_INIT(&(PGrnBuffers.walPosition), 0);
+	GRN_VOID_INIT(&(PGrnBuffers.walValue));
 	GRN_TEXT_INIT(&(PGrnBuffers.head), 0);
 	GRN_TEXT_INIT(&(PGrnBuffers.body), 0);
 	GRN_TEXT_INIT(&(PGrnBuffers.foot), 0);
@@ -33,6 +35,8 @@ PGrnFinalizeBuffers(void)
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.ctid));
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.score));
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.sourceIDs));
+	GRN_OBJ_FIN(ctx, &(PGrnBuffers.walPosition));
+	GRN_OBJ_FIN(ctx, &(PGrnBuffers.walValue));
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.head));
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.body));
 	GRN_OBJ_FIN(ctx, &(PGrnBuffers.foot));

  Modified: src/pgrn_global.h (+2 -0)
===================================================================
--- src/pgrn_global.h    2016-10-28 14:42:48 +0900 (e00abf9)
+++ src/pgrn_global.h    2016-10-28 14:42:57 +0900 (79a2e4a)
@@ -11,6 +11,8 @@ struct PGrnBuffers
 	grn_obj ctid;
 	grn_obj score;
 	grn_obj sourceIDs;
+	grn_obj walPosition;
+	grn_obj walValue;
 	grn_obj head;
 	grn_obj body;
 	grn_obj foot;

  Modified: src/pgrn_wal.c (+274 -57)
===================================================================
--- src/pgrn_wal.c    2016-10-28 14:42:48 +0900 (ff398a2)
+++ src/pgrn_wal.c    2016-10-28 14:42:57 +0900 (2f88567)
@@ -168,7 +168,7 @@ PGrnWALUpdateStatus(Relation index,
 	uint32_t oid;
 	grn_id id;
 	uint64_t positionRaw;
-	grn_obj *position = &(buffers->general);
+	grn_obj *position = &(buffers->walPosition);
 
 	PGrnWALEnsureStatusesTable();
 
@@ -179,7 +179,7 @@ PGrnWALUpdateStatus(Relation index,
 	oid = RelationGetRelid(index);
 	id = grn_table_add(ctx, statusesTable, &oid, sizeof(uint32_t), NULL);
 	positionRaw = PGrnWALPackPosition(block, offset);
-	grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+	GRN_BULK_REWIND(position);
 	GRN_UINT64_SET(ctx, position, positionRaw);
 	grn_obj_set_value(ctx, currentColumn, id, position, GRN_OBJ_SET);
 }
@@ -536,7 +536,7 @@ PGrnWALInsertColumn(PGrnWALData *data,
 void
 PGrnWALCreateTable(Relation index,
 				   const char *name,
-				   grn_obj_flags flags,
+				   grn_table_flags flags,
 				   grn_obj *type,
 				   grn_obj *tokenizer,
 				   grn_obj *normalizer)
@@ -580,7 +580,7 @@ void
 PGrnWALCreateColumn(Relation index,
 					grn_obj *table,
 					const char *name,
-					grn_obj_flags flags,
+					grn_column_flags flags,
 					grn_obj *type)
 {
 #ifdef PGRN_SUPPORT_WAL
@@ -710,8 +710,8 @@ PGrnWALApplyNeeded(PGrnWALApplyData *data)
 	BlockNumber nBlocks;
 
 	{
-		grn_obj *position = &(buffers->general);
-		grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+		grn_obj *position = &(buffers->walPosition);
+		GRN_BULK_REWIND(position);
 		grn_obj_get_value(ctx, data->currentColumn, data->statusID, position);
 		PGrnWALUnpackPosition(GRN_UINT64_VALUE(position),
 							   &currentBlock,
@@ -743,7 +743,9 @@ PGrnWALApplyNeeded(PGrnWALApplyData *data)
 }
 
 static bool
-PGrnWALApplyKeyEqual(msgpack_object *key, const char *name)
+PGrnWALApplyKeyEqual(const char *context,
+					 msgpack_object *key,
+					 const char *name)
 {
 	size_t nameSize;
 
@@ -751,7 +753,10 @@ PGrnWALApplyKeyEqual(msgpack_object *key, const char *name)
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("pgroonga: WAL: apply: key must be map: <%#x>",
+				 errmsg("pgroonga: WAL: apply: %s%s"
+						"key must be map: <%#x>",
+						context ? context : "",
+						context ? ": " : "",
 						key->type)));
 	}
 
@@ -764,11 +769,143 @@ PGrnWALApplyKeyEqual(msgpack_object *key, const char *name)
 	return true;
 }
 
+static uint64_t
+PGrnWALApplyValueGetPositiveInteger(const char *context,
+									msgpack_object_kv *kv)
+{
+	if (kv->val.type != MSGPACK_OBJECT_POSITIVE_INTEGER)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pgroonga: WAL: apply: %s%s"
+						"%.*s value must be positive integer: "
+						"<%#x>",
+						context ? context : "",
+						context ? ": " : "",
+						kv->key.via.str.size,
+						kv->key.via.str.ptr,
+						kv->val.type)));
+	}
+
+	return kv->val.via.u64;
+}
+
+static void
+PGrnWALApplyValueGetString(const char *context,
+						   msgpack_object_kv *kv,
+						   const char **string,
+						   size_t *stringSize)
+{
+	if (kv->val.type != MSGPACK_OBJECT_STR)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pgroonga: WAL: apply: %s%s"
+						"%.*s value must be string: "
+						"<%#x>",
+						context ? context : "",
+						context ? ": " : "",
+						kv->key.via.str.size,
+						kv->key.via.str.ptr,
+						kv->val.type)));
+	}
+
+	*string = kv->val.via.str.ptr;
+	*stringSize = kv->val.via.str.size;
+}
+
+static grn_obj *
+PGrnWALApplyValueGetGroongaObject(const char *context,
+								  msgpack_object_kv *kv)
+{
+	grn_obj *object = NULL;
+
+	switch (kv->val.type)
+	{
+	case MSGPACK_OBJECT_NIL:
+		object = NULL;
+		break;
+	case MSGPACK_OBJECT_STR:
+		object = PGrnLookupWithSize(kv->val.via.str.ptr,
+									kv->val.via.str.size,
+									ERROR);
+		break;
+	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pgroonga: WAL: apply: %s%s"
+						"%.*s value must be nil or string: "
+						"<%#x>",
+						context ? context : "",
+						context ? ": " : "",
+						kv->key.via.str.size,
+						kv->key.via.str.ptr,
+						kv->val.type)));
+		break;
+	}
+
+	return object;
+}
+
+static void
+PGrnWALApplyValueGetGroongaObjectIDs(const char *context,
+									 msgpack_object_kv *kv,
+									 grn_obj *ids)
+{
+	msgpack_object_array *array;
+	uint32_t i;
+
+	if (kv->val.type != MSGPACK_OBJECT_ARRAY)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pgroonga: WAL: apply: %s%s"
+						"%.*s value must be array: "
+						"<%#x>",
+						context ? context : "",
+						context ? ": " : "",
+						kv->key.via.str.size,
+						kv->key.via.str.ptr,
+						kv->val.type)));
+	}
+
+	array = &(kv->val.via.array);
+	for (i = 0; i < array->size; i++)
+	{
+		msgpack_object *element;
+		grn_obj *object;
+		grn_id objectID;
+
+		element = &(array->ptr[i]);
+		if (element->type != MSGPACK_OBJECT_STR)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("pgroonga: WAL: apply: %s%s"
+							"%.*s value must be array of string: "
+							"[%u]=<%#x>",
+							context ? context : "",
+							context ? ": " : "",
+							kv->key.via.str.size,
+							kv->key.via.str.ptr,
+							i,
+							element->type)));
+		}
+
+		object = PGrnLookupWithSize(element->via.str.ptr,
+									element->via.str.size,
+									ERROR);
+		objectID = grn_obj_id(ctx, object);
+		GRN_RECORD_PUT(ctx, ids, objectID);
+	}
+}
+
 static void
 PGrnWALApplyInsert(PGrnWALApplyData *data,
 				   msgpack_object_map *map,
 				   uint32_t currentElement)
 {
+	const char *context = "insert";
 	grn_obj *table = NULL;
 	const char *key = NULL;
 	size_t keySize = 0;
@@ -780,20 +917,9 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		msgpack_object_kv *kv;
 
 		kv = &(map->ptr[currentElement]);
-		if (PGrnWALApplyKeyEqual(&(kv->key), "_table"))
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "_table"))
 		{
-			if (kv->val.type != MSGPACK_OBJECT_STR)
-			{
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("pgroonga: WAL: apply: insert: "
-								"_table value must be string: "
-								"<%#x>",
-								kv->val.type)));
-			}
-			table = PGrnLookupWithSize(kv->val.via.str.ptr,
-									   kv->val.via.str.size,
-									   ERROR);
+			table = PGrnWALApplyValueGetGroongaObject(context, kv);
 			currentElement++;
 		}
 	}
@@ -809,15 +935,16 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		msgpack_object_kv *kv;
 
 		kv = &(map->ptr[currentElement]);
-		if (PGrnWALApplyKeyEqual(&(kv->key), "_key"))
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "_key"))
 		{
 			if (kv->val.type != MSGPACK_OBJECT_BIN)
 			{
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("pgroonga: WAL: apply: insert: "
+						 errmsg("pgroonga: WAL: apply: %s: "
 								"_key value must be binary: "
 								"<%#x>",
+								context,
 								kv->val.type)));
 			}
 			key = kv->val.via.bin.ptr;
@@ -832,6 +959,7 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		msgpack_object *key;
 		msgpack_object *value;
 		grn_obj *column;
+		grn_obj *walValue = &(buffers->walValue);
 
 		key = &(map->ptr[i].key);
 		value = &(map->ptr[i].val);
@@ -840,8 +968,9 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("pgroonga: WAL: apply: insert: "
+					 errmsg("pgroonga: WAL: apply: %s: "
 							"key must be map: <%#x>",
+							context,
 							key->type)));
 		}
 
@@ -852,24 +981,24 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		switch (value->type)
 		{
 		case MSGPACK_OBJECT_BOOLEAN:
-			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_BOOL, 0);
-			GRN_BOOL_SET(ctx, &(buffers->general), value->via.boolean);
+			grn_obj_reinit(ctx, walValue, GRN_DB_BOOL, 0);
+			GRN_BOOL_SET(ctx, walValue, value->via.boolean);
 			break;
 		case MSGPACK_OBJECT_POSITIVE_INTEGER:
-			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_UINT64, 0);
-			GRN_UINT64_SET(ctx, &(buffers->general), value->via.u64);
+			grn_obj_reinit(ctx, walValue, GRN_DB_UINT64, 0);
+			GRN_UINT64_SET(ctx, walValue, value->via.u64);
 			break;
 		case MSGPACK_OBJECT_NEGATIVE_INTEGER:
-			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_INT64, 0);
-			GRN_INT64_SET(ctx, &(buffers->general), value->via.i64);
+			grn_obj_reinit(ctx, walValue, GRN_DB_INT64, 0);
+			GRN_INT64_SET(ctx, walValue, value->via.i64);
 			break;
 		case MSGPACK_OBJECT_FLOAT:
-			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_FLOAT, 0);
-			GRN_FLOAT_SET(ctx, &(buffers->general), value->via.f64);
+			grn_obj_reinit(ctx, walValue, GRN_DB_FLOAT, 0);
+			GRN_FLOAT_SET(ctx, walValue, value->via.f64);
 			break;
 		case MSGPACK_OBJECT_STR:
-			grn_obj_reinit(ctx, &(buffers->general), GRN_DB_TEXT, 0);
-			GRN_TEXT_SET(ctx, &(buffers->general),
+			grn_obj_reinit(ctx, walValue, GRN_DB_TEXT, 0);
+			GRN_TEXT_SET(ctx, walValue,
 						 value->via.str.ptr,
 						 value->via.str.size);
 			break;
@@ -886,12 +1015,13 @@ PGrnWALApplyInsert(PGrnWALApplyData *data,
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("pgroonga: WAL: apply: insert: "
+					 errmsg("pgroonga: WAL: apply: %s: "
 							"unexpected value type: <%#x>",
+							context,
 							value->type)));
 			break;
 		}
-		grn_obj_set_value(ctx, column, id, &(buffers->general), GRN_OBJ_SET);
+		grn_obj_set_value(ctx, column, id, walValue, GRN_OBJ_SET);
 	}
 }
 
@@ -900,7 +1030,52 @@ PGrnWALApplyCreateTable(PGrnWALApplyData *data,
 						msgpack_object_map *map,
 						uint32_t currentElement)
 {
-	/* TODO */
+	const char *context = "create table";
+	const char *name = NULL;
+	size_t nameSize = 0;
+	grn_table_flags flags = 0;
+	grn_obj *type = NULL;
+	grn_obj *tokenizer = NULL;
+	grn_obj *normalizer = NULL;
+	uint32_t i;
+	grn_obj *table;
+
+	for (i = currentElement; i < map->size; i++)
+	{
+		msgpack_object_kv *kv;
+
+		kv = &(map->ptr[i]);
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "name"))
+		{
+			PGrnWALApplyValueGetString(context, kv, &name, &nameSize);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "flags"))
+		{
+			flags = PGrnWALApplyValueGetPositiveInteger(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "type"))
+		{
+			type = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "tokenizer"))
+		{
+			tokenizer = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "normalizer"))
+		{
+			normalizer = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+	}
+
+	table = PGrnCreateTableWithSize(name, nameSize, flags, type);
+	if (tokenizer)
+	{
+		grn_obj_set_info(ctx, table, GRN_INFO_DEFAULT_TOKENIZER, tokenizer);
+	}
+	if (normalizer)
+	{
+		grn_obj_set_info(ctx, table, GRN_INFO_NORMALIZER, normalizer);
+	}
 }
 
 static void
@@ -908,7 +1083,38 @@ PGrnWALApplyCreateColumn(PGrnWALApplyData *data,
 						 msgpack_object_map *map,
 						 uint32_t currentElement)
 {
-	/* TODO */
+	const char *context = "create column";
+	grn_obj *table = NULL;
+	const char *name = NULL;
+	size_t nameSize = 0;
+	grn_column_flags flags = 0;
+	grn_obj *type = NULL;
+	uint32_t i;
+
+	for (i = currentElement; i < map->size; i++)
+	{
+		msgpack_object_kv *kv;
+
+		kv = &(map->ptr[i]);
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "table"))
+		{
+			table = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "name"))
+		{
+			PGrnWALApplyValueGetString(context, kv, &name, &nameSize);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "flags"))
+		{
+			flags = PGrnWALApplyValueGetPositiveInteger(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "type"))
+		{
+			type = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+	}
+
+	PGrnCreateColumnWithSize(table, name, nameSize, flags, type);
 }
 
 static void
@@ -916,12 +1122,34 @@ PGrnWALApplySetSources(PGrnWALApplyData *data,
 					   msgpack_object_map *map,
 					   uint32_t currentElement)
 {
-	/* TODO */
+	const char *context = "set sources";
+	grn_obj *column = NULL;
+	grn_obj *sourceIDs = &(buffers->sourceIDs);
+	uint32_t i;
+
+	GRN_BULK_REWIND(sourceIDs);
+	for (i = currentElement; i < map->size; i++)
+	{
+		msgpack_object_kv *kv;
+
+		kv = &(map->ptr[i]);
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "column"))
+		{
+			column = PGrnWALApplyValueGetGroongaObject(context, kv);
+		}
+		else if (PGrnWALApplyKeyEqual(context, &(kv->key), "sources"))
+		{
+			PGrnWALApplyValueGetGroongaObjectIDs(context, kv, sourceIDs);
+		}
+	}
+
+	grn_obj_set_info(ctx, column, GRN_INFO_SOURCE, sourceIDs);
 }
 
 static void
 PGrnWALApplyObject(PGrnWALApplyData *data, msgpack_object *object)
 {
+	const char *context = NULL;
 	msgpack_object_map *map;
 	uint32_t currentElement = 0;
 	PGrnWALAction action = PGRN_WAL_ACTION_INSERT;
@@ -938,24 +1166,12 @@ PGrnWALApplyObject(PGrnWALApplyData *data, msgpack_object *object)
 
 	if (currentElement < map->size)
 	{
-		msgpack_object *key;
+		msgpack_object_kv *kv;
 
-		key = &(object->via.map.ptr[currentElement].key);
-		if (PGrnWALApplyKeyEqual(key, "_action"))
+		kv = &(object->via.map.ptr[currentElement]);
+		if (PGrnWALApplyKeyEqual(context, &(kv->key), "_action"))
 		{
-			msgpack_object *value;
-
-			value = &(object->via.map.ptr[currentElement].val);
-			if (value->type != MSGPACK_OBJECT_POSITIVE_INTEGER)
-			{
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("pgroonga: WAL: apply: "
-								"_action value must be positive integer: "
-								"<%#x>",
-								value->type)));
-			}
-			action = value->via.u64;
+			action = PGrnWALApplyValueGetPositiveInteger(context, kv);
 			currentElement++;
 		}
 	}
@@ -1059,14 +1275,15 @@ PGrnWALApply(Relation index)
 
 	LockRelation(index, RowExclusiveLock);
 	{
-		grn_obj *position = &(buffers->general);
+		grn_obj *position = &(buffers->walPosition);
 
-		grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0);
+		GRN_BULK_REWIND(position);
 		grn_obj_get_value(ctx, data.currentColumn, data.statusID, position);
 		PGrnWALUnpackPosition(GRN_UINT64_VALUE(position),
 							   &(data.current.block),
 							   &(data.current.offset));
 	}
+	data.sources = NULL;
 	PGrnWALApplyConsume(&data);
 	UnlockRelation(index, RowExclusiveLock);
 #endif

  Modified: src/pgrn_wal.h (+4 -4)
===================================================================
--- src/pgrn_wal.h    2016-10-28 14:42:48 +0900 (1e559c1)
+++ src/pgrn_wal.h    2016-10-28 14:42:57 +0900 (b25f1f8)
@@ -20,12 +20,12 @@ void PGrnWALInsertFinish(PGrnWALData *data);
 void PGrnWALInsertColumnStart(PGrnWALData *data, const char *name);
 void PGrnWALInsertColumnFinish(PGrnWALData *data);
 void PGrnWALInsertColumn(PGrnWALData *data,
-						  const char *name,
-						  grn_obj *value);
+						 const char *name,
+						 grn_obj *value);
 
 void PGrnWALCreateTable(Relation index,
 						const char *name,
-						grn_obj_flags flags,
+						grn_table_flags flags,
 						grn_obj *type,
 						grn_obj *tokenizer,
 						grn_obj *normalizer);
@@ -33,7 +33,7 @@ void PGrnWALCreateTable(Relation index,
 void PGrnWALCreateColumn(Relation index,
 						 grn_obj *table,
 						 const char *name,
-						 grn_obj_flags flags,
+						 grn_column_flags flags,
 						 grn_obj *type);
 
 void PGrnWALSetSource(Relation index,
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index