Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16339: [1/4 KStream#transform] Remove Deprecated "transformer" methods and classes #17198

Open
wants to merge 24 commits into
base: trunk
Choose a base branch
from

Conversation

fonsdant
Copy link
Contributor

Hi, @mjsax! I have removed the transform methods from the main and test classes. However, I have asked myself whether the approach I have used for StandbyTaskEOSIntegrationTest and StreamThreadTest is correct. Should I replace transform with process as suggested in transform Javadoc or remove these tests? Thanks!

@fonsdant fonsdant changed the title KAFKA-16339: [1/4 KStream#tranform] Remove Deprecated "transformer" methods and classes KAFKA-16339: [1/4 KStream#transform] Remove Deprecated "transformer" methods and classes Sep 15, 2024
@fonsdant
Copy link
Contributor Author

I have missed removing the references to transform on Javadocs too. I will push another commit for it.

@fonsdant
Copy link
Contributor Author

I have noticed that KStreamTransformIntegrationTest.TestTransformer could be removed as it has become unused.

@fonsdant fonsdant force-pushed the kstream-transform-remove-deprecated-transformer-methods-and-classes branch 2 times, most recently from bafaa10 to f28ac72 Compare September 17, 2024 23:17
@fonsdant
Copy link
Contributor Author

For now, I have removed the build with deduplicaiton topology test until we decide how to replace the transform method inside it.

@fonsdant
Copy link
Contributor Author

I have replaced tranform with process as suggested by docs. I have implemented Processor based on StandbyTaskEOSMultiRebalanceIntegrationTest.buildWithUniqueIdAssignmentTopology, but shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing has failed.

@fonsdant
Copy link
Contributor Author

I have managed to fix the test! :D

@fonsdant
Copy link
Contributor Author

I have removed the references to transform from docs. I am wondering if it would be better to replace transform with process, instead of removing it. I have started to think about it while removing the flatTransformValues (the third method to remove). @mjsax, could you help me with this decision?

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Made a pass.

Btw: is there anything in docs/streams/... that we need to update/remove, too (we can also do this as a single docs-only PR for the whole ticket, ie, all transformXxx() method)

@@ -179,8 +178,7 @@ <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? exten
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
* stateful record transformation).
* This is a stateless record-by-record operation.
Copy link
Member

@mjsax mjsax Sep 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it think we should link to #process(...) here, instead of removing (same elsewhere where it make sense).

Btw: feel free to skip updating JavaDocs of other deprecated methods which will be remove in follow up PRs of this ticket.

@@ -360,11 +368,9 @@ public KeyValue<Integer, Integer> transform(final Integer key, final Integer val
throw new RuntimeException("Injected test error");
}

return KeyValue.pair(key, value);
store.put(key, value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do we add a put here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the return null replacement with delete. Since we would return a pair with key and value, now I put it in store.

@ParameterizedTest
@MethodSource("data")
@SuppressWarnings("deprecation")
public void shouldPunctuateWithTimestampPreservedInProcessorContext(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep this test, but rewrite using #process().

@@ -276,61 +274,6 @@ class TopologyTest {
assertEquals(getTopologyScala, getTopologyJava)
}

@nowarn
@Test
def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we have a test gap, as we don't have a similar test for process() -- should we add a new test for #process() like this one (or rewrite this test for #process())?

Copy link
Contributor Author

@fonsdant fonsdant Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have rewritten this test with process, but have missed to rewrite the test name. I will fix it.

@@ -222,42 +221,6 @@ class KStreamTest extends TestDriver {
testDriver.close()
}

@nowarn
@Test
def testTransformCorrectlyRecords(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above. Seems we are lacking a similar test for #process() ?

Copy link
Contributor Author

@fonsdant fonsdant Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I have replaced Transformer with ProcessorSupplier and transform with process. Have I missed something else?

@nowarn
@Test
def testSettingNameOnTransform(): Unit = {
class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

@fonsdant
Copy link
Contributor Author

fonsdant commented Oct 8, 2024

I will push some commits, one for each file from which I have removed the transform link, replacing it with process.

@fonsdant fonsdant closed this Oct 14, 2024
@fonsdant fonsdant force-pushed the kstream-transform-remove-deprecated-transformer-methods-and-classes branch from 35fd141 to 203f323 Compare October 14, 2024 20:48
@github-actions github-actions bot added the small Small PRs label Oct 14, 2024
@fonsdant fonsdant reopened this Oct 14, 2024
@github-actions github-actions bot removed the small Small PRs label Oct 14, 2024
@fonsdant
Copy link
Contributor Author

I have rebased my branch on trunk and this caused GitHub to close the PR. I am implementing the last review points again.

@fonsdant
Copy link
Contributor Author

This time, I have focused on removing the methods first and doing refactorings. After this, I will proceed with Javadoc updates.

I have made the removing part. Now, I will do the refactoring part.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the PR. LGTM.

Just triggered CI. Can merge after it finished and passed.

@mjsax
Copy link
Member

mjsax commented Oct 18, 2024

@fonsdant -- seems there is some compilation error:

StandbyTaskEOSIntegrationTest.java:332: error: cannot find symbol

            .transform(
> Task :streams:compileTestJava
            ^
  symbol:   method transform(()->new or[...]{ } },String)
  location: interface KStream<Integer,Integer>

Seem we need to rewrite this test to use the new process() instead of transform()

@fonsdant
Copy link
Contributor Author

Thanks, @mjsax, still need to refactor it. Meanwhile, do you prefer that I convert this PR to draft? Or should I keep it as it is?

@mjsax
Copy link
Member

mjsax commented Oct 22, 2024

We can keep this PR as-is -- you can also to the refactoring as part of this PR if you want.

@mjsax
Copy link
Member

mjsax commented Oct 30, 2024

@fonsdant Any update on this PR? -- We are approaching AK 4.0 release deadlines, and there is 3 more PRs for this ticket...

@fonsdant
Copy link
Contributor Author

@mjsax, sorry for the delay, I am still refactoring the tests, replacing transform with process.

Given the coming deadline, I agree it would be adequate someone more experienced to resume it.

@fonsdant
Copy link
Contributor Author

I will push the work I done this passed week.

@mjsax
Copy link
Member

mjsax commented Oct 31, 2024

Given the coming deadline, I agree it would be adequate someone more experienced to resume it.

As long as you have enough time to keep working, I think we are a good. Of course, if you are blocked, let us know.

Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
@fonsdant fonsdant force-pushed the kstream-transform-remove-deprecated-transformer-methods-and-classes branch from 46a01a6 to 7a65aed Compare November 4, 2024 11:59
//
// val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
// assertEquals("my-name", transformNode.name())
// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rewrite this to testSettingNameOnProcess?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry! I have missed it up. Of course, I will rewrite it!

Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
Signed-off-by: Joao Pedro Fonseca Dantas <[email protected]>
@fonsdant
Copy link
Contributor Author

fonsdant commented Nov 5, 2024

@mjsax, I have removed the adding lines in buildTopologyWithDeduplication in StandbyTaskEOSIntegrationTest to match the previous implementation with transform (but still with process). Also, removed the comments of the KStreamTest scala test and rewritten it. Finally, removed var usage.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick turn around. I think the Scala test can be simplified significantly. In the end, we only verify the name of he processor node in the Topology.

@Test
def testSettingNameOnTransform(): Unit = {
val processorSupplier: ProcessorSupplier[String, String, String, String] =
new api.ProcessorSupplier[String, String, String, String] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need an explicit ProcessorSupplier for this test? And we use process(() => new TestProcessor) similar to the old code?

def testSettingNameOnTransform(): Unit = {
val processorSupplier: ProcessorSupplier[String, String, String, String] =
new api.ProcessorSupplier[String, String, String, String] {
private val storeName = "store-name"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we add a store?

val processedKey = s"$key-processed"
val processedValue = s"$value-processed"
store.put(processedKey, processedValue)
context.forward(new api.Record(processedKey, processedValue, record.timestamp()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the Processor does is actually pretty meaningless for the test, so I think the while implementation could actually be empty? For this case, we don't even need init() method, as we don't need access to the context either.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants