[#2596] feat(spark): Introduce fory serializer #2597
zuston wants to merge 8 commits into apache:master
Conversation
cc @chaokunyang . If you have time, could you help review this integration with Fory? So far, this implementation hasn’t shown significant improvements. I would greatly appreciate any guidance you could provide on using Fory.
Test Results
2 731 files (-359)   2 731 suites (-359)   4h 10m 44s ⏱️ (-2h 38m 52s)
For more details on these failures and errors, see this check.
Results for commit c2a7d46. ± Comparison against base commit d5e689c.
This pull request removes 99 and adds 13 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
<dependency>
  <groupId>org.apache.fory</groupId>
  <artifactId>fory-core</artifactId>
  <version>0.12.0</version>
Please also introduce the fory-scala dependency: https://mvnrepository.com/artifact/org.apache.fory/fory-scala
It's a pity that Spark still uses Scala 2.x.
  .withRefTracking(true)
  .withCompatibleMode(CompatibleMode.COMPATIBLE)
  .requireClassRegistration(false)
  .build()
You should also register Scala serializers and enable Scala serialization optimization:

val f = Fory.builder()
  .withLanguage(Language.JAVA)
  .withRefTracking(true)
  .withCompatibleMode(CompatibleMode.COMPATIBLE)
  .requireClassRegistration(false)
  .withScalaOptimizationEnabled(true)
  .build()
ScalaSerializers.registerSerializers(f)

See more details at https://fory.apache.org/docs/docs/guide/scala_guide#fory-creation
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
  val array = if (bytes.hasArray) {
You can pass the ByteBuffer to Fory directly without an extra copy.
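A minimal sketch of that suggestion, assuming the fory-core version used here exposes a deserialize(ByteBuffer) overload (fury is the instance field from the PR's snippet above):

// Sketch only: skips the hasArray/copy branch by handing the buffer to Fory.
// Assumes Fory#deserialize(ByteBuffer) is available in this fory-core version.
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
  fury.deserialize(bytes).asInstanceOf[T]
}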
  throw new IllegalStateException("Stream is closed")
}
val bytes = fury.serialize(t.asInstanceOf[AnyRef])
Maybe hold a Fory MemoryBuffer as an instance field in the class and serialize objects into that buffer; then you can get the heap array from that buffer and write it into out. In this way, you can reduce a copy.
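A sketch of that approach, assuming Fory's serialize(MemoryBuffer, Object) overload and the MemoryUtils.buffer / writerIndex / getHeapMemory accessors from org.apache.fory.memory; writeObject, writeInt, and out come from the stream class in this PR:

import org.apache.fory.memory.{MemoryBuffer, MemoryUtils}

// Reused per-stream scratch buffer; Fory grows it as needed.
private val buffer: MemoryBuffer = MemoryUtils.buffer(4096)

override def writeObject[T: ClassTag](t: T): SerializationStream = {
  buffer.writerIndex(0)                      // reset before each record
  fury.serialize(buffer, t.asInstanceOf[AnyRef])
  val len = buffer.writerIndex()
  writeInt(len)                              // length-prefix the record
  out.write(buffer.getHeapMemory, 0, len)    // write the heap array directly
  this
}

This trades a per-record byte[] allocation and copy for one reusable buffer per stream.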
}
private def writeInt(value: Int): Unit = {
  out.write((value >>> 24) & 0xFF)
Just use:

public void writeInt64(MemoryBuffer buffer, long value) {
  LongSerializer.writeInt64(buffer, value, longEncoding);
}

public long readInt64(MemoryBuffer buffer) {
  return LongSerializer.readInt64(buffer, longEncoding);
}

This will be faster and simpler, and it also compresses the data.
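For the record-length prefix specifically, a Scala sketch of the same idea, assuming MemoryBuffer's var-length helpers (writeVarUint32/readVarUint32 in recent Fory releases; the helper names here are hypothetical):

// Hypothetical helpers: small lengths encode to 1-2 bytes instead of a fixed 4.
private def writeLength(buffer: MemoryBuffer, length: Int): Unit =
  buffer.writeVarUint32(length)

private def readLength(buffer: MemoryBuffer): Int =
  buffer.readVarUint32()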
Shuffle data should already be binary. Is there anything that needs to be serialized? Have you benchmarked your job to see whether there is a bottleneck in serialization?
Big thanks for your quick and patient review, @chaokunyang.
When using vanilla Spark, each record is an object that gets serialized into bytes before being pushed to the remote shuffle server. When using gluten/auron/datafusion-comet, there is no need to serialize.
Haven't benchmarked yet. This PR is still in its initial phase.
Only if you are using Spark RDDs with raw Java objects will there be a serialization bottleneck. Such cases are similar to DataStream in Flink. We've observed severalfold e2e performance speedups in multiple cases.
Thanks for sharing. Do you mean that there is no need to optimize the performance of vanilla Spark SQL shuffle serialization?
Data records in Spark SQL are already binary; no serialization happens there. I suggest benchmarking first before optimizing.
It seems that serialization is still happening: https://github.com/apache/spark/blob/2de0248071035aa94818386c2402169f6670d2d4/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala#L57. The Product2 contains the key/value that will be serialized. See also: https://github.com/apache/spark/blob/47991b074a5a277e1fb75be3a5cc207f400b0b0c/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L243
Spark's serialization happens in the shuffle write stage.
What changes were proposed in this pull request?
This is an experimental feature that introduces the Fory serializer to replace the vanilla Spark serializer for a potential speedup.
Why are the changes needed?
for #2596
Does this PR introduce any user-facing change?
Yes.
spark.rss.client.shuffle.serializer=FORY

How was this patch tested?
Unit test.
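For reference, a minimal sketch of enabling the serializer from a Spark job; the config key is the one quoted above, and the FORY value is assumed to be the enum name this PR adds:

import org.apache.spark.SparkConf

// Opt in to the experimental Fory serializer for shuffle data.
val conf = new SparkConf()
  .set("spark.rss.client.shuffle.serializer", "FORY")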