Kouhei Sutou
null+****@clear*****
Thu Sep 24 17:29:28 JST 2015
Kouhei Sutou 2015-09-24 17:29:28 +0900 (Thu, 24 Sep 2015) New Revision: 7c02d77ac47096fe2053b84ff65532f5cc674f9e https://github.com/groonga/groonga/commit/7c02d77ac47096fe2053b84ff65532f5cc674f9e Message: Ensure locking mutex Modified files: lib/grn.h src/groonga.c Modified: lib/grn.h (+23 -7) =================================================================== --- lib/grn.h 2015-09-19 00:47:14 +0900 (a654c39) +++ lib/grn.h 2015-09-24 17:29:28 +0900 (c5151db) @@ -222,9 +222,10 @@ typedef void * grn_thread_func_result; (pthread_create(&(thread), NULL, (func), (arg))) # define THREAD_JOIN(thread) (pthread_join(thread, NULL)) typedef pthread_mutex_t grn_mutex; -# define MUTEX_INIT(m) pthread_mutex_init(&m, NULL) -# define MUTEX_LOCK(m) pthread_mutex_lock(&m) -# define MUTEX_UNLOCK(m) pthread_mutex_unlock(&m) +# define MUTEX_INIT(m) pthread_mutex_init(&m, NULL) +# define MUTEX_LOCK(m) pthread_mutex_lock(&m) +# define MUTEX_LOCK_CHECK(m) (MUTEX_LOCK(m) == 0) +# define MUTEX_UNLOCK(m) pthread_mutex_unlock(&m) # define MUTEX_FIN(m) # ifdef HAVE_PTHREAD_MUTEXATTR_SETPSHARED # define MUTEX_INIT_SHARED(m) do {\ @@ -315,10 +316,11 @@ typedef unsigned int grn_thread_func_result; # define THREAD_JOIN(thread) \ (WaitForSingleObject((HANDLE)(thread), INFINITE) == WAIT_FAILED) typedef HANDLE grn_mutex; -# define MUTEX_INIT(m) ((m) = CreateMutex(0, FALSE, NULL)) -# define MUTEX_LOCK(m) WaitForSingleObject((m), INFINITE) -# define MUTEX_UNLOCK(m) ReleaseMutex(m) -# define MUTEX_FIN(m) CloseHandle(m) +# define MUTEX_INIT(m) ((m) = CreateMutex(0, FALSE, NULL)) +# define MUTEX_LOCK(m) WaitForSingleObject((m), INFINITE) +# define MUTEX_LOCK_CHECK(m) (MUTEX_LOCK(m) == WAIT_OBJECT_0) +# define MUTEX_UNLOCK(m) ReleaseMutex(m) +# define MUTEX_FIN(m) CloseHandle(m) typedef CRITICAL_SECTION grn_critical_section; # define CRITICAL_SECTION_INIT(cs) InitializeCriticalSection(&(cs)) # define CRITICAL_SECTION_ENTER(cs) EnterCriticalSection(&(cs)) @@ -412,6 +414,20 @@ typedef int grn_cond; #endif /* HAVE_PTHREAD_H */ +#define MUTEX_LOCK_ENSURE(ctx_, mutex) do { \ + grn_ctx *ctx__ = (ctx_); \ + do { \ + grn_ctx *ctx = ctx__; \ + if (MUTEX_LOCK_CHECK(mutex)) { \ + break; \ + } \ + if (ctx) { \ + SERR("MUTEX_LOCK"); \ + } \ + grn_nanosleep(1000000); \ + } while (GRN_TRUE); \ +} while (GRN_FALSE) + /* format string for printf */ #ifdef HAVE_INTTYPES_H # include <inttypes.h> Modified: src/groonga.c (+10 -10) =================================================================== --- src/groonga.c 2015-09-19 00:47:14 +0900 (38820e0) +++ src/groonga.c 2015-09-24 17:29:28 +0900 (6b27d21) @@ -491,14 +491,14 @@ groonga_set_thread_limit(uint32_t new_limit, void *data) uint32_t i; uint32_t current_nfthreads; - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(NULL, q_mutex); current_nfthreads = nfthreads; max_nfthreads = new_limit; MUTEX_UNLOCK(q_mutex); if (current_nfthreads > new_limit) { for (i = 0; i < current_nfthreads; i++) { - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(NULL, q_mutex); COND_SIGNAL(q_cond); MUTEX_UNLOCK(q_mutex); } @@ -657,7 +657,7 @@ run_server_loop(grn_ctx *ctx, grn_com_event *ev) /* todo : log stat */ } for (;;) { - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); if (nthreads == nfthreads) { break; } MUTEX_UNLOCK(q_mutex); grn_nanosleep(1000000); @@ -1944,7 +1944,7 @@ h_worker(void *arg) grn_ctx_init(ctx, 0); grn_ctx_use(ctx, (grn_obj *)arg); grn_ctx_recv_handler_set(ctx, h_output, &hc); - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread start (%d/%d)", nfthreads, nthreads); do { grn_obj *msg; @@ -1966,7 +1966,7 @@ h_worker(void *arg) hc.in_body = GRN_FALSE; hc.is_chunked = GRN_FALSE; do_htreq(ctx, (grn_msg *)msg); - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); } while (nfthreads < max_nfthreads && grn_gctx.stat != GRN_CTX_QUIT); exit : nthreads--; @@ -1989,7 +1989,7 @@ h_handler(grn_ctx *ctx, grn_obj *msg) /* if not keep alive connection */ grn_com_event_del(ctx, com->ev, fd); ((grn_msg *)msg)->u.fd = fd; - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); grn_com_queue_enque(ctx, &ctx_new, (grn_com_queue_entry *)msg); if (!nfthreads && nthreads < max_nfthreads) { grn_thread thread; @@ -2021,7 +2021,7 @@ h_server(char *path) static grn_thread_func_result CALLBACK g_worker(void *arg) { - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(NULL, q_mutex); GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread start (%d/%d)", nfthreads, nthreads); do { grn_ctx *ctx; @@ -2068,7 +2068,7 @@ g_worker(void *arg) while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) { grn_msg_close(ctx, msg); } - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); if (ctx->stat == GRN_CTX_QUIT || edge->stat == EDGE_ABORT) { break; } } } @@ -2089,7 +2089,7 @@ exit : static void g_dispatcher(grn_ctx *ctx, grn_edge *edge) { - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); if (edge->stat == EDGE_IDLE) { grn_com_queue_enque(ctx, &ctx_new, (grn_com_queue_entry *)edge); edge->stat = EDGE_WAIT; @@ -2130,7 +2130,7 @@ g_handler(grn_ctx *ctx, grn_obj *msg) if (ctx->rc) { if (com->has_sid) { if ((edge = com->opaque)) { - MUTEX_LOCK(q_mutex); + MUTEX_LOCK_ENSURE(ctx, q_mutex); if (edge->stat == EDGE_IDLE) { grn_com_queue_enque(ctx, &ctx_old, (grn_com_queue_entry *)edge); } -------------- next part -------------- HTML����������������������������...Download