Skip to content

Commit

Permalink
consumer: add DisableWaitForAck spec field
Browse files Browse the repository at this point in the history
This field allows for toggling behavior re: whether consumer
transactions should wait for ACKs of messages read this transaction.

Issue #243
  • Loading branch information
jgraettinger committed Dec 3, 2019
1 parent 81241ca commit 32b1f75
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 110 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ v0.85.1 (unreleased)
- Reworked almost all documentation into reStructuredText / Sphinx / ReadTheDocs format.
- Make-based build system is refactored to make it easier to integrate and reuse
in external repositories and consumer application projects.
- Add ``DisableWaitForAck`` ShardSpec field, which toggles the consumer transaction
behavior of waiting for ACKs of read pending messages. Most applications won't want
to set this, but it can be helpful to avoid stalls in applications with cyclic
message flows.

v0.84.2
-------
Expand Down
264 changes: 157 additions & 107 deletions consumer/protocol/protocol.pb.go

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions consumer/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ message ShardSpec {

// Disable processing of the shard.
bool disable = 8 [(gogoproto.moretags) = "yaml:\",omitempty\""];

// Hot standbys is the desired number of consumer processes which should be
// replicating the primary consumer's recovery log. Standbys are allocated in
// a separate availability zone of the current primary, and tail the live log
Expand All @@ -169,6 +170,22 @@ message ShardSpec {
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.moretags) = "yaml:\",omitempty,inline\""];

// Disable waiting for acknowledgements of pending message(s).
//
// If a consumer transaction reads uncommitted messages, it will by default
// remain open (subject to the max duration) awaiting an acknowledgement of
// those messages, in the hope that that acknowledgement will be quickly
// forthcoming and, by remaining open, we can process all messages in this
// transaction. Effectively we're trading a small amount of increased local
// latency for a global reduction in end-to-end latency.
//
// This works well for acyclic message flows, but can introduce unnecessary
// stalls if there are message cycles between shards. In the simplest case,
// a transaction could block awaiting an ACK of a message that it itself
// produced -- an ACK which can't arrive until the transaction closes.
bool disable_wait_for_ack = 11 [
(gogoproto.moretags) = "yaml:\"disable_wait_for_ack,omitempty\""];
}

// ConsumerSpec describes a Consumer process instance and its configuration.
Expand Down
11 changes: 10 additions & 1 deletion consumer/protocol/shard_spec_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (m *ShardSpec) Validate() error {
}
}

// Disable and HotStandbys require no extra validation.
// HotStandbys, Disable, and DisableWaitForAck require no extra validation.

return nil
}
Expand Down Expand Up @@ -181,6 +181,9 @@ func UnionShardSpecs(a, b ShardSpec) ShardSpec {
}
a.LabelSet = pb.UnionLabelSets(a.LabelSet, b.LabelSet, pb.LabelSet{})

if a.DisableWaitForAck == false {
a.DisableWaitForAck = b.DisableWaitForAck
}
return a
}

Expand Down Expand Up @@ -213,6 +216,9 @@ func IntersectShardSpecs(a, b ShardSpec) ShardSpec {
}
a.LabelSet = pb.IntersectLabelSets(a.LabelSet, b.LabelSet, pb.LabelSet{})

if a.DisableWaitForAck != b.DisableWaitForAck {
a.DisableWaitForAck = false
}
return a
}

Expand Down Expand Up @@ -245,6 +251,9 @@ func SubtractShardSpecs(a, b ShardSpec) ShardSpec {
}
a.LabelSet = pb.SubtractLabelSet(a.LabelSet, b.LabelSet, pb.LabelSet{})

if a.DisableWaitForAck == b.DisableWaitForAck {
a.DisableWaitForAck = false
}
return a
}

Expand Down
4 changes: 4 additions & 0 deletions consumer/protocol/shard_spec_extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *SpecSuite) TestSetOperations(c *gc.C) {
{Name: "ccc", Value: "val"},
},
},
DisableWaitForAck: true,
}
var other = ShardSpec{
Sources: []ShardSpec_Source{
Expand All @@ -166,14 +167,17 @@ func (s *SpecSuite) TestSetOperations(c *gc.C) {
{Name: "ccc", Value: "other"},
},
},
DisableWaitForAck: false,
}

