Skip to content

Commit

Permalink
Remove airlift's EventClient usage
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Nov 5, 2024
1 parent e7f25ef commit a68ed81
Show file tree
Hide file tree
Showing 15 changed files with 1 addition and 326 deletions.
5 changes: 0 additions & 5 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>event</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
Expand Down Expand Up @@ -91,7 +90,6 @@ public static Connector createConnector(
ClassLoader classLoader = DeltaLakeConnectorFactory.class.getClassLoader();
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
new EventModule(),
new MBeanModule(),
new ConnectorObjectNameGeneratorModule("io.trino.plugin.deltalake", "trino.plugin.deltalake"),
new JsonModule(),
Expand Down
5 changes: 0 additions & 5 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>event</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.inject.Scopes;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
Expand Down Expand Up @@ -96,7 +95,6 @@ public static Connector createConnector(
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
new CatalogNameModule(catalogName),
new EventModule(),
new MBeanModule(),
new ConnectorObjectNameGeneratorModule("io.trino.plugin.hive", "trino.plugin.hive"),
new JsonModule(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import io.airlift.event.client.EventClient;
import io.trino.plugin.hive.avro.AvroFileWriterFactory;
import io.trino.plugin.hive.avro.AvroPageSourceFactory;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
Expand Down Expand Up @@ -95,8 +94,6 @@ public void configure(Binder binder)

binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(HiveWriterStats.class).withGeneratedName();

newSetBinder(binder, EventClient.class).addBinding().to(HiveEventClient.class).in(Scopes.SINGLETON);
binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON);
binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
Multibinder<SystemTableProvider> systemTableProviders = newSetBinder(binder, SystemTableProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import io.airlift.event.client.EventClient;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.SortingColumn;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.HivePageSinkMetadataProvider;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.NodeManager;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
import io.trino.spi.connector.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -69,9 +67,6 @@ public class HivePageSinkProvider
private final LocationService locationService;
private final ListeningExecutorService writeVerificationExecutor;
private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
private final NodeManager nodeManager;
private final EventClient eventClient;
private final HiveSessionProperties hiveSessionProperties;
private final HiveWriterStats hiveWriterStats;
private final long perTransactionMetastoreCacheMaximumSize;
private final boolean temporaryStagingDirectoryEnabled;
Expand All @@ -89,9 +84,6 @@ public HivePageSinkProvider(
SortingFileWriterConfig sortingFileWriterConfig,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
NodeManager nodeManager,
EventClient eventClient,
HiveSessionProperties hiveSessionProperties,
HiveWriterStats hiveWriterStats)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
Expand All @@ -106,9 +98,6 @@ public HivePageSinkProvider(
this.locationService = requireNonNull(locationService, "locationService is null");
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(config.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.eventClient = requireNonNull(eventClient, "eventClient is null");
this.hiveSessionProperties = requireNonNull(hiveSessionProperties, "hiveSessionProperties is null");
this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");
this.perTransactionMetastoreCacheMaximumSize = config.getPerTransactionMetastoreCacheMaximumSize();
this.temporaryStagingDirectoryEnabled = config.isTemporaryStagingDirectoryEnabled();
Expand Down Expand Up @@ -178,9 +167,6 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr
writerSortBufferSize,
maxOpenSortFiles,
session,
nodeManager,
eventClient,
hiveSessionProperties,
hiveWriterStats,
temporaryStagingDirectoryEnabled,
temporaryStagingDirectoryPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.Closeable;
import java.util.Optional;
import java.util.function.Consumer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
Expand All @@ -32,7 +31,6 @@ public class HiveWriter
private final String fileName;
private final String writePath;
private final String targetPath;
private final Consumer<HiveWriter> onCommit;
private final HiveWriterStats hiveWriterStats;

private long rowCount;
Expand All @@ -45,7 +43,6 @@ public HiveWriter(
String fileName,
String writePath,
String targetPath,
Consumer<HiveWriter> onCommit,
HiveWriterStats hiveWriterStats)
{
this.fileWriter = requireNonNull(fileWriter, "fileWriter is null");
Expand All @@ -54,7 +51,6 @@ public HiveWriter(
this.fileName = requireNonNull(fileName, "fileName is null");
this.writePath = requireNonNull(writePath, "writePath is null");
this.targetPath = requireNonNull(targetPath, "targetPath is null");
this.onCommit = requireNonNull(onCommit, "onCommit is null");
this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");
}

Expand Down Expand Up @@ -89,9 +85,7 @@ public void append(Page dataPage)

public Closeable commit()
{
Closeable rollbackAction = fileWriter.commit();
onCommit.accept(this);
return rollbackAction;
return fileWriter.commit();
}

long getValidationCpuNanos()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.event.client.EventClient;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
Expand All @@ -39,7 +38,6 @@
import io.trino.plugin.hive.metastore.HivePageSinkMetadataProvider;
import io.trino.plugin.hive.orc.OrcFileWriterFactory;
import io.trino.plugin.hive.util.HiveWriteUtils;
import io.trino.spi.NodeManager;
import io.trino.spi.Page;
import io.trino.spi.PageSorter;
import io.trino.spi.TrinoException;
Expand All @@ -50,25 +48,20 @@
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.immutableEntry;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.hive.formats.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
import static io.trino.metastore.AcidOperation.CREATE_TABLE;
Expand Down Expand Up @@ -147,11 +140,6 @@ public class HiveWriterFactory
private final ConnectorSession session;
private final OptionalInt bucketCount;
private final List<SortingColumn> sortedBy;

private final NodeManager nodeManager;
private final EventClient eventClient;
private final Map<String, String> sessionProperties;

private final HiveWriterStats hiveWriterStats;
private final Optional<Type> rowType;
private final Optional<HiveType> hiveRowtype;
Expand All @@ -178,9 +166,6 @@ public HiveWriterFactory(
DataSize sortBufferSize,
int maxOpenSortFiles,
ConnectorSession session,
NodeManager nodeManager,
EventClient eventClient,
HiveSessionProperties hiveSessionProperties,
HiveWriterStats hiveWriterStats,
boolean sortedWritingTempStagingPathEnabled,
String sortedWritingTempStagingPath)
Expand Down Expand Up @@ -256,20 +241,7 @@ public HiveWriterFactory(
}

this.sortedBy = ImmutableList.copyOf(requireNonNull(sortedBy, "sortedBy is null"));

this.session = requireNonNull(session, "session is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.eventClient = requireNonNull(eventClient, "eventClient is null");

requireNonNull(hiveSessionProperties, "hiveSessionProperties is null");
this.sessionProperties = hiveSessionProperties.getSessionProperties().stream()
.map(propertyMetadata -> immutableEntry(
propertyMetadata.getName(),
session.getProperty(propertyMetadata.getName(), propertyMetadata.getJavaType())))
// The session properties collected here are used for events only. Filter out nulls to avoid problems with downstream consumers
.filter(entry -> entry.getValue() != null)
.collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString()));

this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");
}

Expand Down Expand Up @@ -505,36 +477,6 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + outputStorageFormat);
}

String writePath = path.toString();
String writerImplementation = hiveFileWriter.getClass().getName();

Consumer<HiveWriter> onCommit = hiveWriter -> {
Optional<Long> size;
try {
size = Optional.of(hiveWriter.getWrittenBytes());
}
catch (RuntimeException e) {
// Do not fail the query if file system is not available
size = Optional.empty();
}

eventClient.post(new WriteCompletedEvent(
session.getQueryId(),
writePath,
schemaName,
tableName,
partitionName.orElse(null),
outputStorageFormat.getOutputFormat(),
writerImplementation,
nodeManager.getCurrentNode().getVersion(),
nodeManager.getCurrentNode().getHost(),
session.getIdentity().getPrincipal().map(Principal::getName).orElse(null),
nodeManager.getEnvironment(),
sessionProperties,
size.orElse(null),
hiveWriter.getRowCount()));
};

if (!sortedBy.isEmpty()) {
Location tempFilePath;
if (sortedWritingTempStagingPathEnabled) {
Expand Down Expand Up @@ -587,7 +529,6 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
path.fileName(),
writeInfo.writePath().toString(),
writeInfo.targetPath().toString(),
onCommit,
hiveWriterStats);
}

Expand Down
Loading

0 comments on commit a68ed81

Please sign in to comment.