Skip to content

Commit e2a1bf4

Browse files
committed
Merge pull request grpc#3726 from dgquintas/iomgr_executor
Introducing grpc_executor, for all your threading needs
2 parents 8093b57 + 661ad7f commit e2a1bf4

File tree

19 files changed

+286
-29
lines changed

19 files changed

+286
-29
lines changed

BUILD

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ cc_library(
185185
"src/core/iomgr/endpoint.h",
186186
"src/core/iomgr/endpoint_pair.h",
187187
"src/core/iomgr/exec_ctx.h",
188+
"src/core/iomgr/executor.h",
188189
"src/core/iomgr/fd_posix.h",
189190
"src/core/iomgr/iocp_windows.h",
190191
"src/core/iomgr/iomgr.h",
@@ -321,6 +322,7 @@ cc_library(
321322
"src/core/iomgr/endpoint_pair_posix.c",
322323
"src/core/iomgr/endpoint_pair_windows.c",
323324
"src/core/iomgr/exec_ctx.c",
325+
"src/core/iomgr/executor.c",
324326
"src/core/iomgr/fd_posix.c",
325327
"src/core/iomgr/iocp_windows.c",
326328
"src/core/iomgr/iomgr.c",
@@ -469,6 +471,7 @@ cc_library(
469471
"src/core/iomgr/endpoint.h",
470472
"src/core/iomgr/endpoint_pair.h",
471473
"src/core/iomgr/exec_ctx.h",
474+
"src/core/iomgr/executor.h",
472475
"src/core/iomgr/fd_posix.h",
473476
"src/core/iomgr/iocp_windows.h",
474477
"src/core/iomgr/iomgr.h",
@@ -585,6 +588,7 @@ cc_library(
585588
"src/core/iomgr/endpoint_pair_posix.c",
586589
"src/core/iomgr/endpoint_pair_windows.c",
587590
"src/core/iomgr/exec_ctx.c",
591+
"src/core/iomgr/executor.c",
588592
"src/core/iomgr/fd_posix.c",
589593
"src/core/iomgr/iocp_windows.c",
590594
"src/core/iomgr/iomgr.c",
@@ -1111,6 +1115,7 @@ objc_library(
11111115
"src/core/iomgr/endpoint_pair_posix.c",
11121116
"src/core/iomgr/endpoint_pair_windows.c",
11131117
"src/core/iomgr/exec_ctx.c",
1118+
"src/core/iomgr/executor.c",
11141119
"src/core/iomgr/fd_posix.c",
11151120
"src/core/iomgr/iocp_windows.c",
11161121
"src/core/iomgr/iomgr.c",
@@ -1256,6 +1261,7 @@ objc_library(
12561261
"src/core/iomgr/endpoint.h",
12571262
"src/core/iomgr/endpoint_pair.h",
12581263
"src/core/iomgr/exec_ctx.h",
1264+
"src/core/iomgr/executor.h",
12591265
"src/core/iomgr/fd_posix.h",
12601266
"src/core/iomgr/iocp_windows.h",
12611267
"src/core/iomgr/iomgr.h",

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4082,6 +4082,7 @@ LIBGRPC_SRC = \
40824082
src/core/iomgr/endpoint_pair_posix.c \
40834083
src/core/iomgr/endpoint_pair_windows.c \
40844084
src/core/iomgr/exec_ctx.c \
4085+
src/core/iomgr/executor.c \
40854086
src/core/iomgr/fd_posix.c \
40864087
src/core/iomgr/iocp_windows.c \
40874088
src/core/iomgr/iomgr.c \
@@ -4362,6 +4363,7 @@ LIBGRPC_UNSECURE_SRC = \
43624363
src/core/iomgr/endpoint_pair_posix.c \
43634364
src/core/iomgr/endpoint_pair_windows.c \
43644365
src/core/iomgr/exec_ctx.c \
4366+
src/core/iomgr/executor.c \
43654367
src/core/iomgr/fd_posix.c \
43664368
src/core/iomgr/iocp_windows.c \
43674369
src/core/iomgr/iomgr.c \

binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@
191191
'src/core/iomgr/endpoint_pair_posix.c',
192192
'src/core/iomgr/endpoint_pair_windows.c',
193193
'src/core/iomgr/exec_ctx.c',
194+
'src/core/iomgr/executor.c',
194195
'src/core/iomgr/fd_posix.c',
195196
'src/core/iomgr/iocp_windows.c',
196197
'src/core/iomgr/iomgr.c',

build.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ filegroups:
141141
- src/core/iomgr/endpoint.h
142142
- src/core/iomgr/endpoint_pair.h
143143
- src/core/iomgr/exec_ctx.h
144+
- src/core/iomgr/executor.h
144145
- src/core/iomgr/fd_posix.h
145146
- src/core/iomgr/iocp_windows.h
146147
- src/core/iomgr/iomgr.h
@@ -254,6 +255,7 @@ filegroups:
254255
- src/core/iomgr/endpoint_pair_posix.c
255256
- src/core/iomgr/endpoint_pair_windows.c
256257
- src/core/iomgr/exec_ctx.c
258+
- src/core/iomgr/executor.c
257259
- src/core/iomgr/fd_posix.c
258260
- src/core/iomgr/iocp_windows.c
259261
- src/core/iomgr/iomgr.c

gRPC.podspec

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ Pod::Spec.new do |s|
189189
'src/core/iomgr/endpoint.h',
190190
'src/core/iomgr/endpoint_pair.h',
191191
'src/core/iomgr/exec_ctx.h',
192+
'src/core/iomgr/executor.h',
192193
'src/core/iomgr/fd_posix.h',
193194
'src/core/iomgr/iocp_windows.h',
194195
'src/core/iomgr/iomgr.h',
@@ -332,6 +333,7 @@ Pod::Spec.new do |s|
332333
'src/core/iomgr/endpoint_pair_posix.c',
333334
'src/core/iomgr/endpoint_pair_windows.c',
334335
'src/core/iomgr/exec_ctx.c',
336+
'src/core/iomgr/executor.c',
335337
'src/core/iomgr/fd_posix.c',
336338
'src/core/iomgr/iocp_windows.c',
337339
'src/core/iomgr/iomgr.c',
@@ -479,6 +481,7 @@ Pod::Spec.new do |s|
479481
'src/core/iomgr/endpoint.h',
480482
'src/core/iomgr/endpoint_pair.h',
481483
'src/core/iomgr/exec_ctx.h',
484+
'src/core/iomgr/executor.h',
482485
'src/core/iomgr/fd_posix.h',
483486
'src/core/iomgr/iocp_windows.h',
484487
'src/core/iomgr/iomgr.h',

src/core/iomgr/closure.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
7272
src->head = src->tail = NULL;
7373
}
7474

75+
grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) {
76+
grpc_closure *head;
77+
if (list->head == NULL) {
78+
return NULL;
79+
}
80+
head = list->head;
81+
list->head = list->head->next;
82+
return head;
83+
}
84+
7585
typedef struct {
7686
grpc_iomgr_cb_func cb;
7787
void *cb_arg;

src/core/iomgr/closure.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,18 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
8383
#define GRPC_CLOSURE_LIST_INIT \
8484
{ NULL, NULL }
8585

86+
/** add \a closure to the end of \a list and set \a closure's success to \a
87+
* success */
8688
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
8789
int success);
90+
91+
/** append all closures from \a src to \a dst and empty \a src. */
8892
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
93+
94+
/** pop (return and remove) the head closure from \a list. */
95+
grpc_closure *grpc_closure_list_pop(grpc_closure_list *list);
96+
97+
/** return whether \a list is empty. */
8998
int grpc_closure_list_empty(grpc_closure_list list);
9099

91100
#endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */

src/core/iomgr/executor.c

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
*
3+
* Copyright 2015, Google Inc.
4+
* All rights reserved.
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following conditions are
8+
* met:
9+
*
10+
* * Redistributions of source code must retain the above copyright
11+
* notice, this list of conditions and the following disclaimer.
12+
* * Redistributions in binary form must reproduce the above
13+
* copyright notice, this list of conditions and the following disclaimer
14+
* in the documentation and/or other materials provided with the
15+
* distribution.
16+
* * Neither the name of Google Inc. nor the names of its
17+
* contributors may be used to endorse or promote products derived from
18+
* this software without specific prior written permission.
19+
*
20+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31+
*
32+
*/
33+
34+
#include "src/core/iomgr/executor.h"
35+
36+
#include <string.h>
37+
38+
#include <grpc/support/alloc.h>
39+
#include <grpc/support/log.h>
40+
#include <grpc/support/sync.h>
41+
#include <grpc/support/thd.h>
42+
#include "src/core/iomgr/exec_ctx.h"
43+
44+
typedef struct grpc_executor_data {
45+
int busy; /**< is the thread currently running? */
46+
int shutting_down; /**< has \a grpc_shutdown() been invoked? */
47+
int pending_join; /**< has the thread finished but not been joined? */
48+
grpc_closure_list closures; /**< collection of pending work */
49+
gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
50+
pending_join are true */
51+
gpr_thd_options options;
52+
gpr_mu mu;
53+
} grpc_executor;
54+
55+
static grpc_executor g_executor;
56+
57+
void grpc_executor_init() {
58+
memset(&g_executor, 0, sizeof(grpc_executor));
59+
gpr_mu_init(&g_executor.mu);
60+
g_executor.options = gpr_thd_options_default();
61+
gpr_thd_options_set_joinable(&g_executor.options);
62+
}
63+
64+
/* thread body */
65+
static void closure_exec_thread_func(void *ignored) {
66+
grpc_closure *closure;
67+
68+
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
69+
while (1) {
70+
gpr_mu_lock(&g_executor.mu);
71+
if (g_executor.shutting_down != 0) {
72+
gpr_mu_unlock(&g_executor.mu);
73+
break;
74+
}
75+
closure = grpc_closure_list_pop(&g_executor.closures);
76+
if (closure == NULL) {
77+
/* no more work, time to die */
78+
GPR_ASSERT(g_executor.busy == 1);
79+
g_executor.busy = 0;
80+
gpr_mu_unlock(&g_executor.mu);
81+
break;
82+
}
83+
gpr_mu_unlock(&g_executor.mu);
84+
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
85+
grpc_exec_ctx_flush(&exec_ctx);
86+
}
87+
grpc_exec_ctx_finish(&exec_ctx);
88+
}
89+
90+
/* Spawn the thread if new work has arrived a no thread is up */
91+
static void maybe_spawn_locked() {
92+
if (grpc_closure_list_empty(g_executor.closures) == 1) {
93+
return;
94+
}
95+
if (g_executor.shutting_down == 1) {
96+
return;
97+
}
98+
99+
if (g_executor.busy != 0) {
100+
/* Thread still working. New work will be picked up by already running
101+
* thread. Not spawning anything. */
102+
return;
103+
} else if (g_executor.pending_join != 0) {
104+
/* Pickup the remains of the previous incarnations of the thread. */
105+
gpr_thd_join(g_executor.tid);
106+
g_executor.pending_join = 0;
107+
}
108+
109+
/* All previous instances of the thread should have been joined at this point.
110+
* Spawn time! */
111+
g_executor.busy = 1;
112+
gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
113+
&g_executor.options);
114+
g_executor.pending_join = 1;
115+
}
116+
117+
void grpc_executor_enqueue(grpc_closure *closure, int success) {
118+
gpr_mu_lock(&g_executor.mu);
119+
if (g_executor.shutting_down == 0) {
120+
grpc_closure_list_add(&g_executor.closures, closure, success);
121+
maybe_spawn_locked();
122+
}
123+
gpr_mu_unlock(&g_executor.mu);
124+
}
125+
126+
void grpc_executor_shutdown() {
127+
int pending_join;
128+
grpc_closure *closure;
129+
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
130+
131+
gpr_mu_lock(&g_executor.mu);
132+
pending_join = g_executor.pending_join;
133+
g_executor.shutting_down = 1;
134+
gpr_mu_unlock(&g_executor.mu);
135+
/* we can release the lock at this point despite the access to the closure
136+
* list below because we aren't accepting new work */
137+
138+
/* Execute pending callbacks, some may be performing cleanups */
139+
while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) {
140+
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
141+
}
142+
grpc_exec_ctx_finish(&exec_ctx);
143+
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
144+
if (pending_join) {
145+
gpr_thd_join(g_executor.tid);
146+
}
147+
gpr_mu_destroy(&g_executor.mu);
148+
}

src/core/iomgr/executor.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
*
3+
* Copyright 2015, Google Inc.
4+
* All rights reserved.
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following conditions are
8+
* met:
9+
*
10+
* * Redistributions of source code must retain the above copyright
11+
* notice, this list of conditions and the following disclaimer.
12+
* * Redistributions in binary form must reproduce the above
13+
* copyright notice, this list of conditions and the following disclaimer
14+
* in the documentation and/or other materials provided with the
15+
* distribution.
16+
* * Neither the name of Google Inc. nor the names of its
17+
* contributors may be used to endorse or promote products derived from
18+
* this software without specific prior written permission.
19+
*
20+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31+
*
32+
*/
33+
34+
#ifndef GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H
35+
#define GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H
36+
37+
#include "src/core/iomgr/closure.h"
38+
39+
/** Initialize the global executor.
40+
*
41+
* This mechanism is meant to outsource work (grpc_closure instances) to a
42+
* thread, for those cases where blocking isn't an option but there isn't a
43+
* non-blocking solution available. */
44+
void grpc_executor_init();
45+
46+
/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
47+
* thread */
48+
void grpc_executor_enqueue(grpc_closure *closure, int success);
49+
50+
/** Shutdown the executor, running all pending work as part of the call */
51+
void grpc_executor_shutdown();
52+
53+
#endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */

0 commit comments

Comments
 (0)