Skip to content

Commit

Permalink
Fixed DELETE RETURNING handling, update documentation for SKIP LOCKED.
Browse files Browse the repository at this point in the history
Revert the old way of BLR generation for DELETE RETURNING when SKIP LOCKED is not specified.
  • Loading branch information
hvlad committed Nov 23, 2023
1 parent f3205ff commit 69f8afd
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 19 deletions.
6 changes: 3 additions & 3 deletions doc/sql.extensions/README.skip_locked.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ DELETE FROM <sometable>

## Notes

As it happens with subclauses `FIRST`/`SKIP`/`ROWS`/`OFFSET`/`FETCH` record lock
(and "skip locked" check) is done in between of skip (`SKIP`/`ROWS`/`OFFSET`/`FETCH`) and
limit (`FIRST`/`ROWS`/`OFFSET`/`FETCH`) checks.
If statement have both `SKIP LOCKED` and `SKIP`/`ROWS` subclauses, some locked rows
could be skipped before `SKIP`/`ROWS` subclause would account it, thus skipping more
rows than specified in `SKIP`/`ROWS`.

## Examples

Expand Down
55 changes: 42 additions & 13 deletions src/dsql/StmtNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2273,9 +2273,9 @@ const StmtNode* DeclareVariableNode::execute(thread_db* tdbb, Request* request,
//--------------------


static RegisterNode<EraseNode> regEraseNode({blr_erase});
static RegisterNode<EraseNode> regEraseNode({blr_erase, blr_erase2});

DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch* csb, const UCHAR /*blrOp*/)
DmlNode* EraseNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR blrOp)
{
const USHORT n = csb->csb_blr_reader.getByte();

Expand All @@ -2288,6 +2288,9 @@ DmlNode* EraseNode::parse(thread_db* /*tdbb*/, MemoryPool& pool, CompilerScratch
if (csb->csb_blr_reader.peekByte() == blr_marks)
node->marks |= PAR_marks(csb);

if (blrOp == blr_erase2)
node->returningStatement = PAR_parse_stmt(tdbb, csb);

return node;
}

Expand Down Expand Up @@ -2385,6 +2388,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const
NODE_PRINT(printer, dsqlSkipLocked);
NODE_PRINT(printer, statement);
NODE_PRINT(printer, subStatement);
NODE_PRINT(printer, returningStatement);
NODE_PRINT(printer, stream);
NODE_PRINT(printer, marks);

Expand All @@ -2397,11 +2401,14 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
{
std::optional<USHORT> tableNumber;

const bool skipLocked = dsqlRse && dsqlRse->hasSkipLocked();

if (dsqlReturning && !dsqlScratch->isPsql())
{
if (dsqlCursorName.isEmpty())
{
dsqlScratch->appendUChar(blr_begin);
if (!skipLocked)
dsqlScratch->appendUChar(blr_begin);

tableNumber = dsqlScratch->localTableNumber++;
dsqlGenReturningLocalTableDecl(dsqlScratch, tableNumber.value());
Expand All @@ -2422,28 +2429,31 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)

const auto* context = dsqlContext ? dsqlContext : dsqlRelation->dsqlContext;

if (dsqlReturning)
if (dsqlReturning && !skipLocked)
{
dsqlScratch->appendUChar(blr_begin);
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);
}

dsqlScratch->appendUChar(blr_erase);
dsqlScratch->appendUChar(dsqlReturning && skipLocked ? blr_erase2 : blr_erase);
GEN_stuff_context(dsqlScratch, context);

if (marks)
dsqlScratch->putBlrMarkers(marks);

if (dsqlReturning)
{
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);

dsqlScratch->appendUChar(blr_end);
if (!skipLocked)
dsqlScratch->appendUChar(blr_end);
else
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);

if (!dsqlScratch->isPsql() && dsqlCursorName.isEmpty())
{
dsqlGenReturningLocalTableCursor(dsqlScratch, dsqlReturning, tableNumber.value());

dsqlScratch->appendUChar(blr_end);
if (!skipLocked)
dsqlScratch->appendUChar(blr_end);
}
}
}
Expand All @@ -2455,6 +2465,9 @@ EraseNode* EraseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
doPass1(tdbb, csb, statement.getAddress());
doPass1(tdbb, csb, subStatement.getAddress());

AutoSetRestore<bool> autoReturningExpr(&csb->csb_returning_expr, true);
doPass1(tdbb, csb, returningStatement.getAddress());

return this;
}

