Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cf0f1d2
add sequentialkey column with default empty
rushatgabhane Jan 18, 2026
90893ca
add sequentialkey support to queue dependents
rushatgabhane Jan 18, 2026
69c2f22
add sequentialkey to select and bind to result
rushatgabhane Jan 18, 2026
de8b677
promote next waiting job in sequence to queued
rushatgabhane Jan 18, 2026
58e06af
read sequentialkey and promote next waiting job
rushatgabhane Jan 18, 2026
30d62cc
fix style
rushatgabhane Jan 18, 2026
2876b99
add tests for sequential job queuing and promotion
rushatgabhane Jan 18, 2026
49b64a2
fix style
rushatgabhane Jan 18, 2026
22d2b00
add sequentialkey param to createjob
rushatgabhane Jan 18, 2026
b6e277a
add sequential_key queueing and promote waiting
rushatgabhane Jan 18, 2026
a2faf7f
add test for finish job promotes waiting job
rushatgabhane Jan 18, 2026
e4f34ae
test: promote next waiting with same key
rushatgabhane Jan 18, 2026
2d81e34
add sequential key tests for create job
rushatgabhane Jan 18, 2026
331ce81
add tests for sequentialKey queuing behavior
rushatgabhane Jan 18, 2026
1e2c79b
add test for deletejobpromoteswaitingjob
rushatgabhane Jan 18, 2026
968c24e
add test delete first promotes waiting job
rushatgabhane Jan 18, 2026
a571f6e
add test for failjobpromoteswaitingjob
rushatgabhane Jan 18, 2026
5699abb
add test: fail promotes next waiting with same key
rushatgabhane Jan 18, 2026
c20d8b7
remove sequential job tests
rushatgabhane Jan 18, 2026
16cc3f6
add test for sequential jobs run in order
rushatgabhane Jan 18, 2026
2daa995
add test for three sequential jobs in order
rushatgabhane Jan 18, 2026
4cfa067
Merge branch 'Expensify:main' into sequential-jobs
rushatgabhane Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 71 additions & 14 deletions plugins/Jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ void BedrockPlugin_Jobs::upgradeDatabase(SQLite& db)
"data TEXT NOT NULL, "
"priority INTEGER NOT NULL DEFAULT " + SToStr(JOBS_DEFAULT_PRIORITY) + ", "
"parentJobID INTEGER NOT NULL DEFAULT 0, "
"retryAfter TEXT NOT NULL DEFAULT \"\")",
"retryAfter TEXT NOT NULL DEFAULT \"\", "
"sequentialKey TEXT NOT NULL DEFAULT \"\")",
Copy link

Copilot AI Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing database index for sequentialKey column. The queries at lines 617-620, 1224-1230, 1330-1336, and 1383-1389 filter by sequentialKey without an index, which will cause performance issues as the jobs table grows. An index should be added similar to other indexed columns.

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need an index? I thinkk let's add it when it becomes a problem

Comment on lines 96 to +100

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add migration for sequentialKey column

This change only updates the CREATE TABLE jobs SQL. verifyTable doesn’t alter existing tables, so an upgraded deployment with a pre-existing jobs table will still be missing sequentialKey, and subsequent statements like INSERT INTO jobs (..., sequentialKey) or the new SELECT ... sequentialKey will fail with “no such column,” breaking job creation/finish on upgrade. Please add an explicit schema migration (e.g., addColumn) or a guarded alter step for existing databases.

Useful? React with 👍 / 👎.

Copy link
Member Author

@rushatgabhane rushatgabhane Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't do this in code right? have to manually run Alter table before deploying this code

ignore));
// verify and conditionally create indexes
SASSERT(db.verifyIndex("jobsName", "jobs", "( name )", false, !BedrockPlugin_Jobs::isLive));
Expand Down Expand Up @@ -608,13 +609,29 @@ void BedrockJobsCommand::process(SQLite& db)
}
}

