Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: close producer when topic is deleted #4147

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

fraidev
Copy link
Contributor

@fraidev fraidev commented Aug 21, 2024

Related: #3836

We already close the consumer when the topic is deleted, now we will close the producer too.

Given:

fluvio produce my-topic   

When:

fluvio topic delete my-topic   

Then:

Topic "my-topic" was deleted

@fraidev fraidev marked this pull request as ready for review August 22, 2024 00:01
@fraidev fraidev force-pushed the close_produce branch 3 times, most recently from 43c2c9e to 66461d5 Compare August 22, 2024 01:16
Copy link
Contributor

@sehz sehz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is right approach. This should work for all client not just CLI. client should detect if topic is closed then close connection

@@ -39,6 +39,7 @@ producer-file-io = ["fluvio-cli-common/file-records"]
[dependencies]
async-channel = { workspace = true }
async-trait = { workspace = true }
async-std = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't bring in async-std.

@fraidev
Copy link
Contributor Author

fraidev commented Aug 22, 2024

I don't think this is right approach. This should work for all client not just CLI. client should detect if topic is closed then close connection

Make sense! But the producer is never what is blocking.
In this case, the blocker is the stdin and for connectors the source stream that is blocking:

#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
    let mut stream = source.connect(None).await?;
    while let Some(item) = stream.next().await {
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}

Maybe we can change TopicProducerPool to detect an error when the topic is deleted and also add a method to detect it, but we will also need to add something like select! and change uses of the producer.

Another solution that I had is have a method that receives a stream and returns a stream that handles it.

async fn from_stream(stream: GenericStream) -> ProducerStream

But we will need to change implementations too:

#[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
    let source = TestJsonSource::new(&config)?;
    let mut stream = source.connect(None).await?;
    while let Some(item) = producer.from_stream(stream).await {
        println!("producing a value: {}", &item);
        producer.send(RecordKey::NULL, item).await?;
    }
    Ok(())
}

What do you think?

@fraidev fraidev requested a review from sehz August 22, 2024 22:35
@digikata
Copy link
Contributor

Is cluster side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible it would be nice for the SPU to send a message that the produce stream is being closed due to topic delete, and then close the spu side socket.

@fraidev
Copy link
Contributor Author

fraidev commented Aug 26, 2024

Is cluster side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible it would be nice for the SPU to send a message that the produce stream is being closed due to topic delete, and then close the spu side socket.

Yeah, I think so. The SPU already send a TopicNotFound error when does not find it.

But I think that we will have the same problem. All usage of producer seems to be:

  • Await an input
  • Produce it

The Produce it part will return the SPU error, but it will be trigger only after the input.

I think that this issue aims to be notified before the input.

Copy link

Stale pull request message

@github-actions github-actions bot closed this Nov 3, 2024
@digikata digikata added the no-stale Opt-out of closing issue due to no activity label Nov 4, 2024
@digikata digikata reopened this Nov 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
no-pr-activity no-stale Opt-out of closing issue due to no activity
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants