Skip to content

Commit

Permalink
vochain/indexer: keep vote_count as a process column
Browse files Browse the repository at this point in the history
We used to do this a while back, and replaced it with a COUNT plus JOIN
with sqlite since the data was all in sqlite.

However, we're now adding processes where the votes are not indexed,
and we want to specify what the vote count is as a static integer.

Make indexertypes.Process.VoteCount visible in JSON as well now,
since it's a proper column and no longer derived data via COUNT.

Updates #1170.
  • Loading branch information
mvdan authored and p4u committed Oct 31, 2023
1 parent 125bc51 commit 20bbe9c
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 54 deletions.
10 changes: 10 additions & 0 deletions vochain/indexer/db/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions vochain/indexer/db/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 25 additions & 40 deletions vochain/indexer/db/processes.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 30 additions & 4 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type Indexer struct {
blockQueries *indexerdb.Queries
// blockUpdateProcs is the list of process IDs that require sync with the state database.
// The key is a types.ProcessID as a string, so that it can be used as a map key.
blockUpdateProcs map[string]bool
blockUpdateProcs map[string]bool
blockUpdateProcVoteCounts map[string]bool

// list of live processes (those on which the votes will be computed on arrival)
// TODO: we could query the procs table, perhaps memoizing to avoid querying the same over and over again?
Expand All @@ -100,8 +101,13 @@ func NewIndexer(dataDir string, app *vochain.BaseApplication, countLiveResults b
App: app,
ignoreLiveResults: !countLiveResults,

votePool: make(map[string]map[string]*state.Vote),
blockUpdateProcs: make(map[string]bool),
// TODO(mvdan): these three maps are all keyed by process ID,
// and each of them needs to query existing data from the DB.
// Since the map keys very often overlap, consider joining the maps
// so that we can also reuse queries to the DB.
votePool: make(map[string]map[string]*state.Vote),
blockUpdateProcs: make(map[string]bool),
blockUpdateProcVoteCounts: make(map[string]bool),
}
log.Infow("indexer initialization", "dataDir", dataDir, "liveResults", countLiveResults)

Expand Down Expand Up @@ -408,6 +414,24 @@ func (idx *Indexer) Commit(height uint32) error {
}
clear(idx.votePool)

// Note that we re-compute each process vote count from the votes table,
// since simply incrementing the vote count would break with vote overwrites.
for pidStr := range idx.blockUpdateProcVoteCounts {
pid := []byte(pidStr)
voteCount, err := queries.CountVotesByProcessID(ctx, pid)
if err != nil {
log.Errorw(err, "could not get vote count")
continue
}
if _, err := queries.SetProcessVoteCount(ctx, indexerdb.SetProcessVoteCountParams{
ID: pid,
VoteCount: voteCount,
}); err != nil {
log.Errorw(err, "could not set vote count")
}
}
clear(idx.blockUpdateProcVoteCounts)

if err := idx.blockTx.Commit(); err != nil {
log.Errorw(err, "could not commit tx")
}
Expand All @@ -428,6 +452,7 @@ func (idx *Indexer) Rollback() {
defer idx.blockMu.Unlock()
clear(idx.votePool)
clear(idx.blockUpdateProcs)
clear(idx.blockUpdateProcVoteCounts)
if idx.blockTx != nil {
if err := idx.blockTx.Rollback(); err != nil {
log.Errorw(err, "could not rollback tx")
Expand All @@ -452,9 +477,9 @@ func (idx *Indexer) OnProcess(pid, _ []byte, _, _ string, _ int32) {
// voterID is the identifier of the voter, the most common case is an ethereum address
// but can be any kind of id expressed as bytes.
func (idx *Indexer) OnVote(vote *state.Vote, txIndex int32) {
pid := string(vote.ProcessID)
if !idx.ignoreLiveResults && idx.isProcessLiveResults(vote.ProcessID) {
// Since []byte in Go isn't comparable, but we can convert any bytes to string.
pid := string(vote.ProcessID)
nullifier := string(vote.Nullifier)
if idx.votePool[pid] == nil {
idx.votePool[pid] = make(map[string]*state.Vote)
Expand All @@ -473,6 +498,7 @@ func (idx *Indexer) OnVote(vote *state.Vote, txIndex int32) {
if err := idx.addVoteIndex(context.TODO(), queries, vote, txIndex); err != nil {
log.Errorw(err, "could not index vote")
}
idx.blockUpdateProcVoteCounts[pid] = true
}

// OnCancel indexer stores the processID and entityID
Expand Down
5 changes: 2 additions & 3 deletions vochain/indexer/indexertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Process struct {
StartBlock uint32 `json:"startBlock"`
EndBlock uint32 `json:"endBlock"`
BlockCount uint32 `json:"blockCount"`
VoteCount uint64 `json:"voteCount"`
CensusRoot types.HexBytes `json:"censusRoot"`
CensusURI string `json:"censusURI"`
Metadata string `json:"metadata"`
Expand All @@ -42,8 +43,6 @@ type Process struct {
PrivateKeys json.RawMessage `json:"-"` // json array
PublicKeys json.RawMessage `json:"-"` // json array

VoteCount uint64 `json:"-"` // via LEFT JOIN

ResultsVotes [][]*types.BigInt `json:"-"`
ResultsWeight *types.BigInt `json:"-"`
ResultsBlockHeight uint32 `json:"-"`
Expand All @@ -60,7 +59,7 @@ func (p *Process) Results() *results.Results {
}
}

func ProcessFromDB(dbproc *indexerdb.GetProcessRow) *Process {
func ProcessFromDB(dbproc *indexerdb.Process) *Process {
proc := &Process{
ID: dbproc.ID,
EntityID: nonEmptyBytes(dbproc.EntityID),
Expand Down
1 change: 1 addition & 0 deletions vochain/indexer/migrations/0001_create_table_processes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CREATE TABLE processes (
start_block INTEGER NOT NULL,
end_block INTEGER NOT NULL,
block_count INTEGER NOT NULL,
vote_count INTEGER NOT NULL,

have_results BOOLEAN NOT NULL,
final_results BOOLEAN NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions vochain/indexer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (idx *Indexer) newEmptyProcess(pid []byte) error {
StartBlock: int64(p.StartBlock),
EndBlock: int64(p.BlockCount + p.StartBlock),
BlockCount: int64(p.BlockCount),
VoteCount: 0, // an empty process has no votes yet
HaveResults: !p.EnvelopeType.EncryptedVotes, // like isOpenProcess, but on the state type
CensusRoot: nonNullBytes(p.CensusRoot),
MaxCensusSize: int64(p.GetMaxCensusSize()),
Expand Down
17 changes: 10 additions & 7 deletions vochain/indexer/queries/processes.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- name: CreateProcess :execresult
INSERT INTO processes (
id, entity_id, start_block, end_block, block_count,
have_results, final_results, census_root,
vote_count, have_results, final_results, census_root,
max_census_size, census_uri, metadata,
census_origin, status, namespace,
envelope, mode, vote_opts,
Expand All @@ -12,7 +12,7 @@ INSERT INTO processes (
results_votes, results_weight, results_block_height
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?, ?,
?, ?, ?,
?, ?, ?,
?, ?, ?,
Expand All @@ -24,11 +24,9 @@ INSERT INTO processes (
);

-- name: GetProcess :one
SELECT p.*, COUNT(v.nullifier) AS vote_count FROM processes AS p
LEFT JOIN votes AS v
ON p.id = v.process_id
WHERE p.id = ?
GROUP BY p.id
SELECT * FROM processes
WHERE id = ?
GROUP BY id
LIMIT 1;

-- name: SearchProcesses :many
Expand Down Expand Up @@ -80,6 +78,11 @@ UPDATE processes
SET have_results = FALSE, final_results = TRUE
WHERE id = sqlc.arg(id);

-- name: SetProcessVoteCount :execresult
UPDATE processes
SET vote_count = sqlc.arg(vote_count)
WHERE id = sqlc.arg(id);

-- name: GetProcessCount :one
SELECT COUNT(*) FROM processes;

Expand Down

0 comments on commit 20bbe9c

Please sign in to comment.