From 19878b8ae3698931b3e0c9758049c6bf8004af3f Mon Sep 17 00:00:00 2001 From: Tigran Manasyan Date: Wed, 11 Dec 2024 19:05:26 +0400 Subject: [PATCH 1/4] [ADH-5435] Implement HDFS exporter for lineage events --- extensions/spark/kyuubi-spark-lineage/pom.xml | 6 + ....events.handler.CustomEventHandlerProvider | 1 + .../plugin/lineage/LineageDispatcher.scala | 3 +- .../lineage/LineageDispatcherType.scala | 2 +- .../lineage/detailed/HdfsLineageLogger.scala | 74 ++++++++ .../detailed/KyuubiDetailedEventHandler.scala | 52 ++++++ .../KyuubiDetailedEventHandlerProvider.scala | 38 +++++ .../detailed/LineageJsonSerializer.scala | 37 ++++ .../lineage/detailed/LineageLogger.scala | 26 +++ .../lineage/detailed/LineageSerializer.scala | 24 +++ .../KyuubiDetailedEventDispatcher.scala | 47 +++++ .../HdfsDetailedEventHandlerSuite.scala | 161 ++++++++++++++++++ kyuubi-common/pom.xml | 6 + .../org/apache/kyuubi/config/KyuubiConf.scala | 21 ++- .../apache/kyuubi/WithSimpleDFSService.scala | 0 .../apache/kyuubi/server/MiniDFSService.scala | 0 16 files changed, 494 insertions(+), 4 deletions(-) create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/resources/META-INF/services/org.apache.kyuubi.events.handler.CustomEventHandlerProvider create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandler.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandlerProvider.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageJsonSerializer.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageLogger.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageSerializer.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiDetailedEventDispatcher.scala create mode 100644 extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala rename {kyuubi-server => kyuubi-common}/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala (100%) rename {kyuubi-server => kyuubi-common}/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala (100%) diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml b/extensions/spark/kyuubi-spark-lineage/pom.xml index c01fc2bb898..806edc1b4d2 100644 --- a/extensions/spark/kyuubi-spark-lineage/pom.xml +++ b/extensions/spark/kyuubi-spark-lineage/pom.xml @@ -179,6 +179,12 @@ jakarta.servlet-api test + + + org.apache.hadoop + hadoop-client-minicluster + test + diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/resources/META-INF/services/org.apache.kyuubi.events.handler.CustomEventHandlerProvider b/extensions/spark/kyuubi-spark-lineage/src/main/resources/META-INF/services/org.apache.kyuubi.events.handler.CustomEventHandlerProvider new file mode 100644 index 00000000000..b59fda66c3e --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/resources/META-INF/services/org.apache.kyuubi.events.handler.CustomEventHandlerProvider @@ -0,0 +1 @@ +org.apache.kyuubi.plugin.lineage.detailed.KyuubiDetailedEventHandlerProvider \ No newline at end of file diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala index b993f14282a..42745f6bf7a 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.plugin.lineage import org.apache.spark.sql.execution.QueryExecution -import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher} +import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiDetailedEventDispatcher, KyuubiEventDispatcher, SparkEventDispatcher} import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher trait LineageDispatcher { @@ -36,6 +36,7 @@ object LineageDispatcher { LineageDispatcherType.withName(dispatcherType) match { case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher() case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher() + case LineageDispatcherType.KYUUBI_DETAILED_EVENT => new KyuubiDetailedEventDispatcher() case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher() case _ => throw new UnsupportedOperationException( s"Unsupported lineage dispatcher: $dispatcherType.") diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala index 8e07f6d7769..00c84af8105 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala @@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage object LineageDispatcherType extends Enumeration { type LineageDispatcherType = Value - val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value + val SPARK_EVENT, KYUUBI_EVENT, KYUUBI_DETAILED_EVENT, ATLAS = Value } diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala new file mode 100644 index 00000000000..c28a719e018 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import java.nio.charset.StandardCharsets + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.plugin.lineage.Lineage +import org.apache.kyuubi.plugin.lineage.detailed.HdfsLineageLogger.{LINEAGE_FILE_NAME, PLAN_FILE_NAME} +import org.apache.kyuubi.util.JdbcUtils + +class HdfsLineageLogger( + val rootDir: String, + config: Configuration, + val lineageSerializer: LineageSerializer = new LineageJsonSerializer()) extends LineageLogger + with Logging { + private val fileSystem: FileSystem = FileSystem.get(config) + + override def log(execution: QueryExecution, lineage: Lineage): Unit = { + val executionDir = new Path(rootDir, execution.id.toString) + + if (!fileSystem.mkdirs(executionDir)) { + throw new RuntimeException(s"Error creating directory $executionDir") + } + + logExecutionPlan(executionDir, execution) + logLineage(executionDir, lineage) + } + + private def logExecutionPlan(executionDir: Path, execution: QueryExecution): Unit = { + val queryPlanPath = new Path(executionDir, PLAN_FILE_NAME) + withNewFile(queryPlanPath) { + IOUtils.write(execution.toString(), _, StandardCharsets.UTF_8) + } + } + + private def logLineage(executionDir: Path, lineage: Lineage): Unit = { + val lineagePath = new Path(executionDir, LINEAGE_FILE_NAME) + withNewFile(lineagePath) { + IOUtils.write(lineageSerializer.serialize(lineage), _) + } + } + + private def withNewFile(filePath: Path)(action: FSDataOutputStream => Unit): Unit = { + JdbcUtils.withCloseable(fileSystem.create(filePath)) { + action(_) + } + } +} + +object HdfsLineageLogger { + val PLAN_FILE_NAME = "execution_plan.txt" + val LINEAGE_FILE_NAME = "lineage" +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandler.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandler.scala new file mode 100644 index 00000000000..04c2ecbd3a4 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandler.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import scala.util.control.NonFatal + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.events.KyuubiEvent +import org.apache.kyuubi.events.handler.EventHandler +import org.apache.kyuubi.plugin.lineage.dispatcher.OperationLineageKyuubiDetailedEvent + +class KyuubiDetailedEventHandler(val lineageLogger: LineageLogger) + extends EventHandler[KyuubiEvent] + with Logging { + + override def apply(event: KyuubiEvent): Unit = { + event match { + case lineageEvent: OperationLineageKyuubiDetailedEvent => + handleLineageEvent(lineageEvent) + case _ => + } + } + + private def handleLineageEvent(lineageEvent: OperationLineageKyuubiDetailedEvent): Unit = { + try { + lineageEvent.lineage + .filter { l => + l.inputTables.nonEmpty || l.outputTables.nonEmpty + } + .foreach(lineageLogger.log(lineageEvent.execution, _)) + } catch { + case NonFatal(exception: Exception) => error( + s"Error during handling lineage event for ${lineageEvent.execution.id}", + exception) + } + } +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandlerProvider.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandlerProvider.scala new file mode 100644 index 00000000000..89203aa2a88 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/KyuubiDetailedEventHandlerProvider.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{HDFS_LOGGER_ROOT, HDFS_LOGGER_URL} +import org.apache.kyuubi.events.KyuubiEvent +import org.apache.kyuubi.events.handler.{CustomEventHandlerProvider, EventHandler} +import org.apache.kyuubi.util.KyuubiHadoopUtils + +class KyuubiDetailedEventHandlerProvider extends CustomEventHandlerProvider { + + override def create(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = { + val hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf) + val hdfsUrl = kyuubiConf.getOption(HDFS_LOGGER_URL.key).getOrElse { + throw new IllegalArgumentException(s"Option ${HDFS_LOGGER_URL.key} should be set") + } + hadoopConf.set("fs.defaultFS", hdfsUrl) + + new KyuubiDetailedEventHandler( + new HdfsLineageLogger(kyuubiConf.get(HDFS_LOGGER_ROOT), hadoopConf)) + } +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageJsonSerializer.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageJsonSerializer.scala new file mode 100644 index 00000000000..dcd071d9f77 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageJsonSerializer.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import java.nio.charset.{Charset, StandardCharsets} + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.kyuubi.plugin.lineage.Lineage + +class LineageJsonSerializer( + val charset: Charset = StandardCharsets.UTF_8) extends LineageSerializer { + + private val objectWriter = new ObjectMapper() + .registerModule(DefaultScalaModule) + .writerWithDefaultPrettyPrinter() + + override def serialize(lineage: Lineage): Array[Byte] = { + objectWriter.writeValueAsString(lineage).getBytes(charset) + } +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageLogger.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageLogger.scala new file mode 100644 index 00000000000..5ad239afe57 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageLogger.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.plugin.lineage.Lineage + +trait LineageLogger { + def log(execution: QueryExecution, lineage: Lineage): Unit +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageSerializer.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageSerializer.scala new file mode 100644 index 00000000000..e93ee49c442 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/LineageSerializer.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import org.apache.kyuubi.plugin.lineage.Lineage + +trait LineageSerializer { + def serialize(lineage: Lineage): Array[Byte] +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiDetailedEventDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiDetailedEventDispatcher.scala new file mode 100644 index 00000000000..eef858d3d26 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/KyuubiDetailedEventDispatcher.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher + +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.events.{EventBus, KyuubiEvent} +import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher} + +class KyuubiDetailedEventDispatcher extends LineageDispatcher { + + override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = { + val event = OperationLineageKyuubiDetailedEvent(qe, System.currentTimeMillis(), lineage, None) + EventBus.post(event) + } + + override def onFailure(qe: QueryExecution, exception: Exception): Unit = { + val event = + OperationLineageKyuubiDetailedEvent(qe, System.currentTimeMillis(), None, Some(exception)) + EventBus.post(event) + } +} + +case class OperationLineageKyuubiDetailedEvent( + execution: QueryExecution, + eventTime: Long, + lineage: Option[Lineage], + exception: Option[Throwable]) extends KyuubiEvent { + override def partitions: Seq[(String, String)] = + ("day", Utils.getDateFromTimestamp(eventTime)) :: Nil +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala new file mode 100644 index 00000000000..0f2da756592 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.detailed + +import java.nio.charset.StandardCharsets +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Using} + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.kyuubi.lineage.LineageConf.{DEFAULT_CATALOG, DISPATCHERS} +import org.apache.spark.sql.SparkListenerExtensionTest +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.{KyuubiFunSuite, WithSimpleDFSService} +import org.apache.kyuubi.events.EventBus +import org.apache.kyuubi.plugin.lineage.Lineage +import org.apache.kyuubi.plugin.lineage.detailed.HdfsLineageLogger.{LINEAGE_FILE_NAME, PLAN_FILE_NAME} +import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION + +class HdfsDetailedEventHandlerSuite extends KyuubiFunSuite + with WithSimpleDFSService + with SparkListenerExtensionTest { + + val catalogName: String = if (SPARK_RUNTIME_VERSION <= "3.1") { + "org.apache.spark.sql.connector.InMemoryTableCatalog" + } else { + "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" + } + + override protected val catalogImpl: String = "hive" + + private var hdfsClient: FileSystem = _ + + override def beforeAll(): Unit = { + super.beforeAll() + hdfsClient = FileSystem.get(getHadoopConf) + } + + override def sparkConf(): SparkConf = { + super.sparkConf() + .set("spark.sql.catalog.v2_catalog", catalogName) + .set( + "spark.sql.queryExecutionListeners", + "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener") + .set(DISPATCHERS.key, "KYUUBI_DETAILED_EVENT") + } + + test("lineage event was captured") { + val eventHandler = registerLineageEventHandler(1) + + withTable("test_table0") { _ => + spark.sql("create table test_table0(a string, b string)") + spark.sql("select a as col0, b as col1 from test_table0").collect() + + eventHandler.expectedEventsLatch.await(20, TimeUnit.SECONDS) + + assert(eventHandler.handledEvents.size == 1) + + val actualEvent = eventHandler.handledEvents(0) + val expectedLineage = Lineage( + List(s"$DEFAULT_CATALOG.default.test_table0"), + List(), + List( + ("col0", Set(s"$DEFAULT_CATALOG.default.test_table0.a")), + ("col1", Set(s"$DEFAULT_CATALOG.default.test_table0.b")))) + assert(actualEvent._2 == expectedLineage) + + val executionDir = s"/test/lineage/${actualEvent._1.id}/" + assert(hdfsClient.exists(new Path(executionDir, LINEAGE_FILE_NAME))) + assert(hdfsClient.exists(new Path(executionDir, PLAN_FILE_NAME))) + } + } + + test("lineage event was written to HDFS") { + val eventHandler = registerLineageEventHandler(1) + + withTable("test_table0") { _ => + spark.sql("create table test_table0(a string, b string)") + spark.sql("select a as col0, b as col1 from test_table0").collect() + + eventHandler.expectedEventsLatch.await(20, TimeUnit.SECONDS) + assert(eventHandler.handledEvents.size == 1) + + val lineageEvent = eventHandler.handledEvents(0) + val executionDir = s"/test/lineage/${lineageEvent._1.id}/" + + val actualPlan = readUtf8(executionDir, PLAN_FILE_NAME) + assert(lineageEvent._1.toString() == actualPlan) + + val expectedLineage = + """{ + | "inputTables" : [ "spark_catalog.default.test_table0" ], + | "outputTables" : [ ], + | "columnLineage" : [ { + | "column" : "col0", + | "originalColumns" : [ "spark_catalog.default.test_table0.a" ] + | }, { + | "column" : "col1", + | "originalColumns" : [ "spark_catalog.default.test_table0.b" ] + | } ] + |}""".stripMargin + + val actualLineage = readUtf8(executionDir, LINEAGE_FILE_NAME) + assert(actualLineage == expectedLineage) + } + } + + private def readUtf8(parent: String, file: String): String = { + Using(hdfsClient.open(new Path(parent, file))) { + IOUtils.readFullyToByteArray(_) + } match { + case Success(bytes) => new String(bytes, StandardCharsets.UTF_8) + case Failure(exception) => throw exception + } + } + + private def registerLineageEventHandler( + expectedEvents: Int, + eventRootPath: String = "/test/lineage"): LineageLoggerWrapper = { + val eventLogger = new LineageLoggerWrapper( + new HdfsLineageLogger(eventRootPath, getHadoopConf), + new CountDownLatch(expectedEvents)) + EventBus.register(new KyuubiDetailedEventHandler(eventLogger)) + eventLogger + } + + private class LineageLoggerWrapper( + val delegate: LineageLogger, + val expectedEventsLatch: CountDownLatch) extends LineageLogger { + + val handledEvents: ArrayBuffer[(QueryExecution, Lineage)] = new ArrayBuffer() + + override def log(execution: QueryExecution, lineage: Lineage): Unit = { + try { + delegate.log(execution, lineage) + } finally { + expectedEventsLatch.countDown() + handledEvents += Tuple2(execution, lineage) + } + } + } +} diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml index 9a9a95d68e2..281e71a649b 100644 --- a/kyuubi-common/pom.xml +++ b/kyuubi-common/pom.xml @@ -178,6 +178,12 @@ flexmark-all test + + + org.apache.hadoop + hadoop-client-minicluster + test + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index d77f7103a53..5db21e9f2c7 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -38,6 +38,7 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { private val settings = new ConcurrentHashMap[String, String]() private lazy val reader: ConfigProvider = new ConfigProvider(settings) + private def loadFromMap(props: Map[String, String]): Unit = { settings.putAll(props.asJava) } @@ -163,7 +164,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { /** * Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to * the result map with the `dropped` of key being dropped. - * @param dropped first part of prefix which will dropped for the new key + * + * @param dropped first part of prefix which will dropped for the new key * @param remainder second part of the prefix which will be remained in the key */ def getAllWithPrefix(dropped: String, remainder: String): Map[String, String] = { @@ -2574,6 +2576,7 @@ object KyuubiConf { object OperationLanguages extends Enumeration with Logging { type OperationLanguage = Value val PYTHON, SQL, SCALA, UNKNOWN = Value + def apply(language: String): OperationLanguage = { language.toUpperCase(Locale.ROOT) match { case "PYTHON" => PYTHON @@ -3137,7 +3140,7 @@ object KyuubiConf { /** * Holds information about keys that have been deprecated. * - * @param key The deprecated key. + * @param key The deprecated key. * @param version Version of Kyuubi where key was deprecated. * @param comment Additional info regarding to the removed config. For example, * reasons of config deprecation, what users should use instead of it. @@ -3528,4 +3531,18 @@ object KyuubiConf { .version("1.9.1") .serverOnly .fallbackConf(HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE) + + val HDFS_LOGGER_URL: OptionalConfigEntry[String] = + buildConf("kyuubi.lineage.hdfs.url") + .doc("URL of the HDFS namenode where lineage metadata files will be written") + .version("1.9.0") + .stringConf + .createOptional + + val HDFS_LOGGER_ROOT: ConfigEntry[String] = + buildConf("kyuubi.lineage.hdfs.root") + .doc("HDFS root directory where lineage metadata files will be written") + .version("1.9.0") + .stringConf + .createWithDefault("/kyuubi/lineage") } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala similarity index 100% rename from kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala rename to kyuubi-common/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala similarity index 100% rename from kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala rename to kyuubi-common/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala From 5664e707769eaa23cbc760ce89cc9d56478165d4 Mon Sep 17 00:00:00 2001 From: Tigran Manasyan Date: Fri, 13 Dec 2024 19:31:05 +0400 Subject: [PATCH 2/4] Fix authz module test dependencies --- extensions/spark/kyuubi-spark-authz/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 499c918e5d8..217c3c91e23 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -336,6 +336,14 @@ ${delta.artifact}_${scala.binary.version} test + + + org.apache.kyuubi + kyuubi-util-scala_${scala.binary.version} + ${project.version} + test + test-jar + From 0f70d68fd16163145b7409f54161d049ca16f20a Mon Sep 17 00:00:00 2001 From: Tigran Manasyan Date: Mon, 16 Dec 2024 18:32:19 +0400 Subject: [PATCH 3/4] Fix spotless checkstyle issue --- extensions/spark/kyuubi-spark-authz/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 217c3c91e23..b93ce391ec3 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -341,8 +341,8 @@ org.apache.kyuubi kyuubi-util-scala_${scala.binary.version} ${project.version} - test test-jar + test From 572e008024dafd133ec38671edee3862a8d436ea Mon Sep 17 00:00:00 2001 From: Tigran Manasyan Date: Wed, 18 Dec 2024 16:45:47 +0400 Subject: [PATCH 4/4] Save lineage events in session directory and also log sql corresponding sql statements --- .../lineage/detailed/HdfsLineageLogger.scala | 32 ++++++++++++++----- .../HdfsDetailedEventHandlerSuite.scala | 15 ++++++--- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala index c28a719e018..add7044e04e 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsLineageLogger.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.kyuubi.Logging import org.apache.kyuubi.plugin.lineage.Lineage -import org.apache.kyuubi.plugin.lineage.detailed.HdfsLineageLogger.{LINEAGE_FILE_NAME, PLAN_FILE_NAME} +import org.apache.kyuubi.plugin.lineage.detailed.HdfsLineageLogger.{LINEAGE_FILE_NAME, PLAN_FILE_NAME, SQL_QUERY_HEADER} import org.apache.kyuubi.util.JdbcUtils class HdfsLineageLogger( @@ -37,20 +37,30 @@ class HdfsLineageLogger( private val fileSystem: FileSystem = FileSystem.get(config) override def log(execution: QueryExecution, lineage: Lineage): Unit = { - val executionDir = new Path(rootDir, execution.id.toString) + val executionDir = getSessionDirectory(execution) if (!fileSystem.mkdirs(executionDir)) { throw new RuntimeException(s"Error creating directory $executionDir") } - logExecutionPlan(executionDir, execution) + logQueryMetadata(executionDir, execution) logLineage(executionDir, lineage) } - private def logExecutionPlan(executionDir: Path, execution: QueryExecution): Unit = { - val queryPlanPath = new Path(executionDir, PLAN_FILE_NAME) - withNewFile(queryPlanPath) { - IOUtils.write(execution.toString(), _, StandardCharsets.UTF_8) + private def logQueryMetadata(executionDir: Path, execution: QueryExecution): Unit = { + val path = new Path(executionDir, PLAN_FILE_NAME) + + val queryMetadata = execution.logical.origin + .sqlText + .map { sqlQuery => + s"""$SQL_QUERY_HEADER + |$sqlQuery + | + |$execution""".stripMargin + }.getOrElse(execution.toString()) + + withNewFile(path) { + IOUtils.write(queryMetadata, _, StandardCharsets.UTF_8) } } @@ -66,9 +76,15 @@ class HdfsLineageLogger( action(_) } } + + private def getSessionDirectory(execution: QueryExecution): Path = { + val sparkAppName = execution.sparkSession.conf.get("spark.app.name") + new Path(rootDir, new Path(sparkAppName, execution.id.toString)) + } } object HdfsLineageLogger { - val PLAN_FILE_NAME = "execution_plan.txt" + val PLAN_FILE_NAME = "query_metadata.txt" val LINEAGE_FILE_NAME = "lineage" + val SQL_QUERY_HEADER = "== SQL Query ==" } diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala index 0f2da756592..979bc00c0bb 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/detailed/HdfsDetailedEventHandlerSuite.scala @@ -62,6 +62,7 @@ class HdfsDetailedEventHandlerSuite extends KyuubiFunSuite "spark.sql.queryExecutionListeners", "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener") .set(DISPATCHERS.key, "KYUUBI_DETAILED_EVENT") + .set("spark.app.name", "kyuubi_app") } test("lineage event was captured") { @@ -84,7 +85,7 @@ class HdfsDetailedEventHandlerSuite extends KyuubiFunSuite ("col1", Set(s"$DEFAULT_CATALOG.default.test_table0.b")))) assert(actualEvent._2 == expectedLineage) - val executionDir = s"/test/lineage/${actualEvent._1.id}/" + val executionDir = s"/test/lineage/kyuubi_app/${actualEvent._1.id}/" assert(hdfsClient.exists(new Path(executionDir, LINEAGE_FILE_NAME))) assert(hdfsClient.exists(new Path(executionDir, PLAN_FILE_NAME))) } @@ -95,16 +96,22 @@ class HdfsDetailedEventHandlerSuite extends KyuubiFunSuite withTable("test_table0") { _ => spark.sql("create table test_table0(a string, b string)") - spark.sql("select a as col0, b as col1 from test_table0").collect() + val sqlQuery = "select a as col0, b as col1 from test_table0" + spark.sql(sqlQuery).collect() eventHandler.expectedEventsLatch.await(20, TimeUnit.SECONDS) assert(eventHandler.handledEvents.size == 1) val lineageEvent = eventHandler.handledEvents(0) - val executionDir = s"/test/lineage/${lineageEvent._1.id}/" + val executionDir = s"/test/lineage/kyuubi_app/${lineageEvent._1.id}/" val actualPlan = readUtf8(executionDir, PLAN_FILE_NAME) - assert(lineageEvent._1.toString() == actualPlan) + val expectedPlan = + s"""${HdfsLineageLogger.SQL_QUERY_HEADER} + |$sqlQuery + | + |${lineageEvent._1}""".stripMargin + assert(expectedPlan == actualPlan) val expectedLineage = """{