Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-5435] Add HDFS lineage logger #7

Open
wants to merge 4 commits into
base: 1.9.2-develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions extensions/spark/kyuubi-spark-authz/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@
<artifactId>${delta.artifact}_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
6 changes: 6 additions & 0 deletions extensions/spark/kyuubi-spark-lineage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@
<artifactId>jakarta.servlet-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.kyuubi.plugin.lineage.detailed.KyuubiDetailedEventHandlerProvider
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.kyuubi.plugin.lineage

import org.apache.spark.sql.execution.QueryExecution

import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher}
import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiDetailedEventDispatcher, KyuubiEventDispatcher, SparkEventDispatcher}
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher

trait LineageDispatcher {
Expand All @@ -36,6 +36,7 @@ object LineageDispatcher {
LineageDispatcherType.withName(dispatcherType) match {
case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
case LineageDispatcherType.KYUUBI_DETAILED_EVENT => new KyuubiDetailedEventDispatcher()
case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher()
case _ => throw new UnsupportedOperationException(
s"Unsupported lineage dispatcher: $dispatcherType.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage
object LineageDispatcherType extends Enumeration {
type LineageDispatcherType = Value

val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value
val SPARK_EVENT, KYUUBI_EVENT, KYUUBI_DETAILED_EVENT, ATLAS = Value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.spark.sql.execution.QueryExecution

import org.apache.kyuubi.Logging
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.detailed.HdfsLineageLogger.{LINEAGE_FILE_NAME, PLAN_FILE_NAME, SQL_QUERY_HEADER}
import org.apache.kyuubi.util.JdbcUtils

class HdfsLineageLogger(
val rootDir: String,
config: Configuration,
val lineageSerializer: LineageSerializer = new LineageJsonSerializer()) extends LineageLogger
with Logging {
private val fileSystem: FileSystem = FileSystem.get(config)

override def log(execution: QueryExecution, lineage: Lineage): Unit = {
val executionDir = getSessionDirectory(execution)

if (!fileSystem.mkdirs(executionDir)) {
throw new RuntimeException(s"Error creating directory $executionDir")
}

logQueryMetadata(executionDir, execution)
logLineage(executionDir, lineage)
}

private def logQueryMetadata(executionDir: Path, execution: QueryExecution): Unit = {
val path = new Path(executionDir, PLAN_FILE_NAME)

val queryMetadata = execution.logical.origin
.sqlText
.map { sqlQuery =>
s"""$SQL_QUERY_HEADER
|$sqlQuery
|
|$execution""".stripMargin
}.getOrElse(execution.toString())

withNewFile(path) {
IOUtils.write(queryMetadata, _, StandardCharsets.UTF_8)
}
}

private def logLineage(executionDir: Path, lineage: Lineage): Unit = {
val lineagePath = new Path(executionDir, LINEAGE_FILE_NAME)
withNewFile(lineagePath) {
IOUtils.write(lineageSerializer.serialize(lineage), _)
}
}

private def withNewFile(filePath: Path)(action: FSDataOutputStream => Unit): Unit = {
JdbcUtils.withCloseable(fileSystem.create(filePath)) {
action(_)
}
}

private def getSessionDirectory(execution: QueryExecution): Path = {
val sparkAppName = execution.sparkSession.conf.get("spark.app.name")
new Path(rootDir, new Path(sparkAppName, execution.id.toString))
}
}

object HdfsLineageLogger {
val PLAN_FILE_NAME = "query_metadata.txt"
val LINEAGE_FILE_NAME = "lineage"
val SQL_QUERY_HEADER = "== SQL Query =="
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import scala.util.control.NonFatal

import org.apache.kyuubi.Logging
import org.apache.kyuubi.events.KyuubiEvent
import org.apache.kyuubi.events.handler.EventHandler
import org.apache.kyuubi.plugin.lineage.dispatcher.OperationLineageKyuubiDetailedEvent

class KyuubiDetailedEventHandler(val lineageLogger: LineageLogger)
extends EventHandler[KyuubiEvent]
with Logging {

override def apply(event: KyuubiEvent): Unit = {
event match {
case lineageEvent: OperationLineageKyuubiDetailedEvent =>
handleLineageEvent(lineageEvent)
case _ =>
}
}

private def handleLineageEvent(lineageEvent: OperationLineageKyuubiDetailedEvent): Unit = {
try {
lineageEvent.lineage
.filter { l =>
l.inputTables.nonEmpty || l.outputTables.nonEmpty
}
.foreach(lineageLogger.log(lineageEvent.execution, _))
} catch {
case NonFatal(exception: Exception) => error(
s"Error during handling lineage event for ${lineageEvent.execution.id}",
exception)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{HDFS_LOGGER_ROOT, HDFS_LOGGER_URL}
import org.apache.kyuubi.events.KyuubiEvent
import org.apache.kyuubi.events.handler.{CustomEventHandlerProvider, EventHandler}
import org.apache.kyuubi.util.KyuubiHadoopUtils

class KyuubiDetailedEventHandlerProvider extends CustomEventHandlerProvider {

override def create(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
val hdfsUrl = kyuubiConf.getOption(HDFS_LOGGER_URL.key).getOrElse {
throw new IllegalArgumentException(s"Option ${HDFS_LOGGER_URL.key} should be set")
}
hadoopConf.set("fs.defaultFS", hdfsUrl)

new KyuubiDetailedEventHandler(
new HdfsLineageLogger(kyuubiConf.get(HDFS_LOGGER_ROOT), hadoopConf))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import java.nio.charset.{Charset, StandardCharsets}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.kyuubi.plugin.lineage.Lineage

class LineageJsonSerializer(
val charset: Charset = StandardCharsets.UTF_8) extends LineageSerializer {

private val objectWriter = new ObjectMapper()
.registerModule(DefaultScalaModule)
.writerWithDefaultPrettyPrinter()

override def serialize(lineage: Lineage): Array[Byte] = {
objectWriter.writeValueAsString(lineage).getBytes(charset)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import org.apache.spark.sql.execution.QueryExecution

import org.apache.kyuubi.plugin.lineage.Lineage

trait LineageLogger {
def log(execution: QueryExecution, lineage: Lineage): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.detailed

import org.apache.kyuubi.plugin.lineage.Lineage

trait LineageSerializer {
def serialize(lineage: Lineage): Array[Byte]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.dispatcher

import org.apache.spark.sql.execution.QueryExecution

import org.apache.kyuubi.Utils
import org.apache.kyuubi.events.{EventBus, KyuubiEvent}
import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}

class KyuubiDetailedEventDispatcher extends LineageDispatcher {

override def send(qe: QueryExecution, lineage: Option[Lineage]): Unit = {
val event = OperationLineageKyuubiDetailedEvent(qe, System.currentTimeMillis(), lineage, None)
EventBus.post(event)
}

override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
val event =
OperationLineageKyuubiDetailedEvent(qe, System.currentTimeMillis(), None, Some(exception))
EventBus.post(event)
}
}

case class OperationLineageKyuubiDetailedEvent(
execution: QueryExecution,
eventTime: Long,
lineage: Option[Lineage],
exception: Option[Throwable]) extends KyuubiEvent {
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(eventTime)) :: Nil
}
Loading