// If sequentialKey is set, check if there are other pending jobs with the same key.
// If so, this job should wait for them to complete before running by setting the initial state to WAITING.
const string& safeSequentialKey = SContains(job, "sequentialKey") ? SQ(job["sequentialKey"]) : SQ("");
if (safeSequentialKey != SQ("")) {
SQResult result;
if (!db.read("SELECT 1 FROM jobs "
"WHERE sequentialKey=" + safeSequentialKey + " "
"AND state IN ('QUEUED', 'RUNQUEUED', 'RUNNING', 'WAITING') "
"LIMIT 1;", result)) {
STHROW("502 Select failed");
}
if (!result.empty()) {
initialState = "WAITING";
}
}
Comment on lines +612 to +626
Copy link

Copilot AI Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing validation to prevent unsupported combinations of sequentialKey with repeat or parentJobID. The PR description states these are unsupported because jobs with repeat or parentJobID don't get deleted on completion, preventing WAITING jobs from being promoted. However, there's no validation to prevent users from creating jobs with these combinations, which would lead to unexpected behavior where WAITING jobs never get promoted to QUEUED.

Copilot uses AI. Check for mistakes.
Comment on lines +612 to +626
Copy link

Copilot AI Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequentialKey logic can override the parentJobID state logic, which could break parent-child job semantics. When both parentJobID and sequentialKey are set (an unsupported combination per the PR description), a child job that should be PAUSED (lines 605-610) can be set to WAITING (line 624), preventing it from running when the parent finishes. This is another reason why the combination should be explicitly validated and rejected.

Copilot uses AI. Check for mistakes.

// If no data was provided, use an empty object
const string& safeRetryAfter = SContains(job, "retryAfter") && !job["retryAfter"].empty() ? SQ(job["retryAfter"]) : SQ("");

