diff --git a/cmd/node/main.go b/cmd/node/main.go index 4338d53ec..254e9835b 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -116,6 +116,7 @@ func newConfig() (*config.Config, config.Error) { "enable IPFS group synchronization using the given secret key") conf.Ipfs.ConnectPeers = *flag.StringSlice("ipfsConnectPeers", []string{}, "use custom ipfsconnect peers/bootnodes for accessing the DHT (comma-separated)") + conf.Vochain.Indexer.ArchiveURL = *flag.String("archiveURL", types.ArchiveURL, "enable archive retrieval from the given IPNS URL") // vochain conf.Vochain.P2PListen = *flag.String("vochainP2PListen", "0.0.0.0:26656", @@ -198,6 +199,11 @@ func newConfig() (*config.Config, config.Error) { // use different datadirs for different chains conf.DataDir = filepath.Join(conf.DataDir, conf.Vochain.Chain) + if err = viper.BindPFlag("archiveURL", flag.Lookup("archiveURL")); err != nil { + log.Fatalf("failed to bind archiveURL flag to viper: %v", err) + } + conf.Vochain.Indexer.ArchiveURL = viper.GetString("archiveURL") + // add viper config path (now we know it) viper.AddConfigPath(conf.DataDir) diff --git a/config/config.go b/config/config.go index 216f61f46..6e5cd39fb 100644 --- a/config/config.go +++ b/config/config.go @@ -144,6 +144,8 @@ type IndexerCfg struct { Enabled bool // Disables live results computation on indexer IgnoreLiveResults bool + // ArchiveURL is the URL where the archive is retrieved from (usually IPNS) + ArchiveURL string } // MetricsCfg initializes the metrics config diff --git a/data/data.go b/data/data.go index 79c4339f6..582f8942e 100644 --- a/data/data.go +++ b/data/data.go @@ -16,6 +16,7 @@ type Storage interface { Publish(ctx context.Context, data []byte) (string, error) PublishReader(ctx context.Context, data io.Reader) (string, error) Retrieve(ctx context.Context, id string, maxSize int64) ([]byte, error) + RetrieveDir(ctx context.Context, id string, maxSize int64) (map[string][]byte, error) Pin(ctx context.Context, path string) 
error Unpin(ctx context.Context, path string) error ListPins(ctx context.Context) (map[string]string, error) diff --git a/data/datamocktest.go b/data/datamocktest.go index 9fd11f836..2ed3f0e1a 100644 --- a/data/datamocktest.go +++ b/data/datamocktest.go @@ -57,6 +57,11 @@ func (d *DataMockTest) Retrieve(_ context.Context, id string, _ int64) ([]byte, return d.rnd.RandomBytes(256), nil } +func (d *DataMockTest) RetrieveDir(_ context.Context, id string, _ int64) (map[string][]byte, error) { + // TODO: Implement + return nil, nil +} + func (d *DataMockTest) Pin(_ context.Context, path string) error { d.filesMu.Lock() defer d.filesMu.Unlock() diff --git a/data/ipfs/ipfs.go b/data/ipfs/ipfs.go index 8646854c5..4dc72c7aa 100644 --- a/data/ipfs/ipfs.go +++ b/data/ipfs/ipfs.go @@ -260,6 +260,41 @@ func (i *Handler) ListPins(ctx context.Context) (map[string]string, error) { return pinMap, nil } +// RetrieveDir gets an IPFS directory and returns a map of all files and their content. +// It only supports 1 level of directory depth, so subdirectories are ignored. 
+func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) (map[string][]byte, error) { + path = strings.Replace(path, "ipfs://", "/ipfs/", 1) + + // first resolve the path + cpath, err := i.CoreAPI.ResolvePath(ctx, corepath.New(path)) + if err != nil { + return nil, fmt.Errorf("could not resolve path %s", path) + } + // then get the file + f, err := i.CoreAPI.Unixfs().Get(ctx, cpath) + if err != nil { + return nil, fmt.Errorf("could not retrieve unixfs file: %w", err) + } + + dirMap := make(map[string][]byte) + if dir := files.ToDir(f); dir != nil { + if err := files.Walk(dir, func(path string, node files.Node) error { + if file := files.ToFile(node); file != nil { + content, err := fetchFileContent(file) + if err != nil { + log.Warnw("could not retrieve file from directory", "path", path, "error", err) + return nil + } + dirMap[path] = content + } + return nil + }); err != nil { + return nil, err + } + } + return dirMap, nil +} + // Retrieve gets an IPFS file (either from the p2p network or from the local cache). // If maxSize is 0, it is set to the hardcoded maximum of MaxFileSizeBytes. 
func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) { @@ -277,32 +312,13 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]b if err != nil { return nil, fmt.Errorf("could not resolve path %s", path) } - // then get the file f, err := i.CoreAPI.Unixfs().Get(ctx, cpath) if err != nil { return nil, fmt.Errorf("could not retrieve unixfs file: %w", err) } - file := files.ToFile(f) - if file == nil { - return nil, fmt.Errorf("object is not a file") - } - defer file.Close() - fsize, err := file.Size() - if err != nil { - return nil, err - } - - if maxSize == 0 { - maxSize = MaxFileSizeBytes - } - - if fsize > maxSize { - return nil, fmt.Errorf("file too big: %d", fsize) - } - - content, err := io.ReadAll(file) + content, err := fetchFileContent(f) if err != nil { return nil, err } @@ -322,10 +338,28 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]b // Save file to cache for future attempts i.retrieveCache.Add(path, content) - log.Infow("retrieved file", "path", path, "size", fsize) + log.Infow("retrieved file", "path", path, "size", len(content)) return content, nil } +func fetchFileContent(node files.Node) ([]byte, error) { + file := files.ToFile(node) + if file == nil { + return nil, fmt.Errorf("object is not a file") + } + defer file.Close() + + fsize, err := file.Size() + if err != nil { + return nil, err + } + + if fsize > MaxFileSizeBytes { + return nil, fmt.Errorf("file too big: %d", fsize) + } + return io.ReadAll(io.LimitReader(file, MaxFileSizeBytes)) +} + // PublishIPNSpath creates or updates an IPNS record with the content of a // filesystem path (a single file or a directory). 
// diff --git a/dockerfiles/vocdoninode/env.example b/dockerfiles/vocdoninode/env.example index 14b92cdfe..ca94734f3 100644 --- a/dockerfiles/vocdoninode/env.example +++ b/dockerfiles/vocdoninode/env.example @@ -12,6 +12,7 @@ VOCDONI_ENABLEAPI=True VOCDONI_ENABLERPC=False VOCDONI_LISTENHOST=0.0.0.0 VOCDONI_LISTENPORT=9090 +VOCDONI_ARCHIVEURL=/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz #VOCDONI_TLS_DIRCERT= #VOCDONI_TLS_DOMAIN= #VOCDONI_ADMINTOKEN= diff --git a/service/indexer.go b/service/indexer.go index 6ce4ccaee..b35657c54 100644 --- a/service/indexer.go +++ b/service/indexer.go @@ -21,5 +21,11 @@ func (vs *VocdoniService) VochainIndexer() error { } // launch the indexer after sync routine (executed when the blockchain is ready) go vs.Indexer.AfterSyncBootstrap(false) + + if vs.Config.Indexer.ArchiveURL != "" { + log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL) + go vs.Indexer.StartArchiveRetrival(vs.Storage, vs.Config.Indexer.ArchiveURL) + } + return nil } diff --git a/types/consts.go b/types/consts.go index b52ab69d7..f12d64aae 100644 --- a/types/consts.go +++ b/types/consts.go @@ -110,6 +110,9 @@ const ( // on a specific block IndexerProcessEndingPrefix = byte(0x25) + // ArchiveURL is the default URL where the archive is retrieved from + ArchiveURL = "/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz" + // Vochain // PetitionSign contains the string that needs to match with the received vote type diff --git a/vochain/indexer/archive.go b/vochain/indexer/archive.go index 97501d40c..fc6701715 100644 --- a/vochain/indexer/archive.go +++ b/vochain/indexer/archive.go @@ -2,16 +2,26 @@ package indexer import ( "context" + "encoding/json" "fmt" "time" + "go.vocdoni.io/dvote/data" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/types" indexerdb "go.vocdoni.io/dvote/vochain/indexer/db" "go.vocdoni.io/dvote/vochain/indexer/indexertypes" "go.vocdoni.io/dvote/vochain/results" 
"go.vocdoni.io/proto/build/go/models" ) +const ( + maxArchiveFileSize = 1024 * 100 // 100KB + timeoutArchiveRetrieval = 60 * time.Second + archiveFetchInterval = 20 * time.Minute + archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars +) + // ArchiveProcess is the struct used to store the process data in the archive. type ArchiveProcess struct { ChainID string `json:"chainId,omitempty"` @@ -25,16 +35,22 @@ type ArchiveProcess struct { func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error { tx, err := idx.readWriteDB.Begin() if err != nil { - panic(err) // shouldn't happen, use an error return if it ever does + return err } - queries := idx.blockQueries.WithTx(tx) + defer tx.Rollback() height := idx.App.State.CurrentHeight() - + queries := indexerdb.New(tx) + addCount := 0 for _, p := range archive { if idx.App.ChainID() == p.ChainID { - log.Warnw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String()) + log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String()) + continue + } + if p.ProcessInfo == nil { + log.Debugw("skipping import of archive process with nil process info") continue } + // Check if election already exists if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil { if err != ErrProcessNotFound { @@ -72,11 +88,51 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error { SourceNetworkID: int64(models.SourceNetworkId_value[p.ProcessInfo.SourceNetworkId]), Metadata: p.ProcessInfo.Metadata, ResultsVotes: indexertypes.EncodeJSON(p.Results.Votes), - // TODO: Add a boolean field to the process table to indicate if the process is archived + VoteCount: int64(p.ProcessInfo.VoteCount), } if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil { return fmt.Errorf("create archive process: %w", err) } + addCount++ + } + if addCount > 0 { + log.Infow("archive new elections 
imported", "elections", addCount) } return tx.Commit() } + +// StartArchiveRetrival starts the archive retrieval process. It is a blocking function that runs continuously. +// Retrieves the archive directory from the storage and imports the processes into the indexer database. +func (idx *Indexer) StartArchiveRetrival(storage data.Storage, archiveURL string) { + for { + ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval) + dirMap, err := storage.RetrieveDir(ctx, archiveURL, maxArchiveFileSize) + cancel() + if err != nil { + log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err) + continue + } + archive := []*ArchiveProcess{} + for name, data := range dirMap { + if len(data) == 0 { + continue + } + if len(name) != archiveFileNameSize { + continue + } + var p ArchiveProcess + if err := json.Unmarshal(data, &p); err != nil { + log.Warnw("cannot unmarshal archive process", "name", name, "err", err) + continue + } + archive = append(archive, &p) + } + + log.Debugw("archive processes unmarshaled", "processes", len(archive)) + if err := idx.ImportArchive(archive); err != nil { + log.Warnw("cannot import archive", "err", err) + } + + time.Sleep(archiveFetchInterval) + } +} diff --git a/vochain/indexer/archive_test.go b/vochain/indexer/archive_test.go index f8ae93a51..74062b5bd 100644 --- a/vochain/indexer/archive_test.go +++ b/vochain/indexer/archive_test.go @@ -2,7 +2,6 @@ package indexer import ( "encoding/json" - "fmt" "testing" qt "github.com/frankban/quicktest" @@ -15,20 +14,20 @@ func TestImportArchive(t *testing.T) { archive := []*ArchiveProcess{} archiveProcess1 := &ArchiveProcess{} - archiveProcess2 := &ArchiveProcess{} err := json.Unmarshal([]byte(testArchiveProcess1), archiveProcess1) qt.Assert(t, err, qt.IsNil) - err = json.Unmarshal([]byte(testArchiveProcess2), archiveProcess2) - qt.Assert(t, err, qt.IsNil) archive = append(archive, archiveProcess1) - archive = append(archive, archiveProcess2) err = 
idx.ImportArchive(archive) qt.Assert(t, err, qt.IsNil) process1, err := idx.ProcessInfo(archiveProcess1.ProcessInfo.ID) qt.Assert(t, err, qt.IsNil) - fmt.Printf("process1: %+v\n", process1) + qt.Assert(t, process1.ID.String(), qt.Equals, archiveProcess1.ProcessInfo.ID.String()) + qt.Assert(t, process1.Results().Votes[0][0].MathBigInt().Int64(), qt.Equals, int64(342)) + qt.Assert(t, process1.Results().Votes[0][1].MathBigInt().Int64(), qt.Equals, int64(365)) + qt.Assert(t, process1.Results().Votes[0][2].MathBigInt().Int64(), qt.Equals, int64(21)) + // TODO: qt.Assert(t, process1.Results().Weight.MathBigInt().Int64(), qt.Equals, int64(342+365+21)) } var testArchiveProcess1 = ` @@ -40,6 +39,7 @@ var testArchiveProcess1 = ` "startBlock": 236631, "endBlock": 240060, "blockCount": 3515, + "voteCount": 728, "censusRoot": "09397c5b65efc0e95b338b610ef38394312e94418d4a828f8820008378758451", "censusURI": "ipfs://bafybeifduki6huacmtufg5avyo6g2pxxefwohsks3fp63w3dd733ixzecu", "metadata": "ipfs://bafybeiel2cvsrg7d4frqxziberrpnjj4yxzs5peyxkgsvamylpxxdn4m7q", @@ -88,58 +88,3 @@ var testArchiveProcess1 = ` "startDate": "2023-10-20T07:59:49.977338317Z" } ` - -var testArchiveProcess2 = ` -{ - "chainId": "vocdoni-stage-8", - "process": { - "processId": "c5d2460186f7d8fcfaa76192aa69cceddaec554b1d82b0166dc9020000000031", - "entityId": "d8fcfaa76192aa69cceddaec554b1d82b0166dc9", - "startBlock": 287823, - "endBlock": 287858, - "blockCount": 35, - "censusRoot": "8b840a7ddbadfbc98145bf86cf3e8f53f3670a703b1b92f9d0ee4b065eae970b", - "censusURI": "ipfs://bafybeigff4k3pqxzokvixov6s34yaw6dje474ghnehz6duaxryyf2hd5ke", - "metadata": "ipfs://bafybeicyi4o3hmp27ejkakzqrdk37e2n3julvmwg3txwyyqfub3pwz6ora", - "censusOrigin": 2, - "status": 5, - "namespace": 0, - "envelopeType": {}, - "processMode": { - "autoStart": true, - "interruptible": true - }, - "voteOptions": { - "maxCount": 1, - "maxValue": 2, - "costExponent": 10000 - }, - "questionIndex": 0, - "creationTime": "2023-10-26T13:24:25Z", - 
"haveResults": true, - "finalResults": true, - "sourceBlockHeight": 0, - "sourceNetworkId": "UNKNOWN", - "maxCensusSize": 2 - }, - "results": { - "processId": "c5d2460186f7d8fcfaa76192aa69cceddaec554b1d82b0166dc9020000000031", - "votes": [ - [ - "0", - "1000000000000000000000", - "0" - ] - ], - "weight": null, - "envelopeType": {}, - "voteOptions": { - "maxCount": 1, - "maxValue": 2, - "costExponent": 10000 - }, - "blockHeight": 0 - }, - "startDate": "2023-10-26T13:24:56.609443588Z" - } -` diff --git a/vochain/processarchive/processarchive.go b/vochain/processarchive/processarchive.go index 0c65baafe..ee43724cd 100644 --- a/vochain/processarchive/processarchive.go +++ b/vochain/processarchive/processarchive.go @@ -163,8 +163,7 @@ func BuildIndex(datadir string) (*Index, error) { // The key parameter must be either a valid IPFS base64 encoded private key // or empty (a new key will be generated). // If ipfs is nil, only JSON archive storage will be performed. -func NewProcessArchive(s *indexer.Indexer, ipfs *ipfs.Handler, - datadir, key string) (*ProcessArchive, error) { +func NewProcessArchive(s *indexer.Indexer, ipfs *ipfs.Handler, datadir, key string) (*ProcessArchive, error) { js, err := NewJsonStorage(datadir) if err != nil { return nil, fmt.Errorf("could not create process archive: %w", err)