Kouhei Sutou
null+****@clear*****
Fri Oct 28 14:42:57 JST 2016
Kouhei Sutou 2016-10-28 14:42:57 +0900 (Fri, 28 Oct 2016) New Revision: 2b944dcc70b3b8972aec9aa65419667ffaf581f4 https://github.com/pgroonga/pgroonga/commit/2b944dcc70b3b8972aec9aa65419667ffaf581f4 Message: Support schema replication Modified files: src/pgrn_global.c src/pgrn_global.h src/pgrn_wal.c src/pgrn_wal.h Modified: src/pgrn_global.c (+4 -0) =================================================================== --- src/pgrn_global.c 2016-10-28 14:42:48 +0900 (7a1b7f4) +++ src/pgrn_global.c 2016-10-28 14:42:57 +0900 (4d26dbb) @@ -17,6 +17,8 @@ PGrnInitializeBuffers(void) GRN_UINT64_INIT(&(PGrnBuffers.ctid), 0); GRN_FLOAT_INIT(&(PGrnBuffers.score), 0); GRN_RECORD_INIT(&(PGrnBuffers.sourceIDs), GRN_OBJ_VECTOR, GRN_ID_NIL); + GRN_UINT64_INIT(&(PGrnBuffers.walPosition), 0); + GRN_VOID_INIT(&(PGrnBuffers.walValue)); GRN_TEXT_INIT(&(PGrnBuffers.head), 0); GRN_TEXT_INIT(&(PGrnBuffers.body), 0); GRN_TEXT_INIT(&(PGrnBuffers.foot), 0); @@ -33,6 +35,8 @@ PGrnFinalizeBuffers(void) GRN_OBJ_FIN(ctx, &(PGrnBuffers.ctid)); GRN_OBJ_FIN(ctx, &(PGrnBuffers.score)); GRN_OBJ_FIN(ctx, &(PGrnBuffers.sourceIDs)); + GRN_OBJ_FIN(ctx, &(PGrnBuffers.walPosition)); + GRN_OBJ_FIN(ctx, &(PGrnBuffers.walValue)); GRN_OBJ_FIN(ctx, &(PGrnBuffers.head)); GRN_OBJ_FIN(ctx, &(PGrnBuffers.body)); GRN_OBJ_FIN(ctx, &(PGrnBuffers.foot)); Modified: src/pgrn_global.h (+2 -0) =================================================================== --- src/pgrn_global.h 2016-10-28 14:42:48 +0900 (e00abf9) +++ src/pgrn_global.h 2016-10-28 14:42:57 +0900 (79a2e4a) @@ -11,6 +11,8 @@ struct PGrnBuffers grn_obj ctid; grn_obj score; grn_obj sourceIDs; + grn_obj walPosition; + grn_obj walValue; grn_obj head; grn_obj body; grn_obj foot; Modified: src/pgrn_wal.c (+274 -57) =================================================================== --- src/pgrn_wal.c 2016-10-28 14:42:48 +0900 (ff398a2) +++ src/pgrn_wal.c 2016-10-28 14:42:57 +0900 (2f88567) @@ -168,7 +168,7 @@ PGrnWALUpdateStatus(Relation index, uint32_t oid; grn_id id; uint64_t positionRaw; - grn_obj *position = &(buffers->general); + grn_obj *position = &(buffers->walPosition); PGrnWALEnsureStatusesTable(); @@ -179,7 +179,7 @@ PGrnWALUpdateStatus(Relation index, oid = RelationGetRelid(index); id = grn_table_add(ctx, statusesTable, &oid, sizeof(uint32_t), NULL); positionRaw = PGrnWALPackPosition(block, offset); - grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + GRN_BULK_REWIND(position); GRN_UINT64_SET(ctx, position, positionRaw); grn_obj_set_value(ctx, currentColumn, id, position, GRN_OBJ_SET); } @@ -536,7 +536,7 @@ PGrnWALInsertColumn(PGrnWALData *data, void PGrnWALCreateTable(Relation index, const char *name, - grn_obj_flags flags, + grn_table_flags flags, grn_obj *type, grn_obj *tokenizer, grn_obj *normalizer) @@ -580,7 +580,7 @@ void PGrnWALCreateColumn(Relation index, grn_obj *table, const char *name, - grn_obj_flags flags, + grn_column_flags flags, grn_obj *type) { #ifdef PGRN_SUPPORT_WAL @@ -710,8 +710,8 @@ PGrnWALApplyNeeded(PGrnWALApplyData *data) BlockNumber nBlocks; { - grn_obj *position = &(buffers->general); - grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + grn_obj *position = &(buffers->walPosition); + GRN_BULK_REWIND(position); grn_obj_get_value(ctx, data->currentColumn, data->statusID, position); PGrnWALUnpackPosition(GRN_UINT64_VALUE(position), ¤tBlock, @@ -743,7 +743,9 @@ PGrnWALApplyNeeded(PGrnWALApplyData *data) } static bool -PGrnWALApplyKeyEqual(msgpack_object *key, const char *name) +PGrnWALApplyKeyEqual(const char *context, + msgpack_object *key, + const char *name) { size_t nameSize; @@ -751,7 +753,10 @@ PGrnWALApplyKeyEqual(msgpack_object *key, const char *name) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: key must be map: <%#x>", + errmsg("pgroonga: WAL: apply: %s%s" + "key must be map: <%#x>", + context ? context : "", + context ? ": " : "", key->type))); } @@ -764,11 +769,143 @@ PGrnWALApplyKeyEqual(msgpack_object *key, const char *name) return true; } +static uint64_t +PGrnWALApplyValueGetPositiveInteger(const char *context, + msgpack_object_kv *kv) +{ + if (kv->val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: WAL: apply: %s%s" + "%.*s value must be positive integer: " + "<%#x>", + context ? context : "", + context ? ": " : "", + kv->key.via.str.size, + kv->key.via.str.ptr, + kv->val.type))); + } + + return kv->val.via.u64; +} + +static void +PGrnWALApplyValueGetString(const char *context, + msgpack_object_kv *kv, + const char **string, + size_t *stringSize) +{ + if (kv->val.type != MSGPACK_OBJECT_STR) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: WAL: apply: %s%s" + "%.*s value must be string: " + "<%#x>", + context ? context : "", + context ? ": " : "", + kv->key.via.str.size, + kv->key.via.str.ptr, + kv->val.type))); + } + + *string = kv->val.via.str.ptr; + *stringSize = kv->val.via.str.size; +} + +static grn_obj * +PGrnWALApplyValueGetGroongaObject(const char *context, + msgpack_object_kv *kv) +{ + grn_obj *object = NULL; + + switch (kv->val.type) + { + case MSGPACK_OBJECT_NIL: + object = NULL; + break; + case MSGPACK_OBJECT_STR: + object = PGrnLookupWithSize(kv->val.via.str.ptr, + kv->val.via.str.size, + ERROR); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: WAL: apply: %s%s" + "%.*s value must be nil or string: " + "<%#x>", + context ? context : "", + context ? ": " : "", + kv->key.via.str.size, + kv->key.via.str.ptr, + kv->val.type))); + break; + } + + return object; +} + +static void +PGrnWALApplyValueGetGroongaObjectIDs(const char *context, + msgpack_object_kv *kv, + grn_obj *ids) +{ + msgpack_object_array *array; + uint32_t i; + + if (kv->val.type != MSGPACK_OBJECT_ARRAY) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: WAL: apply: %s%s" + "%.*s value must be array: " + "<%#x>", + context ? context : "", + context ? ": " : "", + kv->key.via.str.size, + kv->key.via.str.ptr, + kv->val.type))); + } + + array = &(kv->val.via.array); + for (i = 0; i < array->size; i++) + { + msgpack_object *element; + grn_obj *object; + grn_id objectID; + + element = &(array->ptr[i]); + if (element->type != MSGPACK_OBJECT_STR) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pgroonga: WAL: apply: %s%s" + "%.*s value must be array of string: " + "[%u]=<%#x>", + context ? context : "", + context ? ": " : "", + kv->key.via.str.size, + kv->key.via.str.ptr, + i, + element->type))); + } + + object = PGrnLookupWithSize(element->via.str.ptr, + element->via.str.size, + ERROR); + objectID = grn_obj_id(ctx, object); + GRN_RECORD_PUT(ctx, ids, objectID); + } +} + static void PGrnWALApplyInsert(PGrnWALApplyData *data, msgpack_object_map *map, uint32_t currentElement) { + const char *context = "insert"; grn_obj *table = NULL; const char *key = NULL; size_t keySize = 0; @@ -780,20 +917,9 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, msgpack_object_kv *kv; kv = &(map->ptr[currentElement]); - if (PGrnWALApplyKeyEqual(&(kv->key), "_table")) + if (PGrnWALApplyKeyEqual(context, &(kv->key), "_table")) { - if (kv->val.type != MSGPACK_OBJECT_STR) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: insert: " - "_table value must be string: " - "<%#x>", - kv->val.type))); - } - table = PGrnLookupWithSize(kv->val.via.str.ptr, - kv->val.via.str.size, - ERROR); + table = PGrnWALApplyValueGetGroongaObject(context, kv); currentElement++; } } @@ -809,15 +935,16 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, msgpack_object_kv *kv; kv = &(map->ptr[currentElement]); - if (PGrnWALApplyKeyEqual(&(kv->key), "_key")) + if (PGrnWALApplyKeyEqual(context, &(kv->key), "_key")) { if (kv->val.type != MSGPACK_OBJECT_BIN) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: insert: " + errmsg("pgroonga: WAL: apply: %s: " "_key value must be binary: " "<%#x>", + context, kv->val.type))); } key = kv->val.via.bin.ptr; @@ -832,6 +959,7 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, msgpack_object *key; msgpack_object *value; grn_obj *column; + grn_obj *walValue = &(buffers->walValue); key = &(map->ptr[i].key); value = &(map->ptr[i].val); @@ -840,8 +968,9 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: insert: " + errmsg("pgroonga: WAL: apply: %s: " "key must be map: <%#x>", + context, key->type))); } @@ -852,24 +981,24 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, switch (value->type) { case MSGPACK_OBJECT_BOOLEAN: - grn_obj_reinit(ctx, &(buffers->general), GRN_DB_BOOL, 0); - GRN_BOOL_SET(ctx, &(buffers->general), value->via.boolean); + grn_obj_reinit(ctx, walValue, GRN_DB_BOOL, 0); + GRN_BOOL_SET(ctx, walValue, value->via.boolean); break; case MSGPACK_OBJECT_POSITIVE_INTEGER: - grn_obj_reinit(ctx, &(buffers->general), GRN_DB_UINT64, 0); - GRN_UINT64_SET(ctx, &(buffers->general), value->via.u64); + grn_obj_reinit(ctx, walValue, GRN_DB_UINT64, 0); + GRN_UINT64_SET(ctx, walValue, value->via.u64); break; case MSGPACK_OBJECT_NEGATIVE_INTEGER: - grn_obj_reinit(ctx, &(buffers->general), GRN_DB_INT64, 0); - GRN_INT64_SET(ctx, &(buffers->general), value->via.i64); + grn_obj_reinit(ctx, walValue, GRN_DB_INT64, 0); + GRN_INT64_SET(ctx, walValue, value->via.i64); break; case MSGPACK_OBJECT_FLOAT: - grn_obj_reinit(ctx, &(buffers->general), GRN_DB_FLOAT, 0); - GRN_FLOAT_SET(ctx, &(buffers->general), value->via.f64); + grn_obj_reinit(ctx, walValue, GRN_DB_FLOAT, 0); + GRN_FLOAT_SET(ctx, walValue, value->via.f64); break; case MSGPACK_OBJECT_STR: - grn_obj_reinit(ctx, &(buffers->general), GRN_DB_TEXT, 0); - GRN_TEXT_SET(ctx, &(buffers->general), + grn_obj_reinit(ctx, walValue, GRN_DB_TEXT, 0); + GRN_TEXT_SET(ctx, walValue, value->via.str.ptr, value->via.str.size); break; @@ -886,12 +1015,13 @@ PGrnWALApplyInsert(PGrnWALApplyData *data, default: ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: insert: " + errmsg("pgroonga: WAL: apply: %s: " "unexpected value type: <%#x>", + context, value->type))); break; } - grn_obj_set_value(ctx, column, id, &(buffers->general), GRN_OBJ_SET); + grn_obj_set_value(ctx, column, id, walValue, GRN_OBJ_SET); } } @@ -900,7 +1030,52 @@ PGrnWALApplyCreateTable(PGrnWALApplyData *data, msgpack_object_map *map, uint32_t currentElement) { - /* TODO */ + const char *context = "create table"; + const char *name = NULL; + size_t nameSize = 0; + grn_table_flags flags = 0; + grn_obj *type = NULL; + grn_obj *tokenizer = NULL; + grn_obj *normalizer = NULL; + uint32_t i; + grn_obj *table; + + for (i = currentElement; i < map->size; i++) + { + msgpack_object_kv *kv; + + kv = &(map->ptr[i]); + if (PGrnWALApplyKeyEqual(context, &(kv->key), "name")) + { + PGrnWALApplyValueGetString(context, kv, &name, &nameSize); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "flags")) + { + flags = PGrnWALApplyValueGetPositiveInteger(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "type")) + { + type = PGrnWALApplyValueGetGroongaObject(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "tokenizer")) + { + tokenizer = PGrnWALApplyValueGetGroongaObject(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "normalizer")) + { + normalizer = PGrnWALApplyValueGetGroongaObject(context, kv); + } + } + + table = PGrnCreateTableWithSize(name, nameSize, flags, type); + if (tokenizer) + { + grn_obj_set_info(ctx, table, GRN_INFO_DEFAULT_TOKENIZER, tokenizer); + } + if (normalizer) + { + grn_obj_set_info(ctx, table, GRN_INFO_NORMALIZER, normalizer); + } } static void @@ -908,7 +1083,38 @@ PGrnWALApplyCreateColumn(PGrnWALApplyData *data, msgpack_object_map *map, uint32_t currentElement) { - /* TODO */ + const char *context = "create column"; + grn_obj *table = NULL; + const char *name = NULL; + size_t nameSize = 0; + grn_column_flags flags = 0; + grn_obj *type = NULL; + uint32_t i; + + for (i = currentElement; i < map->size; i++) + { + msgpack_object_kv *kv; + + kv = &(map->ptr[i]); + if (PGrnWALApplyKeyEqual(context, &(kv->key), "table")) + { + table = PGrnWALApplyValueGetGroongaObject(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "name")) + { + PGrnWALApplyValueGetString(context, kv, &name, &nameSize); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "flags")) + { + flags = PGrnWALApplyValueGetPositiveInteger(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "type")) + { + type = PGrnWALApplyValueGetGroongaObject(context, kv); + } + } + + PGrnCreateColumnWithSize(table, name, nameSize, flags, type); } static void @@ -916,12 +1122,34 @@ PGrnWALApplySetSources(PGrnWALApplyData *data, msgpack_object_map *map, uint32_t currentElement) { - /* TODO */ + const char *context = "set sources"; + grn_obj *column = NULL; + grn_obj *sourceIDs = &(buffers->sourceIDs); + uint32_t i; + + GRN_BULK_REWIND(sourceIDs); + for (i = currentElement; i < map->size; i++) + { + msgpack_object_kv *kv; + + kv = &(map->ptr[i]); + if (PGrnWALApplyKeyEqual(context, &(kv->key), "column")) + { + column = PGrnWALApplyValueGetGroongaObject(context, kv); + } + else if (PGrnWALApplyKeyEqual(context, &(kv->key), "sources")) + { + PGrnWALApplyValueGetGroongaObjectIDs(context, kv, sourceIDs); + } + } + + grn_obj_set_info(ctx, column, GRN_INFO_SOURCE, sourceIDs); } static void PGrnWALApplyObject(PGrnWALApplyData *data, msgpack_object *object) { + const char *context = NULL; msgpack_object_map *map; uint32_t currentElement = 0; PGrnWALAction action = PGRN_WAL_ACTION_INSERT; @@ -938,24 +1166,12 @@ PGrnWALApplyObject(PGrnWALApplyData *data, msgpack_object *object) if (currentElement < map->size) { - msgpack_object *key; + msgpack_object_kv *kv; - key = &(object->via.map.ptr[currentElement].key); - if (PGrnWALApplyKeyEqual(key, "_action")) + kv = &(object->via.map.ptr[currentElement]); + if (PGrnWALApplyKeyEqual(context, &(kv->key), "_action")) { - msgpack_object *value; - - value = &(object->via.map.ptr[currentElement].val); - if (value->type != MSGPACK_OBJECT_POSITIVE_INTEGER) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("pgroonga: WAL: apply: " - "_action value must be positive integer: " - "<%#x>", - value->type))); - } - action = value->via.u64; + action = PGrnWALApplyValueGetPositiveInteger(context, kv); currentElement++; } } @@ -1059,14 +1275,15 @@ PGrnWALApply(Relation index) LockRelation(index, RowExclusiveLock); { - grn_obj *position = &(buffers->general); + grn_obj *position = &(buffers->walPosition); - grn_obj_reinit(ctx, position, GRN_DB_UINT64, 0); + GRN_BULK_REWIND(position); grn_obj_get_value(ctx, data.currentColumn, data.statusID, position); PGrnWALUnpackPosition(GRN_UINT64_VALUE(position), &(data.current.block), &(data.current.offset)); } + data.sources = NULL; PGrnWALApplyConsume(&data); UnlockRelation(index, RowExclusiveLock); #endif Modified: src/pgrn_wal.h (+4 -4) =================================================================== --- src/pgrn_wal.h 2016-10-28 14:42:48 +0900 (1e559c1) +++ src/pgrn_wal.h 2016-10-28 14:42:57 +0900 (b25f1f8) @@ -20,12 +20,12 @@ void PGrnWALInsertFinish(PGrnWALData *data); void PGrnWALInsertColumnStart(PGrnWALData *data, const char *name); void PGrnWALInsertColumnFinish(PGrnWALData *data); void PGrnWALInsertColumn(PGrnWALData *data, - const char *name, - grn_obj *value); + const char *name, + grn_obj *value); void PGrnWALCreateTable(Relation index, const char *name, - grn_obj_flags flags, + grn_table_flags flags, grn_obj *type, grn_obj *tokenizer, grn_obj *normalizer); @@ -33,7 +33,7 @@ void PGrnWALCreateTable(Relation index, void PGrnWALCreateColumn(Relation index, grn_obj *table, const char *name, - grn_obj_flags flags, + grn_column_flags flags, grn_obj *type); void PGrnWALSetSource(Relation index, -------------- next part -------------- HTML����������������������������...Download