Skip to content

Commit c38844c

Browse files
tedyu authored and dongjoon-hyun committed
[SPARK-49687][SQL] Delay sorting in validateAndMaybeEvolveStateSchema
### What changes were proposed in this pull request?

In `validateAndMaybeEvolveStateSchema`, the existing schema and the new schema are both sorted by column family name. The sorting can be delayed until `createSchemaFile` is called. When computing `colFamiliesAddedOrRemoved`, we can use `toSet` to compare column families.

### Why are the changes needed?

This makes `validateAndMaybeEvolveStateSchema` faster, because the sort is skipped whenever no schema file needs to be written.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48116 from tedyu/ty-comp-chk.

Authored-by: Zhihong Yu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6393afa commit c38844c

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala

+5-5
Original file line number | Diff line number | Diff line change
@@ -168,12 +168,12 @@ class StateSchemaCompatibilityChecker(
168168
newStateSchema: List[StateStoreColFamilySchema],
169169
ignoreValueSchema: Boolean,
170170
stateSchemaVersion: Int): Boolean = {
171-
val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
172-
val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
171+
val existingStateSchemaList = getExistingKeyAndValueSchema()
172+
val newStateSchemaList = newStateSchema
173173

174174
if (existingStateSchemaList.isEmpty) {
175175
// write the schema file if it doesn't exist
176-
createSchemaFile(newStateSchemaList, stateSchemaVersion)
176+
createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
177177
true
178178
} else {
179179
// validate if the new schema is compatible with the existing schema
@@ -188,9 +188,9 @@ class StateSchemaCompatibilityChecker(
188188
}
189189
}
190190
val colFamiliesAddedOrRemoved =
191-
newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
191+
(newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
192192
if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
193-
createSchemaFile(newStateSchemaList, stateSchemaVersion)
193+
createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
194194
}
195195
// TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
196196
colFamiliesAddedOrRemoved

0 commit comments

Comments
 (0)