Skip to content

Commit

Permalink
Gobbler should take MDC into account (#442)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Sep 24, 2020
1 parent 7aa5a2a commit 7796fdb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,42 @@
import io.airbyte.commons.concurrency.VoidCallable;
import java.io.BufferedReader;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class LineGobbler implements VoidCallable {

private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);

public static void gobble(final InputStream is, final Consumer<String> consumer) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new LineGobbler(is, consumer, executor));
final Map<String, String> mdc = MDC.getCopyOfContextMap();
executor.submit(new LineGobbler(is, consumer, executor, mdc));
}

private final BufferedReader is;
private final Consumer<String> consumer;
private final ExecutorService executor;
private final Map<String, String> mdc;

LineGobbler(final InputStream is, final Consumer<String> consumer, final ExecutorService executor) {
LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this.is = IOs.newBufferedReader(is);
this.consumer = consumer;
this.executor = executor;
this.mdc = mdc;
}

@Override
public void voidCall() {
MDC.setContextMap(mdc);
try {
String line;
while ((line = is.readLine()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand All @@ -46,7 +47,7 @@ void readAllLines() {
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor));
executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

Mockito.verify(consumer).accept("test");
Mockito.verify(consumer).accept("test2");
Expand All @@ -60,7 +61,7 @@ void shutdownOnSuccess() {
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor));
executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

Mockito.verify(consumer, Mockito.times(2)).accept(anyString());
Mockito.verify(executor).shutdown();
Expand All @@ -74,7 +75,7 @@ void shutdownOnError() {
final InputStream is = new ByteArrayInputStream("test\ntest2\n".getBytes(StandardCharsets.UTF_8));
final ExecutorService executor = spy(MoreExecutors.newDirectExecutorService());

executor.submit(new LineGobbler(is, consumer, executor));
executor.submit(new LineGobbler(is, consumer, executor, ImmutableMap.of()));

verify(consumer).accept(anyString());
Mockito.verify(executor).shutdown();
Expand Down

0 comments on commit 7796fdb

Please sign in to comment.