Skip to content

Commit

Permalink
archive: use data downloader queue
Browse files Browse the repository at this point in the history
  • Loading branch information
p4u committed Nov 24, 2023
1 parent 3ecc76c commit f4ad7f2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
12 changes: 6 additions & 6 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,12 @@ func main() {
if err = srv.Vochain(); err != nil {
log.Fatal(err)
}
// create the offchain data downloader service
if conf.Vochain.OffChainDataDownloader {
if err := srv.OffChainDataHandler(); err != nil {
log.Fatal(err)
}
}
// create the indexer service
if conf.Vochain.Indexer.Enabled {
if err := srv.VochainIndexer(); err != nil {
Expand All @@ -560,12 +566,6 @@ func main() {
log.Fatal(err)
}
}
// create the offchain data downloader service
if conf.Vochain.OffChainDataDownloader {
if err := srv.OffChainDataHandler(); err != nil {
log.Fatal(err)
}
}
// start the service and block until finish fast sync
if err := srv.Start(); err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (vs *VocdoniService) VochainIndexer() error {

if vs.Config.Indexer.ArchiveURL != "" {
log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
go vs.Indexer.StartArchiveRetrieval(vs.Storage, vs.Config.Indexer.ArchiveURL)
go vs.Indexer.StartArchiveRetrieval(vs.DataDownloader, vs.Config.Indexer.ArchiveURL)
}

return nil
Expand Down
19 changes: 11 additions & 8 deletions vochain/indexer/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"time"

"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/downloader"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
indexerdb "go.vocdoni.io/dvote/vochain/indexer/db"
Expand All @@ -16,7 +16,7 @@ import (
)

const (
marxArchiveFileSize = 1024 * 100 // 100KB
maxArchiveFileSize = 1024 * 100 // 100KB
timeoutArchiveRetrieval = 120 * time.Second
archiveFetchInterval = 60 * time.Minute
archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars
Expand All @@ -40,6 +40,7 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess,
return nil, err
}
defer tx.Rollback()

queries := indexerdb.New(tx)
added := []*ArchiveProcess{}
for _, p := range archive {
Expand Down Expand Up @@ -120,10 +121,14 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess,

// StartArchiveRetrieval starts the archive retrieval process. It is a blocking function that runs continuously.
// Retrieves the archive directory from the storage and imports the processes into the indexer database.
func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL string) {
func (idx *Indexer) StartArchiveRetrieval(storage *downloader.Downloader, archiveURL string) {
if storage == nil || archiveURL == "" {
log.Warnw("cannot start archive retrieval", "downloader", storage != nil, "url", archiveURL)
return
}
for {
ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
dirMap, err := storage.RetrieveDir(ctx, archiveURL, marxArchiveFileSize)
dirMap, err := storage.RemoteStorage.RetrieveDir(ctx, archiveURL, maxArchiveFileSize)
cancel()
if err != nil {
log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err)
Expand Down Expand Up @@ -153,11 +158,9 @@ func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL strin
if len(added) > 0 {
log.Infow("new archive imported", "count", len(added))
for _, p := range added {
ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
if err := storage.Pin(ctx, p.ProcessInfo.Metadata); err != nil {
log.Warnw("cannot pin metadata", "err", err.Error())
if p.ProcessInfo.Metadata != "" {
storage.AddToQueue(p.ProcessInfo.Metadata, nil, true)
}
cancel()
}
}
time.Sleep(archiveFetchInterval)
Expand Down

0 comments on commit f4ad7f2

Please sign in to comment.