// Create this new job with a new generated ID
const int64_t jobIDToUse = SQLiteUtils::getRandomID(db, "jobs", "jobID");
SINFO("Next jobID: " << jobIDToUse);
if (!db.writeIdempotent("INSERT INTO jobs ( jobID, created, state, name, nextRun, repeat, data, priority, parentJobID, retryAfter ) "
if (!db.writeIdempotent("INSERT INTO jobs ( jobID, created, state, name, nextRun, repeat, data, priority, parentJobID, retryAfter, sequentialKey ) "
"VALUES( " +
SQ(jobIDToUse) + ", " +
currentTime + ", " +
Expand All @@ -625,7 +642,8 @@ void BedrockJobsCommand::process(SQLite& db)
safeData + ", " +
SQ(priority) + ", " +
SQ(parentJobID) + ", " +
safeRetryAfter + " " +
safeRetryAfter + ", " +
safeSequentialKey + " " +
" );")) {
STHROW("502 insert query failed");
}
Expand Down Expand Up @@ -1011,7 +1029,7 @@ void BedrockJobsCommand::process(SQLite& db)

// Verify there is a job like this and it's running
SQResult result;
if (!db.read("SELECT state, nextRun, lastRun, repeat, parentJobID, json_extract(data, '$.mockRequest'), retryAfter, json_extract(data, '$.originalNextRun') "
if (!db.read("SELECT state, nextRun, lastRun, repeat, parentJobID, json_extract(data, '$.mockRequest'), retryAfter, json_extract(data, '$.originalNextRun'), sequentialKey "
"FROM jobs "
"WHERE jobID=" + SQ(jobID) + ";",
result)) {
Expand All @@ -1029,6 +1047,7 @@ void BedrockJobsCommand::process(SQLite& db)
mockRequest = result[0][5] == "1";
const string retryAfter = result[0][6];
const string originalDataNextRun = result[0][7];
const string& sequentialKey = result[0][8];

// Make sure we're finishing a job that's actually running
if (state != "RUNNING" && state != "RUNQUEUED" && !mockRequest) {
Expand Down Expand Up @@ -1199,6 +1218,17 @@ void BedrockJobsCommand::process(SQLite& db)
if (!db.read("SELECT 1 FROM jobs WHERE parentJobID != 0 AND parentJobID=" + SQ(jobID) + " LIMIT 1;").empty()) {
STHROW("405 Failed to delete a job with outstanding children");
}

// Promote the next job in sequence that is WAITING
if (!sequentialKey.empty()) {
db.writeIdempotent("UPDATE jobs SET state='QUEUED' "
"WHERE jobID = ("
" SELECT jobID FROM jobs "
" WHERE sequentialKey=" + SQ(sequentialKey) + " "
" AND state='WAITING' "
" ORDER BY created ASC "
" LIMIT 1);");
}
}
}

Expand Down Expand Up @@ -1258,23 +1288,25 @@ void BedrockJobsCommand::process(SQLite& db)
// - data - Data to associate with this failed job
//
BedrockPlugin::verifyAttributeInt64(request, "jobID", 1);
const int64_t jobID = request.calc64("jobID");

// Verify there is a job like this and it's running
SQResult result;
if (!db.read("SELECT state, nextRun, lastRun, repeat "
if (!db.read("SELECT state, nextRun, lastRun, repeat, sequentialKey "
"FROM jobs "
"WHERE jobID=" + SQ(request.calc64("jobID")) + ";",
"WHERE jobID=" + SQ(jobID) + ";",
result)) {
STHROW("502 Select failed");
}
if (result.empty()) {
STHROW("404 No job with this jobID");
}
const string& state = result[0][0];
const string& sequentialKey = result[0][4];

// Make sure we're failing a job that's actually running or running with a retryAfter
if (state != "RUNNING" && state != "RUNQUEUED") {
SINFO("Trying to fail job#" << request["jobID"] << ", but isn't RUNNING or RUNQUEUED (" << state << ")");
SINFO("Trying to fail job#" << jobID << ", but isn't RUNNING or RUNQUEUED (" << state << ")");
STHROW("405 Can only fail RUNNING or RUNQUEUED jobs");
}

Expand All @@ -1289,10 +1321,21 @@ void BedrockJobsCommand::process(SQLite& db)
updateList.push_back("state='FAILED'");

// Update this job
if (!db.writeIdempotent("UPDATE jobs SET " + SComposeList(updateList) + "WHERE jobID=" + SQ(request.calc64("jobID")) + ";")) {
if (!db.writeIdempotent("UPDATE jobs SET " + SComposeList(updateList) + "WHERE jobID=" + SQ(jobID) + ";")) {
STHROW("502 Fail failed");
}

// Promote the next WAITING job with the same sequentialKey
if (!sequentialKey.empty()) {
db.writeIdempotent("UPDATE jobs SET state='QUEUED' "
"WHERE jobID = ("
" SELECT jobID FROM jobs "
" WHERE sequentialKey=" + SQ(sequentialKey) + " "
" AND state='WAITING' "
" ORDER BY created ASC "
" LIMIT 1);");
}

// Successfully processed
return;
}
Expand All @@ -1306,32 +1349,46 @@ void BedrockJobsCommand::process(SQLite& db)
// - jobID - ID of the job to delete
//
BedrockPlugin::verifyAttributeInt64(request, "jobID", 1);
int64_t jobID = request.calc64("jobID");

// Verify there is a job like this and it's not running
SQResult result;
if (!db.read("SELECT state "
if (!db.read("SELECT state, sequentialKey "
"FROM jobs "
"WHERE jobID=" + SQ(request.calc64("jobID")) + ";",
"WHERE jobID=" + SQ(jobID) + ";",
result)) {
STHROW("502 Select failed");
}
if (result.empty()) {
STHROW("404 No job with this jobID");
}
if (result[0][0] == "RUNNING") {
const string& state = result[0][0];
const string& sequentialKey = result[0][1];

if (state == "RUNNING") {
STHROW("405 Can't delete a RUNNING job");
}
if (result[0][0] == "PAUSED") {
if (state == "PAUSED") {
STHROW("405 Can't delete a parent jobs with children running");
}

// Delete the job
if (!db.writeIdempotent("DELETE FROM jobs "
"WHERE jobID=" +
SQ(request.calc64("jobID")) + ";")) {
"WHERE jobID=" + SQ(jobID) + ";")) {
STHROW("502 Delete failed");
}

// Promote the next WAITING job with the same sequentialKey
if (!sequentialKey.empty()) {
db.writeIdempotent("UPDATE jobs SET state='QUEUED' "
"WHERE jobID = ("
" SELECT jobID FROM jobs "
" WHERE sequentialKey=" + SQ(sequentialKey) + " "
" AND state='WAITING' "
" ORDER BY created ASC "
" LIMIT 1);");
}

// Successfully processed
return;
}
Expand Down
8 changes: 7 additions & 1 deletion plugins/Jobs.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Bedrock::Jobs -- Rock solid job queuing
Bedrock::Jobs is a plugin to the [Bedrock data foundation](../README.md) that manages a scheduled job queue. Commands include:

* **CreateJob( name, [data], [firstRun], [repeat] )** - Schedules a new job, optionally in the future, optionally to repeat.
* **CreateJob( name, [data], [firstRun], [repeat], [sequentialKey] )** - Schedules a new job, optionally in the future, optionally to repeat.
* *name* - Any arbitrary string name for this job.
* *data* - (optional) An arbitrary data blob to associate with this job, typically JSON encoded.
* *firstRun* - (optional) The time/date on which to run this job the first time, in "YYYY-MM-DD [HH:MM:SS]" format
* *repeat* - (optional) Description of how this job should repeat (see ["Repeat Syntax"](#repeat-syntax) below)
* *sequentialKey* - (optional) Jobs with the same sequentialKey run one at a time, in creation order (see ["Sequential Jobs"](#sequential-jobs) below)

* **GetJob( name, [connection: wait, [timeout] ] )** - Waits for a match (if requested) and atomically dequeues exactly one job.
* *name* - A pattern to match in GLOB syntax (eg, "Foo*" will get the first job whose name starts with "Foo")
Expand Down Expand Up @@ -158,3 +159,8 @@ Confused by all the above? No problem -- there are a few "canned" patterns buil
* **WEEKLY** = FINISHED, + 7 DAYS

These are useful if you generally want something to happen *approximately but no greater* than the indicated frequency.

### Sequential Jobs
Jobs with the same `sequentialKey` run one at a time, in creation order. New jobs with a key that's already active are set to `WAITING` state. When the active job completes (FinishJob/FailJob/DeleteJob), the oldest WAITING job is promoted to `QUEUED`.

**Unsupported:** `repeat` and `parentJobID` with `sequentialKey` - these jobs don't get deleted on completion, so WAITING jobs would never be promoted.
45 changes: 45 additions & 0 deletions test/clustertest/tests/FinishJobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct FinishJobTest : tpunit::TestFixture
TEST(FinishJobTest::hasDataDelete),
TEST(FinishJobTest::hasNextRun),
TEST(FinishJobTest::simpleFinishJobWithHttp),
TEST(FinishJobTest::finishJobPromotesWaitingJob),
AFTER(FinishJobTest::tearDown),
AFTER_CLASS(FinishJobTest::tearDownClass))
{
Expand Down Expand Up @@ -602,4 +603,48 @@ struct FinishJobTest : tpunit::TestFixture
clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
ASSERT_TRUE(result.empty());
}

// FinishJob should promote the next WAITING job with same sequentialKey
void finishJobPromotesWaitingJob()
{
// Create first job
SData command("CreateJob");
command["name"] = "testSequential1";
command["sequentialKey"] = "test_key_finish";
STable response1 = tester->executeWaitVerifyContentTable(command);
string jobID1 = response1["jobID"];

// Create second job
command.clear();
command.methodLine = "CreateJob";
command["name"] = "testSequential2";
command["sequentialKey"] = "test_key_finish";
STable response2 = tester->executeWaitVerifyContentTable(command);
string jobID2 = response2["jobID"];

// Verify second job is WAITING
SQResult result;
clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + jobID2 + ";", result);
ASSERT_EQUAL(result[0][0], "WAITING");

// Get first job to put it in RUNNING state
command.clear();
command.methodLine = "GetJob";
command["name"] = "testSequential1";
tester->executeWaitVerifyContent(command);

// Finish first job
command.clear();
command.methodLine = "FinishJob";
command["jobID"] = jobID1;
tester->executeWaitVerifyContent(command);

// Verify first job is deleted
clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID1 + ";", result);
ASSERT_TRUE(result.empty());

// Verify second job is now QUEUED
clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + jobID2 + ";", result);
ASSERT_EQUAL(result[0][0], "QUEUED");
}
} __FinishJobTest;
Loading