diff --git a/vochain/indexer/archive.go b/vochain/indexer/archive.go index fc6701715..1582149a6 100644 --- a/vochain/indexer/archive.go +++ b/vochain/indexer/archive.go @@ -17,8 +17,8 @@ import ( const ( marxArchiveFileSize = 1024 * 100 // 100KB - timeoutArchiveRetrieval = 60 * time.Second - archiveFetchInterval = 20 * time.Minute + timeoutArchiveRetrieval = 120 * time.Second + archiveFetchInterval = 60 * time.Minute archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars ) @@ -32,15 +32,17 @@ type ArchiveProcess struct { } // ImportArchive imports an archive list of processes into the indexer database. -func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error { +// It checks if the process already exists in the database and if not, it creates it. +// Returns those processes that have been added to the database. +func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess, error) { tx, err := idx.readWriteDB.Begin() if err != nil { - return err + return nil, err } defer tx.Rollback() height := idx.App.State.CurrentHeight() queries := indexerdb.New(tx) - addCount := 0 + added := []*ArchiveProcess{} for _, p := range archive { if idx.App.ChainID() == p.ChainID { log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String()) @@ -54,7 +56,7 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error { // Check if election already exists if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil { if err != ErrProcessNotFound { - return fmt.Errorf("process info: %w", err) + return nil, fmt.Errorf("process info: %w", err) } } else { continue @@ -91,14 +93,11 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error { VoteCount: int64(p.ProcessInfo.VoteCount), } if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil { - return fmt.Errorf("create archive process: %w", err) + return nil, fmt.Errorf("create archive process: %w", err) } - addCount++ + added = append(added, p) } - if addCount > 0 { - log.Infow("archive new elections imported", "elections", addCount) - } - return tx.Commit() + return added, tx.Commit() } // StartArchiveRetrival starts the archive retrieval process. It is a blocking function that runs continuously. @@ -129,10 +128,20 @@ func (idx *Indexer) StartArchiveRetrival(storage data.Storage, archiveURL string } log.Debugw("archive processes unmarshaled", "processes", len(archive)) - if err := idx.ImportArchive(archive); err != nil { + added, err := idx.ImportArchive(archive) + if err != nil { log.Warnw("cannot import archive", "err", err) } - + if len(added) > 0 { + log.Infow("new archive imported", "count", len(added)) + for _, p := range added { + ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval) + if err := storage.Pin(ctx, p.ProcessInfo.Metadata); err != nil { + log.Warnw("cannot pin metadata", "err", err.Error()) + } + cancel() + } + } time.Sleep(archiveFetchInterval) } } diff --git a/vochain/indexer/archive_test.go b/vochain/indexer/archive_test.go index 74062b5bd..495480ac7 100644 --- a/vochain/indexer/archive_test.go +++ b/vochain/indexer/archive_test.go @@ -18,7 +18,7 @@ func TestImportArchive(t *testing.T) { qt.Assert(t, err, qt.IsNil) archive = append(archive, archiveProcess1) - err = idx.ImportArchive(archive) + _, err = idx.ImportArchive(archive) qt.Assert(t, err, qt.IsNil) process1, err := idx.ProcessInfo(archiveProcess1.ProcessInfo.ID)