
[SPARK-50714][SQL] Enable schema evolution for TransformWithState when Avro encoding is used #49277

Status: Open — 18 commits to merge into `master`
Conversation

@ericm-db (Contributor) commented Dec 23, 2024

What changes were proposed in this pull request?

This PR introduces schema evolution support for stateful operators in Spark Structured Streaming by:

- Adding support for Avro-based schema evolution in state store encoders
- Introducing new classes and interfaces to manage schema metadata:
  - The `StateSchemaProvider` interface and its implementations
  - `StateSchemaBroadcast` for distributing schema information
  - `StateSchemaMetadata` to track schema versions
- Modifying state store providers and encoders to handle schema evolution:
  - Updated `RocksDBStateEncoder` to support reading data with evolved schemas
  - Added schema ID tracking in `StateStoreColFamilySchema`
  - Modified state store initialization to support schema providers
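The core mechanism behind this kind of evolution is Avro-style writer/reader schema resolution: data is decoded with the schema it was *written* with, then projected onto the *current* schema, with newly added fields filled from their declared defaults. A minimal, self-contained sketch of that idea (the names below are illustrative only, not Spark's or Avro's actual classes):

```scala
// Hypothetical, simplified sketch of writer/reader schema resolution.
// Not Spark's or Avro's actual API — purely to illustrate the concept.
object SchemaEvolutionSketch {
  // A "schema" here is just an ordered list of (fieldName, defaultValue).
  case class FieldSpec(name: String, default: Any)
  case class SimpleSchema(fields: List[FieldSpec])

  // Resolve a record written with an older schema against a newer reader
  // schema: fields the writer knew about keep their values, and fields
  // added later fall back to their declared defaults.
  def resolve(written: Map[String, Any], reader: SimpleSchema): Map[String, Any] =
    reader.fields.map { f =>
      f.name -> written.getOrElse(f.name, f.default)
    }.toMap

  def main(args: Array[String]): Unit = {
    val v1Record = Map[String, Any]("count" -> 1)
    val v2Schema = SimpleSchema(List(
      FieldSpec("count", 0),
      FieldSpec("lastSeen", null))) // field added in v2, with a default
    println(resolve(v1Record, v2Schema))
  }
}
```

The real implementation delegates this resolution to the Avro library, but the contract is the same: every stored row must be decodable with the exact schema version that produced it.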

Why are the changes needed?

Schema evolution is a critical feature for stateful stream processing applications that need to handle changing data schemas over time.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit and Integration tests in RocksDBStateStoreSuite and TransformWithStateSuite

Was this patch authored or co-authored using generative AI tooling?

```scala
    readerSchema: Schema,
    valueProj: UnsafeProjection): UnsafeRow = {
  if (valueBytes != null) {
    val reader = new GenericDatumReader[Any](writerSchema, readerSchema)
```

Review comment (Contributor): Let's add some comments here around the args.

@ericm-db ericm-db changed the title Ssm [SPARK-50714] Enable schema evolution for TransformWithState when Avro encoding is used Jan 3, 2025
@HyukjinKwon HyukjinKwon changed the title [SPARK-50714] Enable schema evolution for TransformWithState when Avro encoding is used [SPARK-50714][SQL] Enable schema evolution for TransformWithState when Avro encoding is used Jan 3, 2025

```scala
/**
 * Converts a Spark SQL schema to a corresponding Avro schema.
 * Handles nested types and adds support for schema evolution.
```

Review comment (Contributor): Could we add more details here? Also, maybe add comments for all the function args?


```scala
dataType match {
  // Basic types
  case BooleanType => false
```

Review comment (Contributor): Are these Avro defaults too?


```scala
// Complex types
case ArrayType(elementType, _) =>
  val defaultArray = new java.util.ArrayList[Any]()
```

Review comment (Contributor): Why not have empty collections? i.e. empty array/map etc.?
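The two threads above concern the default values generated for fields added by an evolved schema. A minimal, hypothetical sketch of such a per-type default mapping — the type names and choices below mirror the snippets under review but are not Spark's actual implementation:

```scala
import java.util.{ArrayList, HashMap}

// Hypothetical sketch of per-type defaults for newly added fields.
// Illustrative only; not Spark's actual code.
object DefaultsSketch {
  sealed trait SimpleType
  case object BooleanT extends SimpleType
  case object IntT extends SimpleType
  case class ArrayT(element: SimpleType) extends SimpleType
  case class MapT(value: SimpleType) extends SimpleType

  def defaultFor(t: SimpleType): Any = t match {
    case BooleanT   => false                          // scalar default
    case IntT       => 0
    case ArrayT(_)  => new ArrayList[Any]()           // empty collection,
    case MapT(_)    => new HashMap[String, Any]()     // as the review suggests
  }
}
```

Whatever defaults are chosen here must match what the generated Avro schema declares, since Avro fills missing fields from the schema's own default clause during resolution.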

```scala
catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
nameSpace: String = "",
```

Review comment (Contributor): Just say `namespace`? Also, what does this refer to?

```diff
@@ -64,7 +64,9 @@ class IncrementalExecution(
     val watermarkPropagator: WatermarkPropagator,
     val isFirstBatch: Boolean,
     val currentStateStoreCkptId:
-      MutableMap[Long, Array[Array[String]]] = MutableMap[Long, Array[Array[String]]]())
+      MutableMap[Long, Array[Array[String]]] = MutableMap[Long, Array[Array[String]]](),
+    val stateSchemaMetadatas: MutableMap[Long, StateSchemaBroadcast] =
```

Review comment (Contributor): Let's add some comments for this?

```scala
val stateSchemaMetadata = StateSchemaMetadata.
  createStateSchemaMetadata(checkpointLocation, hadoopConf, stateSchemaList.head)

val stateSchemaBroadcast =
```

Review comment (Contributor): Let's note that the broadcast happens here for the first run.
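The pattern discussed here — build the schema metadata once on the driver, broadcast it, and let executors resolve a row's writer schema by the schema ID stored with the row — can be sketched as follows. The names and the `Short` schema-ID type are illustrative assumptions, not the actual `StateSchemaBroadcast` API:

```scala
// Hypothetical sketch of the broadcast-lookup pattern: the driver builds an
// id -> schema map once, ships it to executors, and each executor resolves
// the writer schema recorded alongside a row by its id.
// Names are illustrative; not Spark's actual StateSchemaBroadcast.
object SchemaBroadcastSketch {
  case class SchemaMetadata(schemasById: Map[Short, String]) {
    def writerSchemaFor(id: Short): String =
      schemasById.getOrElse(id, sys.error(s"unknown schema id $id"))
  }

  def main(args: Array[String]): Unit = {
    // Built on the driver during the first run, then broadcast to executors.
    val meta = SchemaMetadata(Map(
      0.toShort -> "v1-schema-json",
      1.toShort -> "v2-schema-json"))
    // On an executor: a row tagged with schema id 0 is decoded with the v1
    // writer schema and projected onto the current (v2) reader schema.
    println(meta.writerSchemaFor(0.toShort))
  }
}
```

Broadcasting once per query run keeps the per-row cost down to a map lookup, which matters on the hot decode path.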

```diff
@@ -149,7 +149,8 @@ case class TransformWithStateInPandasExec(
     initialStateGroupingAttrs.map(SortOrder(_, Ascending)))

   override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]]
+  ): OperatorStateMetadata = {
```

Review comment (Contributor): nit: can this fit on the line above?

```diff
@@ -21,6 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore}
 import org.apache.spark.sql.types._

+object ListStateMetricsImpl {
```

Review comment (Contributor): Let's combine this with some existing utils object?

```diff
@@ -135,6 +136,8 @@ class MicroBatchExecution(
   // operatorID -> (partitionID -> array of uniqueID)
   private val currentStateStoreCkptId = MutableMap[Long, Array[Array[String]]]()

+  private val stateSchemaMetadatas = MutableMap[Long, StateSchemaBroadcast]()
```

Review comment (Contributor): nit: let's add a comment here explaining what this is for.

```diff
-    hasTtl: Boolean): StateStoreColFamilySchema = {
-  StateStoreColFamilySchema(
+    hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+  val schemas = mutable.Map[String, StateStoreColFamilySchema]()
```

Review comment (Contributor): Why do we need these maps?

```diff
@@ -344,7 +347,7 @@ class StatefulProcessorHandleImpl(
  * the StatefulProcessor is initialized.
  */
 class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any])
-  extends StatefulProcessorHandleImplBase(timeMode, keyExprEnc) {
+  extends StatefulProcessorHandleImplBase(timeMode, keyExprEnc) with Logging {
```

Review comment (Contributor): Intentional?

```diff
@@ -43,6 +43,15 @@ object TimerStateUtils {
       TimerStateUtils.PROC_TIMERS_STATE_NAME + TimerStateUtils.KEY_TO_TIMESTAMP_CF
     }
   }
+
+  def getTimerStateSecIndexName(timeMode: String): String = {
```

Review comment (Contributor): Could we combine this with the function above and just make a generic function?
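The generic helper the reviewer suggests would replace two near-duplicate name builders with one function parameterized by the index kind. A hypothetical sketch — the constant values below are made up for illustration and are not Spark's actual `TimerStateUtils` constants:

```scala
// Hypothetical sketch of the single generic helper the reviewer suggests,
// instead of separate primary/secondary-index name builders.
// Constant values are illustrative, not Spark's actual TimerStateUtils.
object TimerNameSketch {
  val ProcTimersPrefix = "procTimers-"
  val EventTimersPrefix = "eventTimers-"
  val KeyToTimestampSuffix = "keyToTimestamp"
  val TimestampToKeySuffix = "timestampToKey"

  // One function covers both column families: the primary (key -> timestamp)
  // and the secondary index (timestamp -> key).
  def timerCFName(timeMode: String, secondaryIndex: Boolean): String = {
    val prefix =
      if (timeMode == "ProcessingTime") ProcTimersPrefix else EventTimersPrefix
    val suffix =
      if (secondaryIndex) TimestampToKeySuffix else KeyToTimestampSuffix
    prefix + suffix
  }
}
```

Collapsing the two builders this way also keeps the naming scheme for the two column families in one place, so they cannot drift apart.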

```scala
val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas ++
  Map(
    StateStore.DEFAULT_COL_FAMILY_NAME ->
      StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
```

Review comment (Contributor): The indent seems off?

```diff
 }

 /** Metadata of this stateful operator and its states stores. */
 override def operatorStateMetadata(
-    stateSchemaPaths: List[String]): OperatorStateMetadata = {
+    stateSchemaPaths: List[List[String]]
+): OperatorStateMetadata = {
```

Review comment (Contributor): nit: could this go on the line above?
