Skip to content

Commit bbc6a27

Browse files
committed
feat: add tx post-processing
1 parent c5b6dfd commit bbc6a27

File tree

4 files changed

+300
-0
lines changed

4 files changed

+300
-0
lines changed

database/transaction.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package database
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/rs/zerolog/log"
7+
)
8+
9+
// UpdateTransactionLogs updates the logs field of a transaction
10+
func (db *Db) UpdateTransactionLogs(hash string, logs []byte) error {
11+
stmt := `UPDATE transaction SET logs = $1 WHERE hash = $2`
12+
13+
log.Debug().
14+
Str("txhash", hash).
15+
Int("logs_bytes", len(logs)).
16+
Msg("DATABASE: Updating transaction logs")
17+
18+
result, err := db.SQL.Exec(stmt, logs, hash)
19+
if err != nil {
20+
log.Error().
21+
Err(err).
22+
Str("txhash", hash).
23+
Msg("DATABASE: Error while updating transaction logs")
24+
return fmt.Errorf("error while updating transaction logs: %s", err)
25+
}
26+
27+
rowsAffected, err := result.RowsAffected()
28+
if err != nil {
29+
log.Warn().
30+
Err(err).
31+
Str("txhash", hash).
32+
Msg("DATABASE: Could not get rows affected count")
33+
} else {
34+
log.Debug().
35+
Str("txhash", hash).
36+
Int64("rows_affected", rowsAffected).
37+
Msg("DATABASE: Transaction logs update complete")
38+
}
39+
40+
return nil
41+
}

modules/registrar.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/forbole/callisto/v4/modules/modules"
3636
"github.com/forbole/callisto/v4/modules/pricefeed"
3737
"github.com/forbole/callisto/v4/modules/staking"
38+
"github.com/forbole/callisto/v4/modules/tx_worker"
3839
"github.com/forbole/callisto/v4/modules/upgrade"
3940
)
4041