c.Check(UnionShardSpecs(ShardSpec{}, model), gc.DeepEquals, model)
c.Check(UnionShardSpecs(model, ShardSpec{}), gc.DeepEquals, model)

other.Disable = true // Disable == true dominates in union operation.
other.DisableWaitForAck = true
c.Check(UnionShardSpecs(other, model), gc.DeepEquals, other)
other.Disable = false
other.DisableWaitForAck = false
c.Check(UnionShardSpecs(model, other), gc.DeepEquals, model)

c.Check(IntersectShardSpecs(model, model), gc.DeepEquals, model)
Expand Down
7 changes: 7 additions & 0 deletions consumer/shardspace/shardspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (s *SetSuite) TestRoundTripHoistAndPushDown(c *gc.C) {
MinTxnDuration: time.Millisecond,
HotStandbys: 1,
LabelSet: pb.MustLabelSet("name-1", "val-1"),
DisableWaitForAck: true,
},
Shards: []Shard{
{
Expand All @@ -45,6 +46,7 @@ func (s *SetSuite) TestRoundTripHoistAndPushDown(c *gc.C) {
Spec: pc.ShardSpec{
Id: "shard-B",
Sources: []pc.ShardSpec_Source{{Journal: "journal/B"}},
Disable: true,
LabelSet: pb.MustLabelSet("name-3", "val-3"),
},
Revision: 456,
Expand Down Expand Up @@ -116,6 +118,7 @@ common:
labels:
- name: name-1
value: val-1
disable_wait_for_ack: true
shards:
- comment: This is a comment
delete: true
Expand All @@ -129,6 +132,7 @@ shards:
- id: shard-B
sources:
- journal: journal/B
disable: true
labels:
- name: name-3
value: val-3
Expand Down Expand Up @@ -168,6 +172,7 @@ func buildFlatFixture() Set {
MinTxnDuration: time.Millisecond,
HotStandbys: 1,
LabelSet: pb.MustLabelSet("name-1", "val-1", "name-2", "val-2"),
DisableWaitForAck: true,
},
Revision: 123,
Delete: &boxedTrue,
Expand All @@ -182,8 +187,10 @@ func buildFlatFixture() Set {
HintBackups: 2,
MaxTxnDuration: time.Second,
MinTxnDuration: time.Millisecond,
Disable: true,
HotStandbys: 1,
LabelSet: pb.MustLabelSet("name-1", "val-1", "name-3", "val-3"),
DisableWaitForAck: true,
},
Revision: 456,
},
Expand Down
6 changes: 4 additions & 2 deletions consumer/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan readMessage, hint
// transaction models a single consumer shard transaction.
type transaction struct {
minDur, maxDur time.Duration // Min/max processing durations. Set to -1 when elapsed.
waitForAck bool // Wait for ACKs of pending messages read this txn?
barrierCh <-chan struct{} // Next barrier of previous transaction to resolve.
readCh <-chan readMessage // Message source. Nil'd upon reaching |maxDur|.
readThrough pb.Offsets // Offsets read through this transaction.
Expand Down Expand Up @@ -92,6 +93,7 @@ func txnInit(s *shard, txn, prev *transaction, readCh <-chan readMessage, timer
timer: timer,
minDur: spec.MinTxnDuration,
maxDur: spec.MaxTxnDuration,
waitForAck: !spec.DisableWaitForAck,
barrierCh: prev.commitBarrier.Done(),
}
for j, o := range prev.readThrough {
Expand Down Expand Up @@ -124,8 +126,8 @@ func txnBlocks(s *shard, txn, prev *transaction) bool {
// Or if the prior transaction hasn't completed.
txn.barrierCh != nil ||
// Or if the maximum batching duration hasn't elapsed, and a sequence
// started this transaction awaits an ACK which will hopefully come.
(txn.maxDur != -1 && s.sequencer.HasPending(prev.readThrough))
// started this transaction awaits an ACK which we want to wait for.
(txn.waitForAck && txn.maxDur != -1 && s.sequencer.HasPending(prev.readThrough))
}

// txnStep steps the transaction one time, and returns true iff it has started to commit.
Expand Down

0 comments on commit 32b1f75

Please sign in to comment.