Merge pull request #11 from nubank/events-emission-enhancements
DGD-4258 DGD-4311 Events emission enhancements
jrosend authored Oct 23, 2024
2 parents 37c8280 + 975f831 commit 78946e4
Showing 6 changed files with 159 additions and 46 deletions.
NuEventEmitter.java (new file)
@@ -0,0 +1,102 @@
package io.openlineage.spark.agent;

import io.openlineage.client.OpenLineage;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.openlineage.client.OpenLineage.RunEvent.EventType;
import static io.openlineage.client.OpenLineage.RunEvent;
import static io.openlineage.client.OpenLineage.RunEvent.EventType.*;
import static java.util.Objects.isNull;

/**
 * Gatekeeper in front of {@link EventEmitter}: drops run events that carry no lineage value
 * and strips column lineage facets from non-COMPLETE events before delegating emission.
 */
@Slf4j
public class NuEventEmitter {

    private static final Set<String> WANTED_JOB_TYPES = Set.of(
            "SQL_JOB" // as defined in SparkSQLExecutionContext.SPARK_JOB_TYPE
    );

    private static final Set<String> WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
            ".execute_insert_into_hadoop_fs_relation_command.",
            ".adaptive_spark_plan."
    );

    private static boolean isPermittedJobType(RunEvent event) {
        String jobType = event.getJob().getFacets().getJobType().getJobType();
        if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) {
            log.debug("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
            return false;
        }
        return true;
    }

    private static boolean isPermittedEventType(RunEvent event) {
        if (RUNNING.equals(event.getEventType())) {
            log.debug("OpenLineage event is {} and should not be emitted", RUNNING);
            return false;
        }
        return true;
    }

    private static boolean isPermittedJobName(RunEvent event) {
        String jobName = event.getJob().getName();
        if (isNull(jobName)) {
            log.debug("OpenLineage event has no job name and should not be emitted");
            return false;
        }
        if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
            log.debug("OpenLineage event job name has no permitted substring and should not be emitted");
            return false;
        }
        return true;
    }

    private static boolean shouldEmit(RunEvent event) {
        return Stream.of(
                isPermittedJobType(event),
                isPermittedEventType(event),
                isPermittedJobName(event)
        ).noneMatch(Boolean.FALSE::equals);
    }

    private static boolean shouldDiscardColumnLineageFacet(EventType eventType) {
        return !COMPLETE.equals(eventType);
    }

    private static void discardColumnLineageFacet(RunEvent event) {
        try {
            // DatasetFacets exposes no setter for columnLineage, so the facet is cleared via reflection
            Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage");
            columnLineageFacetField.setAccessible(true);
            Stream.concat(event.getInputs().stream(), event.getOutputs().stream())
                    .collect(Collectors.toList())
                    .forEach(dataset -> {
                        try {
                            log.debug("Discarding column lineage facet for dataset {} {} {}",
                                    dataset.getClass().getSimpleName(), dataset.getNamespace(), dataset.getName());
                            columnLineageFacetField.set(dataset.getFacets(), null);
                        } catch (IllegalAccessException e) {
                            log.error("Failed to discard column lineage facet", e);
                        }
                    });
        } catch (NoSuchFieldException e) {
            log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
        }
    }

    public static void emit(RunEvent event, EventEmitter eventEmitter) {
        if (!shouldEmit(event)) {
            return;
        }

        if (shouldDiscardColumnLineageFacet(event.getEventType())) {
            discardColumnLineageFacet(event);
        }

        eventEmitter.emit(event);
    }
}
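
NuEventEmitter is the heart of the change: it sits in front of the existing EventEmitter and applies the three predicates above before anything reaches the backend. As a quick illustration of the job-name rule, a name passes only when it contains one of the wanted substrings; a stand-alone sketch of just that check (class and sample job names are illustrative, not from this PR):

import java.util.Set;

public class JobNameFilterSketch {

    private static final Set<String> WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
            ".execute_insert_into_hadoop_fs_relation_command.",
            ".adaptive_spark_plan.");

    // Mirrors NuEventEmitter#isPermittedJobName: null names and names lacking
    // a wanted substring are rejected.
    static boolean isPermittedJobName(String jobName) {
        return jobName != null
                && WANTED_EVENT_NAME_SUBSTRINGS.stream().anyMatch(jobName::contains);
    }

    public static void main(String[] args) {
        System.out.println(isPermittedJobName(
                "my_app.execute_insert_into_hadoop_fs_relation_command.target_table")); // true
        System.out.println(isPermittedJobName("my_app.collect_limit.target_table"));    // false
    }
}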
RddExecutionContext.java
@@ -11,6 +11,7 @@
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.client.utils.UUIDUtils;
import io.openlineage.spark.agent.EventEmitter;
+import io.openlineage.spark.agent.NuEventEmitter;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
@@ -224,8 +225,9 @@ public void start(SparkListenerJobStart jobStart) {
.build())
.job(buildJob(jobStart.jobId()))
.build();

log.debug("Posting event for start {}: {}", jobStart, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
@@ -257,8 +259,9 @@ public void end(SparkListenerJobEnd jobEnd) {
.build())
.job(buildJob(jobEnd.jobId()))
.build();

log.debug("Posting event for end {}: {}", jobEnd, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListenerEvent event) {
SparkApplicationExecutionContext.java
@@ -12,6 +12,7 @@
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.spark.agent.EventEmitter;
+import io.openlineage.spark.agent.NuEventEmitter;
import io.openlineage.spark.agent.filters.EventFilterUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.naming.JobNameBuilder;
@@ -95,7 +96,7 @@ public void start(SparkListenerApplicationStart applicationStart) {
.build());

log.debug("Posting event for applicationId {} start: {}", applicationId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
@@ -126,7 +127,7 @@ public void end(SparkListenerApplicationEnd applicationEnd) {
.build());

log.debug("Posting event for applicationId {} end: {}", applicationId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

private OpenLineage.ParentRunFacet buildApplicationParentFacet() {
SparkSQLExecutionContext.java
@@ -16,17 +16,20 @@
import io.openlineage.client.OpenLineage.RunEvent.EventType;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.EventEmitter;
+import io.openlineage.spark.agent.NuEventEmitter;
import io.openlineage.spark.agent.filters.EventFilterUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.naming.JobNameBuilder;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
@@ -50,7 +53,6 @@ class SparkSQLExecutionContext implements ExecutionContext {
private static final String SPARK_PROCESSING_TYPE_BATCH = "BATCH";
private static final String SPARK_PROCESSING_TYPE_STREAMING = "STREAMING";
private final long executionId;
-private String jobName;
private final OpenLineageContext olContext;
private final EventEmitter eventEmitter;
private final OpenLineageRunEventBuilder runEventBuilder;
@@ -88,30 +90,28 @@ public void start(SparkListenerSQLExecutionStart startEvent) {
"OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
// return;
}

olContext.setActiveJobId(activeJobId);
-// We shall skip this START event, focusing on the first SparkListenerJobStart event to be the START, because of the presence of the job nurn
// only one START event is expected, in case it was already sent with jobStart, we send running
-// EventType eventType = emittedOnJobStart ? RUNNING : START;
-// emittedOnSqlExecutionStart = true;

-// RunEvent event =
-//     runEventBuilder.buildRun(
-//         OpenLineageRunEventContext.builder()
-//             .applicationParentRunFacet(buildApplicationParentFacet())
-//             .event(startEvent)
-//             .runEventBuilder(
-//                 olContext
-//                     .getOpenLineage()
-//                     .newRunEventBuilder()
-//                     .eventTime(toZonedTime(startEvent.time()))
-//                     .eventType(eventType))
-//             .jobBuilder(buildJob())
-//             .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
-//             .build());

-// log.debug("Posting event for start {}: {}", executionId, event);
-// eventEmitter.emit(event);
+EventType eventType = emittedOnJobStart ? RUNNING : START;
+emittedOnSqlExecutionStart = true;

+RunEvent event =
+    runEventBuilder.buildRun(
+        OpenLineageRunEventContext.builder()
+            .applicationParentRunFacet(buildApplicationParentFacet())
+            .event(startEvent)
+            .runEventBuilder(
+                olContext
+                    .getOpenLineage()
+                    .newRunEventBuilder()
+                    .eventTime(toZonedTime(startEvent.time()))
+                    .eventType(eventType))
+            .jobBuilder(buildJob())
+            .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
+            .build());

+log.debug("Posting event for start {}: {}", executionId, event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
@@ -160,7 +160,7 @@ public void end(SparkListenerSQLExecutionEnd endEvent) {
if (log.isDebugEnabled()) {
log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event));
}
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -191,7 +191,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) {
.build());

log.debug("Posting event for stage submitted {}: {}", executionId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -221,7 +221,7 @@ public void end(SparkListenerStageCompleted stageCompleted) {
.build());

log.debug("Posting event for stage completed {}: {}", executionId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
@@ -244,12 +244,6 @@ public void setActiveJob(ActiveJob activeJob) {
@Override
public void start(SparkListenerJobStart jobStart) {
log.debug("SparkListenerJobStart - executionId: {}", executionId);
-try {
-    jobName = jobStart.properties().getProperty("spark.job.name");
-} catch (RuntimeException e) {
-    log.info("spark.job.name property not found in the context");
-}
-olContext.setJobNurn(jobName);
if (!olContext.getQueryExecution().isPresent()) {
log.info(NO_EXECUTION_INFO, olContext);
return;
@@ -262,7 +256,6 @@ public void start(SparkListenerJobStart jobStart) {
// only one START event is expected, in case it was already sent with sqlExecutionStart, we send
// running
EventType eventType = emittedOnSqlExecutionStart ? RUNNING : START;
-emittedOnSqlExecutionStart = true;
emittedOnJobStart = true;

RunEvent event =
Expand All @@ -281,7 +274,7 @@ public void start(SparkListenerJobStart jobStart) {
.build());

log.debug("Posting event for start {}: {}", executionId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
@@ -330,7 +323,7 @@ public void end(SparkListenerJobEnd jobEnd) {
.build());

log.debug("Posting event for end {}: {}", executionId, event);
-eventEmitter.emit(event);
+NuEventEmitter.emit(event, eventEmitter);
}

@Override
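
Both SparkListenerSQLExecutionStart and SparkListenerJobStart can open a run, and the emittedOnSqlExecutionStart / emittedOnJobStart flags decide which one sends START: the diff above restores emission on sqlExecutionStart and stops jobStart from flipping the SQL-side flag, so each handler now records only its own emission. A stand-alone illustration of the handshake (the class is hypothetical; the flag names echo the fields above):

public class StartEventHandshake {

    enum EventType { START, RUNNING }

    private boolean emittedOnSqlExecutionStart = false;
    private boolean emittedOnJobStart = false;

    EventType onSqlExecutionStart() {
        EventType type = emittedOnJobStart ? EventType.RUNNING : EventType.START;
        emittedOnSqlExecutionStart = true; // flips only its own flag
        return type;
    }

    EventType onJobStart() {
        EventType type = emittedOnSqlExecutionStart ? EventType.RUNNING : EventType.START;
        emittedOnJobStart = true; // the PR removes the line that also flipped the SQL-side flag here
        return type;
    }

    public static void main(String[] args) {
        StartEventHandshake handshake = new StartEventHandshake();
        System.out.println(handshake.onSqlExecutionStart()); // START
        System.out.println(handshake.onJobStart());          // RUNNING
    }
}

Whichever listener fires second now produces RUNNING, which NuEventEmitter drops, so downstream consumers see exactly one START per run.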
NuFacet.java
@@ -8,13 +8,18 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.Versions;

+import java.util.NoSuchElementException;
import java.util.Properties;
import lombok.Getter;
import lombok.NonNull;
import io.openlineage.spark.api.OpenLineageContext;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.sql.SparkSession;

/** Captures information related to the Apache Spark job. */
@Getter
+@Slf4j
public class NuFacet extends OpenLineage.DefaultRunFacet {
// @JsonProperty("jobId")
// @NonNull
Expand All @@ -26,8 +31,23 @@ public class NuFacet extends OpenLineage.DefaultRunFacet {
@JsonProperty("jobNurn")
private String jobNurn;

+private String fetchJobNurn(OpenLineageContext olContext) {
+    if (olContext.getSparkSession().isPresent()) {
+        SparkSession sparkSession = olContext.getSparkSession().get();
+        try {
+            return sparkSession.conf().get("spark.job.name");
+        } catch (NoSuchElementException e) {
+            log.warn("spark.job.name property not found in the context");
+            return null;
+        }
+    }
+
+    log.warn("spark.job.name property not found because the SparkSession could not be retrieved from OpenLineageContext");
+    return null;
+}

public NuFacet(@NonNull OpenLineageContext olContext) {
super(Versions.OPEN_LINEAGE_PRODUCER_URI);
-this.jobNurn = olContext.getJobNurn();
+this.jobNurn = fetchJobNurn(olContext);
}
}
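
NuFacet now resolves the job nurn from the live SparkSession configuration instead of a value stashed on OpenLineageContext. A sketch of how a job might supply the property and how it is read back (the session setup and the value shape are assumptions; only the spark.job.name key comes from the diff):

import org.apache.spark.sql.SparkSession;

public class JobNameConfigSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("job-name-config-sketch")
                .master("local[*]")
                // could equally arrive via spark-submit --conf spark.job.name=...
                .config("spark.job.name", "example-team/example-job") // value shape is illustrative
                .getOrCreate();

        // The same lookup fetchJobNurn performs; conf().get throws
        // NoSuchElementException when the key is missing.
        System.out.println(spark.conf().get("spark.job.name"));

        spark.stop();
    }
}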
OpenLineageContext.java
@@ -132,12 +132,6 @@ public String getSparkVersion() {
*/
@Getter @Setter String jobName;

-/**
- * Job nurn is collected during the Spark runs, and stored for creating a custom facet within
- * the Run facets. It should help us to enhance the events further in the lineage pipeline.
- */
-@Getter @Setter String jobNurn;

@Setter Integer activeJobId;

public Optional<Integer> getActiveJobId() {
Expand Down
