Skip to content

Commit 8f21248

Browse files
cosmo0920edsiper
authored andcommitted
filter_wasm: Optimize for high throughput
Instantiate Wasm programs only once and execute with the instantiated instance. Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 0174471 commit 8f21248

File tree

1 file changed

+52
-27
lines changed

1 file changed

+52
-27
lines changed

plugins/filter_wasm/filter_wasm.c

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ static int cb_wasm_filter(const void *data, size_t bytes,
5555
char *json_buf = NULL;
5656
size_t json_size;
5757
int root_type;
58-
struct flb_wasm *wasm = NULL;
58+
/* Get the persistent WASM instance from the filter context. */
59+
struct flb_filter_wasm *ctx = filter_context;
60+
struct flb_wasm *wasm = ctx->wasm;
5961
size_t buf_size;
6062

61-
struct flb_filter_wasm *ctx = filter_context;
6263
struct flb_log_event_encoder log_encoder;
6364
struct flb_log_event_decoder log_decoder;
6465
struct flb_log_event log_event;
@@ -67,6 +68,12 @@ static int cb_wasm_filter(const void *data, size_t bytes,
6768
(void) i_ins;
6869
(void) config;
6970

71+
/* Safeguard in case initialization failed. */
72+
if (!wasm) {
73+
flb_plg_error(ctx->ins, "WASM instance is not available, skipping.");
74+
return FLB_FILTER_NOTOUCH;
75+
}
76+
7077
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
7178

7279
if (ret != FLB_EVENT_DECODER_SUCCESS) {
@@ -88,13 +95,6 @@ static int cb_wasm_filter(const void *data, size_t bytes,
8895
return FLB_FILTER_NOTOUCH;
8996
}
9097

91-
wasm = flb_wasm_instantiate(config, ctx->wasm_path, ctx->accessible_dir_list,
92-
ctx->wasm_conf);
93-
if (wasm == NULL) {
94-
flb_plg_debug(ctx->ins, "instantiate wasm [%s] failed", ctx->wasm_path);
95-
goto on_error;
96-
}
97-
9898
while ((ret = flb_log_event_decoder_next(
9999
&log_decoder,
100100
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
@@ -117,8 +117,8 @@ static int cb_wasm_filter(const void *data, size_t bytes,
117117
}
118118
else {
119119
flb_plg_error(ctx->ins, "encode as JSON from msgpack is failed");
120-
121-
goto on_error;
120+
/* Go to on_error without destroying the persistent wasm instance */
121+
goto on_error_without_wasm_destroy;
122122
}
123123
break;
124124
case FLB_FILTER_WASM_FMT_MSGPACK:
@@ -127,8 +127,8 @@ static int cb_wasm_filter(const void *data, size_t bytes,
127127
(void **)&buf, &buf_size);
128128
if (ret < 0) {
129129
flb_plg_error(ctx->ins, "format msgpack is failed");
130-
131-
goto on_error;
130+
/* Go to on_error without destroying the persistent wasm instance */
131+
goto on_error_without_wasm_destroy;
132132
}
133133

134134
/* Execute WASM program */
@@ -227,9 +227,6 @@ static int cb_wasm_filter(const void *data, size_t bytes,
227227
}
228228
}
229229

230-
/* Teardown WASM context */
231-
flb_wasm_destroy(wasm);
232-
233230
*out_buf = log_encoder.output_buffer;
234231
*out_bytes = log_encoder.output_length;
235232

@@ -240,14 +237,10 @@ static int cb_wasm_filter(const void *data, size_t bytes,
240237

241238
return FLB_FILTER_MODIFIED;
242239

243-
on_error:
240+
/* A new error handler that doesn't destroy the persistent wasm instance */
241+
on_error_without_wasm_destroy:
244242
flb_log_event_decoder_destroy(&log_decoder);
245243
flb_log_event_encoder_destroy(&log_encoder);
246-
247-
if (wasm != NULL) {
248-
flb_wasm_destroy(wasm);
249-
}
250-
251244
return FLB_FILTER_NOTOUCH;
252245
}
253246

@@ -313,6 +306,7 @@ static int cb_wasm_pre_run(struct flb_filter_instance *f_ins,
313306
/* Check accessibility for the wasm path */
314307
ret = access(ctx->wasm_path, R_OK);
315308
if (ret != 0) {
309+
flb_plg_error(f_ins, "cannot access wasm program at %s", ctx->wasm_path);
316310
goto pre_run_error;
317311
}
318312

@@ -341,6 +335,9 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
341335
if (!ctx) {
342336
return -1;
343337
}
338+
/* Initialize wasm pointer to NULL */
339+
ctx->wasm = NULL;
340+
344341

345342
/* Initialize exec config */
346343
ret = filter_wasm_config_read(ctx, f_ins, config);
@@ -378,22 +375,50 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
378375
wasm_conf->stack_size = ctx->wasm_stack_size;
379376
}
380377

381-
/* Set context */
378+
/* Set context before instantiating */
382379
flb_filter_set_context(f_ins, ctx);
380+
381+
/* Instantiate the WASM module once and store it in the context */
382+
ctx->wasm = flb_wasm_instantiate(config, ctx->wasm_path,
383+
ctx->accessible_dir_list,
384+
ctx->wasm_conf);
385+
386+
if (ctx->wasm == NULL) {
387+
flb_plg_error(ctx->ins, "failed to instantiate wasm program: %s",
388+
ctx->wasm_path);
389+
goto init_error;
390+
}
391+
383392
return 0;
384393

385394
init_error:
386-
delete_wasm_config(ctx);
387-
395+
if (ctx) {
396+
if (ctx->wasm_conf) {
397+
flb_wasm_config_destroy(ctx->wasm_conf);
398+
ctx->wasm_conf = NULL;
399+
}
400+
delete_wasm_config(ctx);
401+
}
402+
flb_filter_set_context(f_ins, NULL);
388403
return -1;
389404
}
390405

391406
static int cb_wasm_exit(void *data, struct flb_config *config)
392407
{
393408
struct flb_filter_wasm *ctx = data;
394409

395-
flb_wasm_config_destroy(ctx->wasm_conf);
396-
flb_wasm_destroy_all(config);
410+
if (!ctx) {
411+
return 0;
412+
}
413+
414+
/* Destroy the single, persistent WASM instance */
415+
if (ctx->wasm) {
416+
flb_wasm_destroy(ctx->wasm);
417+
}
418+
/* Destroy the WASM configuration */
419+
if (ctx->wasm_conf) {
420+
flb_wasm_config_destroy(ctx->wasm_conf);
421+
}
397422
delete_wasm_config(ctx);
398423
return 0;
399424
}

0 commit comments

Comments
 (0)