Skip to content

Commit

Permalink
archive: pin election metadata when importing processes
Browse files Browse the repository at this point in the history
Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed Nov 7, 2023
1 parent a07fe9c commit 5ee13c9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
37 changes: 23 additions & 14 deletions vochain/indexer/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (

const (
marxArchiveFileSize = 1024 * 100 // 100KB
timeoutArchiveRetrieval = 60 * time.Second
archiveFetchInterval = 20 * time.Minute
timeoutArchiveRetrieval = 120 * time.Second
archiveFetchInterval = 60 * time.Minute
archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars
)

Expand All @@ -32,15 +32,17 @@ type ArchiveProcess struct {
}

// ImportArchive imports an archive list of processes into the indexer database.
func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
// It checks if the process already exists in the database and if not, it creates it.
// Returns those processes that have been added to the database.
func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess, error) {
tx, err := idx.readWriteDB.Begin()
if err != nil {
return err
return nil, err
}
defer tx.Rollback()
height := idx.App.State.CurrentHeight()
queries := indexerdb.New(tx)
addCount := 0
added := []*ArchiveProcess{}
for _, p := range archive {
if idx.App.ChainID() == p.ChainID {
log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
Expand All @@ -54,7 +56,7 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
// Check if election already exists
if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil {
if err != ErrProcessNotFound {
return fmt.Errorf("process info: %w", err)
return nil, fmt.Errorf("process info: %w", err)
}
} else {
continue
Expand Down Expand Up @@ -91,14 +93,11 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
VoteCount: int64(p.ProcessInfo.VoteCount),
}
if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil {
return fmt.Errorf("create archive process: %w", err)
return nil, fmt.Errorf("create archive process: %w", err)
}
addCount++
added = append(added, p)
}
if addCount > 0 {
log.Infow("archive new elections imported", "elections", addCount)
}
return tx.Commit()
return added, tx.Commit()
}

// StartArchiveRetrival starts the archive retrieval process. It is a blocking function that runs continuously.
Expand Down Expand Up @@ -129,10 +128,20 @@ func (idx *Indexer) StartArchiveRetrival(storage data.Storage, archiveURL string
}

log.Debugw("archive processes unmarshaled", "processes", len(archive))
if err := idx.ImportArchive(archive); err != nil {
added, err := idx.ImportArchive(archive)
if err != nil {
log.Warnw("cannot import archive", "err", err)
}

if len(added) > 0 {
log.Infow("new archive imported", "count", len(added))
for _, p := range added {
ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
if err := storage.Pin(ctx, p.ProcessInfo.Metadata); err != nil {
log.Warnw("cannot pin metadata", "err", err.Error())
}
cancel()
}
}
time.Sleep(archiveFetchInterval)
}
}
2 changes: 1 addition & 1 deletion vochain/indexer/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestImportArchive(t *testing.T) {
qt.Assert(t, err, qt.IsNil)
archive = append(archive, archiveProcess1)

err = idx.ImportArchive(archive)
_, err = idx.ImportArchive(archive)
qt.Assert(t, err, qt.IsNil)

process1, err := idx.ProcessInfo(archiveProcess1.ProcessInfo.ID)
Expand Down

0 comments on commit 5ee13c9

Please sign in to comment.