@@ -91,10 +92,12 @@ func (r *Registrar) BuildModules(ctx registrar.Context) jmodules.Modules {
9192
govModule := gov.NewModule(sources.GovSource, distrModule, mintModule, slashingModule, stakingModule, r.cdc, db)
9293
wasmModule := wasm.NewModule(sources.WasmSource, r.cdc, db)
9394
upgradeModule := upgrade.NewModule(db, stakingModule)
95+
txWorkerModule := tx_worker.NewModule(db)
9496

9597
externalModule := external.NewModule(ctx.JunoConfig, r.cdc, r.cdc.InterfaceRegistry())
9698

9799
return []jmodules.Module{
100+
txWorkerModule,
98101
messages.NewModule(r.parser, ctx.Database),
99102
telemetry.NewModule(ctx.JunoConfig),
100103
pruning.NewModule(ctx.JunoConfig, db, ctx.Logger),

modules/tx_worker/module.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package tx_worker
2+
3+
import (
4+
"github.com/forbole/juno/v6/modules"
5+
juno "github.com/forbole/juno/v6/types"
6+
"github.com/rs/zerolog/log"
7+
8+
"github.com/forbole/callisto/v4/database"
9+
"github.com/forbole/callisto/v4/utils/events"
10+
)
11+
12+
var (
13+
_ modules.Module = &Module{}
14+
_ modules.TransactionModule = &Module{}
15+
)
16+
17+
// Module represents the tx_worker module
18+
type Module struct {
19+
db *database.Db
20+
}
21+
22+
// NewModule returns a new Module instance
23+
func NewModule(db *database.Db) *Module {
24+
log.Info().Msg("TX_WORKER: Initializing transaction log enrichment module")
25+
return &Module{
26+
db: db,
27+
}
28+
}
29+
30+
// Name implements modules.Module
31+
func (m *Module) Name() string {
32+
return "tx_worker"
33+
}
34+
35+
// HandleTx implements modules.TransactionModule
36+
// This is called before any other module receives the transaction
37+
func (m *Module) HandleTx(tx *juno.Transaction) error {
38+
log.Info().
39+
Str("txhash", tx.TxHash).
40+
Uint64("height", tx.Height).
41+
Int("events_count", len(tx.Events)).
42+
Int("logs_count", len(tx.Logs)).
43+
Msg("TX_WORKER: Processing transaction")
44+
45+
// Log transaction state before processing
46+
if len(tx.Logs) > 0 {
47+
log.Debug().
48+
Str("txhash", tx.TxHash).
49+
Int("logs_count", len(tx.Logs)).
50+
Msg("TX_WORKER: Transaction already has logs, skipping")
51+
return nil
52+
}
53+
54+
if len(tx.Events) == 0 {
55+
log.Debug().
56+
Str("txhash", tx.TxHash).
57+
Msg("TX_WORKER: Transaction has no events, skipping")
58+
return nil
59+
}
60+
61+
// Use the utility function to update transaction logs if needed
62+
updated := events.UpdateTransactionLogs(tx, m.db)
63+
64+
if updated {
65+
log.Info().
66+
Str("txhash", tx.TxHash).
67+
Int("new_logs_count", len(tx.Logs)).
68+
Msg("TX_WORKER: Successfully enriched transaction logs")
69+
} else {
70+
log.Debug().
71+
Str("txhash", tx.TxHash).
72+
Msg("TX_WORKER: Transaction logs not updated")
73+
}
74+
75+
return nil
76+
}

utils/events/transaction.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
abci "github.com/cometbft/cometbft/abci/types"
8+
sdk "github.com/cosmos/cosmos-sdk/types"
9+
juno "github.com/forbole/juno/v6/types"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/forbole/callisto/v4/database"
13+
)
14+
15+
// TransactionLogUpdater represents an interface for updating transaction logs
16+
type TransactionLogUpdater interface {
17+
UpdateTransactionLogs(hash string, logs []byte) error
18+
}
19+
20+
// ConvertEventsToLogs converts a slice of ABCI events to a slice of SDK ABCIMessageLog
21+
// It groups events by message index when possible
22+
func ConvertEventsToLogs(events []abci.Event) []sdk.ABCIMessageLog {
23+
var logs []sdk.ABCIMessageLog
24+
25+
log.Debug().
26+
Int("events_count", len(events)).
27+
Msg("EVENTS: Starting conversion of events to logs")
28+
29+
// Create a log entry for each message index by grouping events with the same msg_index
30+
msgIndexMap := make(map[uint32][]abci.Event)
31+
32+
// Group events by message index, default to 0 if not specified
33+
for i, event := range events {
34+
msgIndex := uint32(0) // Default to first message
35+
36+
// Try to find msg_index attribute
37+
for _, attr := range event.Attributes {
38+
if attr.Key == "msg_index" {
39+
// Try to parse as integer
40+
var idx int
41+
_, err := fmt.Sscanf(attr.Value, "%d", &idx)
42+
if err == nil && idx >= 0 {
43+
msgIndex = uint32(idx)
44+
log.Debug().
45+
Int("event_index", i).
46+
Str("event_type", event.Type).
47+
Uint32("msg_index", msgIndex).
48+
Msg("EVENTS: Found explicit msg_index in event")
49+
break
50+
}
51+
}
52+
}
53+
54+
// Add event to the appropriate message group
55+
msgIndexMap[msgIndex] = append(msgIndexMap[msgIndex], event)
56+
log.Debug().
57+
Int("event_index", i).
58+
Str("event_type", event.Type).
59+
Uint32("assigned_msg_index", msgIndex).
60+
Msg("EVENTS: Assigned event to message group")
61+
}
62+
63+
// If we couldn't find any message indices, use all events for the first message
64+
if len(msgIndexMap) == 0 && len(events) > 0 {
65+
log.Debug().Msg("EVENTS: No message indices found, assigning all events to message index 0")
66+
msgIndexMap[0] = events
67+
}
68+
69+
// Log the grouping results
70+
for msgIndex, events := range msgIndexMap {
71+
log.Debug().
72+
Uint32("msg_index", msgIndex).
73+
Int("events_count", len(events)).
74+
Msg("EVENTS: Message group events count")
75+
}
76+
77+
// Create logs for each message index
78+
for msgIndex, msgEvents := range msgIndexMap {
79+
stringEvents := sdk.StringifyEvents(msgEvents)
80+
81+
msgLog := sdk.ABCIMessageLog{
82+
MsgIndex: msgIndex,
83+
Events: stringEvents,
84+
}
85+
logs = append(logs, msgLog)
86+
87+
log.Debug().
88+
Uint32("msg_index", msgIndex).
89+
Int("events_count", len(stringEvents)).
90+
Msg("EVENTS: Created log entry for message index")
91+
}
92+
93+
log.Debug().
94+
Int("created_logs_count", len(logs)).
95+
Msg("EVENTS: Completed conversion of events to logs")
96+
97+
return logs
98+
}
99+
100+
// UpdateTransactionLogs updates the transaction logs in the database if they're empty
101+
// and there are events available. Returns true if logs were updated, false otherwise.
102+
func UpdateTransactionLogs(tx *juno.Transaction, db *database.Db) bool {
103+
// Skip if logs are already present
104+
if len(tx.Logs) > 0 {
105+
log.Debug().
106+
Str("txhash", tx.TxHash).
107+
Int("logs_count", len(tx.Logs)).
108+
Msg("EVENTS: Transaction already has logs, skipping")
109+
return false
110+
}
111+
112+
// Skip if no events are present
113+
if len(tx.Events) == 0 {
114+
log.Debug().
115+
Str("txhash", tx.TxHash).
116+
Msg("EVENTS: Transaction has no events, skipping")
117+
return false
118+
}
119+
120+
log.Info().
121+
Str("txhash", tx.TxHash).
122+
Int("events_count", len(tx.Events)).
123+
Msg("EVENTS: Enriching transaction logs from events")
124+
125+
// Log some details about the events for debugging
126+
for i, event := range tx.Events {
127+
log.Debug().
128+
Str("txhash", tx.TxHash).
129+
Int("event_index", i).
130+
Str("event_type", event.Type).
131+
Int("attributes_count", len(event.Attributes)).
132+
Msg("EVENTS: Processing event")
133+
}
134+
135+
// Convert events to logs format
136+
logs := ConvertEventsToLogs(tx.Events)
137+
138+
// Skip if no logs were created
139+
if len(logs) == 0 {
140+
log.Debug().
141+
Str("txhash", tx.TxHash).
142+
Msg("EVENTS: No logs were created from events")
143+
return false
144+
}
145+
146+
log.Debug().
147+
Str("txhash", tx.TxHash).
148+
Int("created_logs_count", len(logs)).
149+
Msg("EVENTS: Successfully converted events to logs")
150+
151+
// Store the logs in the transaction
152+
logsBz, err := json.Marshal(logs)
153+
if err != nil {
154+
log.Error().
155+
Err(err).
156+
Str("txhash", tx.TxHash).
157+
Msg("EVENTS: Error while marshaling logs")
158+
return false
159+
}
160+
161+
// Update the transaction logs
162+
err = db.UpdateTransactionLogs(tx.TxHash, logsBz)
163+
if err != nil {
164+
log.Error().
165+
Err(err).
166+
Str("txhash", tx.TxHash).
167+
Msg("EVENTS: Error while updating transaction logs")
168+
return false
169+
}
170+
171+
log.Info().
172+
Str("txhash", tx.TxHash).
173+
Int("logs_count", len(logs)).
174+
Msg("EVENTS: Successfully updated transaction logs in database")
175+
176+
// Update the transaction object with the new logs
177+
tx.Logs = logs
178+
179+
return true
180+
}

0 commit comments

Comments
 (0)