fix(sftp): make sure to delete last file when watch and delete_on_finish are enabled #3037

Open · wants to merge 5 commits into main from sftp-delete-last-file

Conversation

@ooesili (Contributor) commented on Nov 26, 2024:

Fixes #2435

Questions

I believe I have fixed the underlying issue, but I am not sure how to write an integration test to verify the fix. I have created a new integration test function with a TODO comment at the point where I got stuck. My questions are:

  • My plan was to start a pipeline with watch and delete_on_finish enabled, then use an SFTP client directly to inspect which files exist on the server and make sure they are all deleted after the pipeline runs. However, I'm not sure how to actually run the pipeline. Is this too specific a test to run using integration.StreamTests(), and if not, could you point me in the right direction?
  • The other pattern I've seen would be to call newSFTPReaderFromParsed() directly from the tests, then use Connect() and ReadBatch() to interact with the plugin. However, this plugin appears to be unusually structured in the way it progresses through the input files: Connect() finds the first file and sets up the scanner for it, and when the file is exhausted, ReadBatch() returns service.ErrNotConnected, which causes the engine to re-run Connect(), which advances to the next file. If the plugin only required Connect() to be called once, I would be happy to drive it directly in the tests, but because of the reconnection logic required, I was hesitant to reimplement the reconnection loop in the tests (see the sketch after this list). Is there a utility somewhere that I can use from a test that implements the reconnect logic?
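
For reference, a minimal sketch of what driving the input directly could look like, mirroring the engine loop described in the second bullet. The drainInput helper is hypothetical, not an existing utility, and the exact end-of-input signalling is an assumption.

```go
import (
	"context"
	"errors"
	"testing"

	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/stretchr/testify/require"
)

// drainInput drives a BatchInput the way the engine does: it keeps reading
// and re-runs Connect whenever the input reports service.ErrNotConnected.
func drainInput(ctx context.Context, t *testing.T, in service.BatchInput) []service.MessageBatch {
	t.Helper()
	require.NoError(t, in.Connect(ctx))
	var batches []service.MessageBatch
	for {
		batch, ack, err := in.ReadBatch(ctx)
		switch {
		case err == nil:
			batches = append(batches, batch)
			require.NoError(t, ack(ctx, nil))
		case errors.Is(err, service.ErrNotConnected):
			// The sftp input "disconnects" between files, so reconnect and
			// move on; stop once the input signals that nothing is left
			// (treating ErrEndOfInput from Connect as that signal is an
			// assumption for this sketch).
			if cErr := in.Connect(ctx); errors.Is(cErr, service.ErrEndOfInput) {
				return batches
			} else {
				require.NoError(t, cErr)
			}
		case errors.Is(err, service.ErrEndOfInput):
			return batches
		default:
			t.Fatalf("ReadBatch: %v", err)
		}
	}
}
```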

@ooesili added the bug and inputs labels on Nov 26, 2024
@ooesili self-assigned this on Nov 26, 2024
@ooesili force-pushed the sftp-delete-last-file branch from 83668cd to 2e47f2a on November 26, 2024 19:50

@rockwotj (Collaborator) commented:

I don't think there is a utility so either you need to do option 1 or implement the retry logic - which I don't think should be too bad?

Here's the code that drives this in benthos AFAIK: https://github.com/redpanda-data/benthos/blob/dad70374cd8fb323f0c7f47452498ea94c2ed7aa/internal/component/input/async_reader.go#L115

The pipeline option (number 1) might be the best route, but I'm not too familiar with that test helper myself.

This commit reduces the scope of critical sections guarded by scannerMut to remove a deadlock that causes the last file to not be deleted when the SFTP input is used with watching enabled.

`(*watcherPathProvider).Next()` currently uses recursion to loop until a path is found. This commit refactors that function to use a for loop instead, which is more straightforward to read.
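
A generic illustration of that refactor; the signature and the tryNextPath/waitForNextPoll helpers below are placeholders rather than the real watcherPathProvider code:

```go
// Before (simplified): recurse until a path turns up.
//
//	func (w *watcherPathProvider) Next(ctx context.Context) (string, error) {
//		if path, ok := w.tryNextPath(ctx); ok {
//			return path, nil
//		}
//		if err := w.waitForNextPoll(ctx); err != nil {
//			return "", err
//		}
//		return w.Next(ctx) // tail recursion
//	}

// After (simplified): the same logic as a flat loop.
func (w *watcherPathProvider) Next(ctx context.Context) (string, error) {
	for {
		if path, ok := w.tryNextPath(ctx); ok {
			return path, nil
		}
		if err := w.waitForNextPoll(ctx); err != nil {
			return "", err
		}
	}
}
```
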
This integration test makes sure that when `delete_on_finish` is true and watching is enabled, every file is deleted.

@ooesili force-pushed the sftp-delete-last-file branch from 1bbf6fa to ab133f4 on December 3, 2024 21:34
@ooesili marked this pull request as ready for review on December 3, 2024 22:01

Comment on lines +155 to +156
builder := service.NewStreamBuilder()
require.NoError(t, builder.SetYAML(config))

@ooesili (Contributor, Author) replied, quoting @rockwotj:

> I don't think there is a utility so either you need to do option 1 or implement the retry logic - which I don't think should be too bad?
>
> Here's the code that drives this in benthos AFAIK: https://github.com/redpanda-data/benthos/blob/dad70374cd8fb323f0c7f47452498ea94c2ed7aa/internal/component/input/async_reader.go#L115
>
> The pipeline option (number 1) might be the best route, but I'm not too familiar with that test helper myself.

Knowledge from the great and powerful @mihaitodor
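
For context, a rough sketch of how that StreamBuilder-based test can run the pipeline and then assert against the server. It assumes `config` is a YAML string defining only the sftp input under test, and `listRemoteFiles` is a hypothetical helper that lists the files on the test SFTP server:

```go
builder := service.NewStreamBuilder()
require.NoError(t, builder.SetYAML(config))
// Drain the pipeline's output; the assertion below only looks at which
// files remain on the server.
require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error {
	return nil
}))

stream, err := builder.Build()
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
go func() { _ = stream.Run(ctx) }()

// With watch and delete_on_finish enabled, every uploaded file should
// eventually be removed, including the last one.
require.Eventually(t, func() bool {
	return len(listRemoteFiles(t)) == 0
}, time.Minute, 100*time.Millisecond)

require.NoError(t, stream.StopWithin(10*time.Second))
```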

A collaborator replied:

Great tests thanks :)

// path from the cache then we skip that path because the
// watcher will eventually poll again, and the cache.Get
// operation will re-run.
if v, err := cache.Get(ctx, path); errors.Is(err, service.ErrKeyNotFound) || (!w.followUpPoll && string(v) == "!") {

A collaborator commented:

nit: separately it would be nice to use constants for the pending symbol and other cache values :)
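
For example, a named constant for the sentinel (the name here is made up; only the "!" value appears in the code above):

```go
// cacheValuePending is the pending marker currently written as a bare "!".
const cacheValuePending = "!"
```

The check above would then compare against cacheValuePending instead of the literal "!".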

	s.scannerMut.Lock()
	defer s.scannerMut.Unlock()

	if s.scanner != nil {
-		return nil
+		skip = true

@rockwotj (Collaborator) commented on Dec 13, 2024:

Why do we skip if there is a scanner (and what are we skipping)? Can you add a comment?

outErr = fmt.Errorf("remove %v: %w", nextPath, outErr)
}
}
s.scannerMut.Unlock()

A collaborator commented:

Since we always return after this block, we could defer this, right? I just get worried if the unlock is not right after the lock.
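
Generically, the suggested shape looks like this; closeScanner is a hypothetical method used only to illustrate pairing the lock with a deferred unlock so every return path releases it:

```go
func (s *sftpReader) closeScanner(ctx context.Context) error {
	s.scannerMut.Lock()
	defer s.scannerMut.Unlock()

	if s.scanner == nil {
		return nil
	}
	err := s.scanner.Close(ctx)
	s.scanner = nil
	return err
}
```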


details := service.NewScannerSourceDetails()
details.SetName(nextPath)
if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {

A collaborator commented:

I think the assignment of s.scanner needs to be under the mutex as well based on the ReadBatch function right?
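
A sketch of that suggestion, constructing the scanner first and only publishing it under the lock (ackFn stands in for the anonymous ack closure in the real call):

```go
scanner, err := s.scannerCtor.Create(file, ackFn, details)
if err != nil {
	return err
}

s.scannerMut.Lock()
s.scanner = scanner
s.scannerMut.Unlock()
```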

@@ -242,11 +241,22 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat
s.pathProvider = s.getFilePathProvider(ctx)
}

return s.client, s.pathProvider, false, nil

A collaborator commented:

As a newcomer to this code, this feels unsafe.

If the only important part of this code is that all these variables are accessed and set together atomically, I do wonder if an atomic is better suited. You can use Swap to set the new value and destroy it in Close, Store in Connect, and Load in ReadBatch. I don't quite understand the higher-level contract here, i.e. why it's only required that these accesses are individually atomic and we don't have to worry about Close clobbering something ongoing in Connect or ReadBatch.

@rockwotj (Collaborator) commented on Dec 13, 2024:

And when I talk about using atomics, I mean using `typed.AtomicValue` from our `internal/typed` package and wrapping all this state into a struct so it becomes `typed.AtomicValue[*sftpReaderState]`.
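
A rough sketch of that shape, using the standard library's atomic.Pointer as a stand-in for typed.AtomicValue; the field names and the pathProvider placeholder are illustrative, not the real code:

```go
import (
	"context"
	"sync/atomic"

	"github.com/pkg/sftp"
)

// pathProvider is a placeholder for the input's existing path provider type.
type pathProvider interface {
	Next(ctx context.Context) (string, error)
}

// sftpReaderState bundles everything that must be swapped as one unit.
type sftpReaderState struct {
	client       *sftp.Client
	pathProvider pathProvider
	// scanner, current path, etc. would also live here.
}

type sftpReader struct {
	state atomic.Pointer[sftpReaderState]
	// ... static config fields ...
}

func (s *sftpReader) Close(ctx context.Context) error {
	// Swap the state out atomically: concurrent Connect/ReadBatch callers see
	// either the old, still-valid state or nil, never a half-torn-down one.
	if st := s.state.Swap(nil); st != nil && st.client != nil {
		return st.client.Close()
	}
	return nil
}
```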

@mihaitodor mentioned this pull request on Dec 17, 2024

Successfully merging this pull request may close: SFTP input - last file not deleted (#2435).