Skip to content

Commit

Permalink
If EOSE is not given, they should not be deleted.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattn committed Nov 24, 2023
1 parent 96299fd commit 23561ee
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
events := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[Timestamp]()
ticker := time.NewTicker(seenAlreadyDropTick)
eose := false

pending := xsync.NewCounter()
initial := len(urls)
Expand All @@ -98,17 +99,21 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
}

select {
case <-sub.EndOfStoredEvents:
eose = true
case <-ticker.C:
del := map[string]struct{}{}
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(key string, value Timestamp) bool {
if value < old {
del[evt.ID] = struct{}{}
if eose {
del := map[string]struct{}{}
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(key string, value Timestamp) bool {
if value < old {
del[evt.ID] = struct{}{}
}
return true
})
for k := range del {
seenAlready.Delete(k)
}
return true
})
for k := range del {
seenAlready.Delete(k)
}
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
Expand Down

0 comments on commit 23561ee

Please sign in to comment.