Skip to content

Commit

Permalink
Fixes so LoadJob starts from valid offsets on the first run
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli committed Nov 21, 2024
1 parent b734482 commit 26acba1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
19 changes: 15 additions & 4 deletions pkg/blockbuilder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type PartitionController interface {
HighestCommittedOffset(ctx context.Context) (int64, error)
// Returns the highest available offset in the partition
HighestPartitionOffset(ctx context.Context) (int64, error)
// Returns the earliest available offset in the partition
EarliestPartitionOffset(ctx context.Context) (int64, error)
// Commits the offset to the consumer group.
Commit(context.Context, int64) error
// Process will run load batches at a time and send them to channel,
Expand Down Expand Up @@ -74,17 +76,26 @@ func NewPartitionJobController(
// Returns whether an applicable job exists, the job, and an error
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) {
// Read the most recent committed offset
startOffset, err := l.part.HighestCommittedOffset(ctx)
committedOffset, err := l.part.HighestCommittedOffset(ctx)
if err != nil {
return false, Job{}, err
}

highestOffset, err := l.part.HighestPartitionOffset(ctx)
earliestOffset, err := l.part.EarliestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}

if highestOffset == startOffset {
startOffset := committedOffset + 1
if startOffset < earliestOffset {
startOffset = earliestOffset
}

highestOffset, err := l.part.HighestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}
if highestOffset == committedOffset {
return false, Job{}, nil
}

Expand All @@ -93,7 +104,7 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)
Partition: l.part.Partition(),
Offsets: Offsets{
Min: startOffset,
Max: startOffset + l.stepLen,
Max: min(startOffset+l.stepLen, highestOffset),
},
}

Expand Down
27 changes: 19 additions & 8 deletions pkg/blockbuilder/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,6 @@ func (r *partitionReader) fetchPartitionOffset(ctx context.Context, position int

// Ensure no error occurred.
res := resps[0]

level.Debug(r.logger).Log(
"msg", "fetched partition offset",
"partition", r.partitionID,
"position", position,
"topic", r.topic,
"err", res.Err,
)
if res.Err != nil {
return 0, res.Err
}
Expand All @@ -150,6 +142,15 @@ func (r *partitionReader) fetchPartitionOffset(ctx context.Context, position int
return 0, err
}

level.Debug(r.logger).Log(
"msg", "fetched partition offset",
"partition", r.partitionID,
"position", position,
"topic", r.topic,
"err", res.Err,
"offset", listRes.Topics[0].Partitions[0].Offset,
)

return listRes.Topics[0].Partitions[0].Offset, nil
}

Expand Down Expand Up @@ -249,6 +250,16 @@ func (r *partitionReader) HighestPartitionOffset(ctx context.Context) (int64, er
)
}

// EarliestPartitionOffset returns the earliest (oldest) offset still available
// in the partition, retrying transient fetch failures using the reader's
// configured backoff policy.
func (r *partitionReader) EarliestPartitionOffset(ctx context.Context) (int64, error) {
	fetch := func() (int64, error) {
		return r.fetchPartitionOffset(ctx, kafkaStartOffset)
	}
	return withBackoff(ctx, r.backoff, fetch)
}

// pollFetches retrieves the next batch of records from Kafka and measures the fetch duration.
// NB(owen-d): originally lifted from `pkg/kafka/partition/reader.go:Reader`
func (r *partitionReader) poll(
Expand Down
6 changes: 5 additions & 1 deletion pkg/blockbuilder/slimgester.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
)
}

if lastOffset <= 0 {
return false, nil
}

if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
level.Error(logger).Log(
"msg", "failed to commit offset",
Expand Down Expand Up @@ -723,7 +727,7 @@ func newStream(fp model.Fingerprint, ls labels.Labels, cfg Config, metrics *Slim
fp: fp,
ls: ls,

chunkFormat: chunkenc.ChunkFormatV3,
chunkFormat: chunkenc.ChunkFormatV4,
codec: cfg.parsedEncoding,
blockSize: cfg.BlockSize.Val(),
targetChunkSize: cfg.TargetChunkSize.Val(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,8 @@ func (t *Loki) updateConfigForShipperStore() {

case t.Cfg.isTarget(BlockBuilder):
// Blockbuilder handles index creation independently of the shipper.
// TODO: introduce Disabled mode for boltdb shipper and set it here.
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeDisabled

default:
Expand Down

0 comments on commit 26acba1

Please sign in to comment.