From 69215c0f5837bb64f9ad8daeecb28294a015e44e Mon Sep 17 00:00:00 2001 From: Yury Brigadirenko Date: Thu, 14 Nov 2024 19:33:36 -0500 Subject: [PATCH] server: cleanup control chars from jsonb (#1034) --- .../v2/runner/DefaultEventReportingService.java | 17 ++++++++++++++++- .../v2/runner/EventReportingServiceTest.java | 6 +++--- .../concord/server/ConcordObjectMapper.java | 9 ++++++--- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultEventReportingService.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultEventReportingService.java index b7edd244a9..4b82894b7d 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultEventReportingService.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/DefaultEventReportingService.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -53,14 +54,17 @@ public class DefaultEventReportingService implements EventReportingService, Exec private final int maxBatchSize; private final Object batchLock = new Object(); private final ScheduledExecutorService flushScheduler; + private final PersistenceService persistenceService; @Inject public DefaultEventReportingService(InstanceId instanceId, ProcessConfiguration processConfiguration, - ApiClient apiClient) { + ApiClient apiClient, + PersistenceService persistenceService) { this.instanceId = instanceId; this.processEventsApi = new ProcessEventsApi(apiClient); this.maxBatchSize = processConfiguration.events().batchSize(); + this.persistenceService = persistenceService; this.eventQueue = initializeQueue(maxBatchSize); this.flushScheduler = Executors.newSingleThreadScheduledExecutor(); @@ -132,6 +136,9 @@ private void sendBatch(List eventBatch) { try { getProcessEventsApi().batchEvent(instanceId.getValue(), eventBatch); } catch (ApiException e) { + for (var event: eventBatch) { + saveEvent(event); + } log.warn("Error while sending batch of {} event{} to the server: {}", eventBatch.size(), eventBatch.isEmpty() ? "" : "s", e.getMessage()); } @@ -141,6 +148,7 @@ private void sendSingle(ProcessEventRequest req) { try { getProcessEventsApi().event(instanceId.getValue(), req); } catch (ApiException e) { + saveEvent(req); log.warn("error while sending an event to the server: {}", e.getMessage()); } } @@ -171,4 +179,11 @@ public void run() { } } + private void saveEvent(ProcessEventRequest event) { + try { + persistenceService.persistFile("invalid_event_" + UUID.randomUUID() + ".json", out -> processEventsApi.getApiClient().getObjectMapper().writeValue(out, event)); + } catch (Exception e) { + log.warn("can't save event", e); + } + } } diff --git a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/EventReportingServiceTest.java b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/EventReportingServiceTest.java index 7cb59158cb..ea9b8705c9 100644 --- a/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/EventReportingServiceTest.java +++ b/runtime/v2/runner/src/test/java/com/walmartlabs/concord/runtime/v2/runner/EventReportingServiceTest.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -81,7 +81,7 @@ private static class MockedEventReportingService extends DefaultEventReportingSe private final AtomicInteger flushCounter; public MockedEventReportingService(ProcessConfiguration processConfiguration) { - super(new InstanceId(UUID.randomUUID()), processConfiguration, mock(ApiClient.class)); + super(new InstanceId(UUID.randomUUID()), processConfiguration, mock(ApiClient.class), mock(PersistenceService.class)); this.mockProcessEventsApi = mock(ProcessEventsApi.class); this.flushCounter = new AtomicInteger(); } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordObjectMapper.java b/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordObjectMapper.java index 502086d3e7..9709a47bf7 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordObjectMapper.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/ConcordObjectMapper.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,6 +29,7 @@ import javax.inject.Singleton; import java.io.IOException; import java.util.Map; +import java.util.regex.Pattern; @Named @Singleton @@ -37,6 +38,8 @@ public class ConcordObjectMapper { public static final TypeReference> MAP_TYPE = new TypeReference>() { }; + private static final Pattern CONTROL_CHARS = Pattern.compile("[\\x00-\\x1F]"); + private final ObjectMapper delegate; @Inject @@ -113,7 +116,7 @@ public T fromString(String s, Class valueType) { } private static String removeUnsupportedEscape(String str) { - return str.replace("\\u0000", ""); + return CONTROL_CHARS.matcher(str).replaceAll(""); } private T deserialize(String o, TypeReference valueTypeRef) {