Add DB trimming #174
Conversation
In addition to updating dependencies, also update the Go version the application depends on to 1.23. In that connection the base Docker image is also updated, which now depends on Alpine 3.20. While updating dependency versions, the tests were also made compatible with WSL2; the primary step was to skip tests that depend on observing configuration file changes, since that does not work in WSL2.
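One common way to implement such a skip is to detect WSL by checking the kernel version string for "microsoft"; this is a sketch under that assumption (the helper names are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// looksLikeWSL reports whether a kernel version string indicates WSL;
// WSL kernels embed "microsoft" in /proc/version (a common heuristic).
func looksLikeWSL(version string) bool {
	return strings.Contains(strings.ToLower(version), "microsoft")
}

// isWSL checks the running kernel; it returns false where /proc/version
// does not exist (e.g. non-Linux systems).
func isWSL() bool {
	data, err := os.ReadFile("/proc/version")
	if err != nil {
		return false
	}
	return looksLikeWSL(string(data))
}

func main() {
	// In a test this would be used as:
	//   if isWSL() { t.Skip("config file watching is unreliable under WSL2") }
	fmt.Println(looksLikeWSL("Linux version 5.15.90.1-microsoft-standard-WSL2")) // prints true
}
```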
This refactor is the first step in introducing a second command: a DB trim command to export, archive, and delete completed messages and jobs. Currently only the web service command "broker" is used.
syscall.Kill is a system call not available on Windows, making it difficult to compile the code as-is on Windows and breaking proper IDE support. So syscall.Kill is wrapped in a platform-specific, build-constrained implementation to avoid the compile error. This is done to ensure the project builds (not runs tests) on Windows.
The configuration allows the remote export, and even the local export path, to be optional. Retention days and export node name enable pruning to take place.
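As a rough illustration of such a section, in the INI style Webhook Broker uses for its configuration (all key names here are hypothetical; the actual template in the repository is authoritative):

```ini
[prune]
; 0 (the default) disables pruning; 3-7 is the recommended range
message-retention-days=7
; identifies which node performed the export
export-node-name=node-1
; optional local archive directory
export-path=/var/archive/webhook-broker
; optional remote URL (may be file:/// for testing)
remote-export-url=
remote-file-prefix=
; archive files are rotated when they reach this size
max-archive-file-size-in-mb=100
```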
Add a prune service directory for the prune implementation. Also fix the Makefile to install the correct mockery tool.
These data access implementations are meant to be used by pruning to delete old messages and their respective completed jobs for archival purposes.
Three changes were required to make pruning work properly:
1. Respect the absolute max when returning a batch of messages.
2. If the remote archive is nil, do not attempt to close it.
3. If the remote file prefix is empty, make sure the remote object name does not begin with a "/" in the key name.
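The third fix amounts to joining the prefix and file name without ever producing a leading slash; a minimal sketch (the helper name and signature are illustrative, not the PR's actual code):

```go
package main

import (
	"fmt"
	"strings"
)

// remoteObjectName joins an optional prefix with the archive file name,
// ensuring the object key never starts with "/" when the prefix is empty.
func remoteObjectName(prefix, fileName string) string {
	if prefix == "" {
		return fileName
	}
	// Normalize so exactly one "/" separates prefix and file name.
	return strings.TrimSuffix(prefix, "/") + "/" + fileName
}

func main() {
	fmt.Println(remoteObjectName("", "archive-0001.jsonl"))         // prints archive-0001.jsonl
	fmt.Println(remoteObjectName("backups/", "archive-0001.jsonl")) // prints backups/archive-0001.jsonl
}
```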
Loading a message involved loading its channel and producer, and loading a job involved loading its consumer; so to avoid 2n + 1 queries when loading n messages, we added caching for channels, producers, and consumers. This will be most effective when archiving thousands or millions of old messages and jobs. It is wired through dependency injection, keeping it transparent to the rest of the stack via the delegator pattern.
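The delegator pattern here means a caching wrapper that satisfies the same repository interface and forwards misses to the real repository; callers never know the difference. A self-contained sketch (all type names are illustrative stand-ins, not the PR's actual types):

```go
package main

import "fmt"

// Channel is a minimal stand-in for the data model.
type Channel struct{ ID string }

// ChannelRepository is the interface the rest of the stack depends on.
type ChannelRepository interface {
	Get(id string) (*Channel, error)
}

// dbChannelRepo simulates a repository that hits the database.
type dbChannelRepo struct{ queries int }

func (r *dbChannelRepo) Get(id string) (*Channel, error) {
	r.queries++
	return &Channel{ID: id}, nil
}

// cachingChannelRepo delegates to an inner repository and memoizes results,
// so archiving thousands of messages loads each channel only once.
type cachingChannelRepo struct {
	delegate ChannelRepository
	cache    map[string]*Channel
}

func (r *cachingChannelRepo) Get(id string) (*Channel, error) {
	if c, ok := r.cache[id]; ok {
		return c, nil
	}
	c, err := r.delegate.Get(id)
	if err != nil {
		return nil, err
	}
	r.cache[id] = c
	return c, nil
}

func main() {
	db := &dbChannelRepo{}
	repo := &cachingChannelRepo{delegate: db, cache: map[string]*Channel{}}
	for i := 0; i < 1000; i++ {
		repo.Get("orders") // same channel requested repeatedly
	}
	fmt.Println(db.queries) // prints 1
}
```

Because the wrapper implements the same interface, dependency injection can swap it in without touching any calling code.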
```go
r, err := bucket.NewReader(ctx, objectName, nil)
exists, exists_err := bucket.Exists(ctx, objectName)
if exists_err != nil {
	return nil, fmt.Errorf("failed to check if object exists: %w", exists_err)
}
if err != nil && exists {
	return nil, fmt.Errorf("failed to create reader: %w", err)
}
if err == nil {
	defer r.Close()
	rm.currentSize = r.Size()
}
```
Did we intentionally ignore err for a case here: err != nil && !exists?
Yes, we ignore the open error if the file does not exist; in other words, the file needs to be created.
```go
var (
	initLocalArchiveManager = func(pruneConfig config.MessagePruningConfig) (*ArchiveWriteManager, error) {
```
Just curious, why are these functions defined like this (why not `func initLocalArchiveManager`)?
Defining the function as a variable makes it easier to mock this function when we want to test it returning an error.
```go
	return fmt.Errorf("failed to write message and jobs to local archive: %w", err)
}
if director.RemoteArchiveManager != nil {
	_, err = director.RemoteArchiveManager.Write(context.Background(), jsonStr)
```
Are we writing to remote destination one message at a time? 🤔
It would not make any difference whether it is one at a time or hundreds, since the write is flushed only at `.close()` or when the file reaches the rotation size. With a 20MB file size we observed approximately 3000 messages written to a file.
```diff
@@ -115,6 +115,13 @@ func (djRepo *DeliveryJobDBRepository) MarkDeadJobAsInflight(deliveryJob *data.D
 	return err
 }

+func (djRepo *DeliveryJobDBRepository) DeleteJobsForMessage(message *data.Message) error {
+	err := transactionalWrites(djRepo.db, func(tx *sql.Tx) error {
+		return inTransactionExec(tx, emptyOps, "DELETE FROM job WHERE messageId like ?", args2SliceFnWrapper(message.ID), 0)
```
Curious, why `like` and not `=`?
No particular reason, just that I prefer using the `LIKE` operator for string matching.
LGTM 🛳️
Since writes can fail at any given point, the actual deletion of messages and jobs should happen only after writes complete, and within a single transaction.
Functionally tested the code; it seems to be working as expected. So merging the PR.
Auto Pruning of Webhook Broker Database.
Problem Statement
While Webhook Broker has been running in production, one of the main issues affecting it is the data volume in the DB. As messages and jobs are never deleted, these 2 tables keep getting very large. At one stage the broker effectively ceases to operate, as DB queries for failed jobs become extremely unresponsive.
Solution Approach
While we want to prune, we also want to retain the messages for archival purposes, just in case we need them. So what we are doing is this: there is a retention period for keeping data in the database; by default it is set to 0 days, which means pruning is disabled; the recommendation is to set it to 3~7. Any message older than the retention period, all of whose jobs have been delivered (that is, there is no job in the DLQ for that message), will be selected for deletion. The message and its jobs will be serialized as a single JSON line to 2 files, one in a local directory and one remote (for test purposes the remote can also be `file:///`, and it is optional). The files will also be rotated if they reach the file size limit; the default size is 100MB. The configuration template discusses the `[prune]` section in more detail.

To make sure pruning and the broker do not run at the same time, we have introduced sub-commands. The default command runs the broker, to keep it backward compatible; to run prune mode just add `-command prune`.

Some implementation quirks (intentional)
Other changes made
Enable `go build` to pass on Windows to make development somewhat feasible; test execution there is still a nightmare.

Next changes
This change is the major work for #154