Expand Down Expand Up @@ -2571,6 +2584,7 @@ void EraseNode::pass1Erase(thread_db* tdbb, CompilerScratch* csb, EraseNode* nod
EraseNode* EraseNode::pass2(thread_db* tdbb, CompilerScratch* csb)
{
doPass2(tdbb, csb, statement.getAddress(), this);
doPass2(tdbb, csb, returningStatement.getAddress(), this);
doPass2(tdbb, csb, subStatement.getAddress(), this);

const jrd_rel* const relation = csb->csb_rpt[stream].csb_relation;
Expand Down Expand Up @@ -2649,6 +2663,8 @@ const StmtNode* EraseNode::execute(thread_db* tdbb, Request* request, ExeState*
// Perform erase operation.
const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger whichTrig) const
{
impure_state* impure = request->getImpure<impure_state>(impureOffset);

jrd_tra* transaction = request->req_transaction;
record_param* rpb = &request->req_rpb[stream];
jrd_rel* relation = rpb->rpb_relation;
Expand All @@ -2674,6 +2690,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
}

case Request::req_return:
if (impure->sta_state == 1)
{
impure->sta_state = 0;
rpb->rpb_number.setValid(false);
return parentStmt;
}
break;

default:
Expand Down Expand Up @@ -2735,9 +2757,9 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger

if (!VIO_erase(tdbb, rpb, transaction))
{
// Record was not deleted, flow control should be passed to the
// parent ForNode. Note, If RETURNING clause was specified, then
// parent node is CompoundStmtNode, not ForNode. If\when this
// Record was not deleted, flow control should be passed to the parent
// ForNode. Note, If RETURNING clause was specified and SKIP LOCKED was
// not, then parent node is CompoundStmtNode, not ForNode. If\when this
// will be changed, the code below should be changed accordingly.

if (skipLocked)
Expand Down Expand Up @@ -2789,6 +2811,13 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
}
}

if (returningStatement)
{
impure->sta_state = 1;
request->req_operation = Request::req_evaluate;
return returningStatement;
}

rpb->rpb_number.setValid(false);

return parentStmt;
Expand Down Expand Up @@ -7774,7 +7803,7 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, Request* request, WhichTrigg

SavepointChangeMarker scMarker(transaction);

// Prepare to undo changed by PRE-triggers if record is locked by another
// Prepare to undo changes by PRE-triggers if record is locked by another
// transaction and update should be skipped.
const bool skipLocked = orgRpb->rpb_stream_flags & RPB_s_skipLocked;
CondSavepointAndMarker spPreTriggers(tdbb, transaction,
Expand Down
1 change: 1 addition & 0 deletions src/dsql/StmtNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ class EraseNode final : public TypedNode<StmtNode, StmtNode::TYPE_ERASE>
dsql_ctx* dsqlContext = nullptr;
NestConst<StmtNode> statement;
NestConst<StmtNode> subStatement;
NestConst<StmtNode> returningStatement;
NestConst<ForNode> forNode; // parent implicit cursor, if present
StreamType stream = 0;
unsigned marks = 0; // see StmtNode::IUD_MARK_xxx
Expand Down
3 changes: 1 addition & 2 deletions src/include/firebird/impl/blr.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@
#define blr_agg_list (unsigned char)170
#define blr_agg_list_distinct (unsigned char)171
#define blr_modify2 (unsigned char)172

// unused codes: 173
#define blr_erase2 (unsigned char)173

/* FB 1.0 specific BLR */

Expand Down
2 changes: 1 addition & 1 deletion src/jrd/blp.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ static const struct
{"agg_list", two}, // 170
{"agg_list_distinct", two},
{"modify2", modify2},
{NULL, NULL},
{"erase2", erase2},
// New BLR in FB1
{"current_role", zero},
{"skip", one},
Expand Down
1 change: 1 addition & 0 deletions src/yvalve/gds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ static const UCHAR
store3[] = { op_line, op_byte, op_line, op_verb, op_verb, op_verb, 0},
marks[] = { op_byte, op_literal, op_line, op_verb, 0},
erase[] = { op_erase, 0},
erase2[] = { op_erase, op_verb, 0},
local_table[] = { op_word, op_byte, op_literal, op_byte, op_line, 0},
outer_map[] = { op_outer_map, 0 },
in_list[] = { op_verb, op_line, op_word, op_line, op_args, 0},
Expand Down

0 comments on commit 69f8afd

Please sign in to comment.