Skip to content

Commit 5f9819f

Browse files
authored
Merge pull request #178 from shawakash/go2
feat: use kafka to publish the txn metadata in go and consume in ts side
2 parents 361e5eb + 0ab24c5 commit 5f9819f

File tree

10 files changed

+190
-28
lines changed

10 files changed

+190
-28
lines changed

backend/mote/go.mod

+17-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
require (
1717
contrib.go.opencensus.io/exporter/stackdriver v0.13.14 // indirect
1818
filippo.io/edwards25519 v1.1.0 // indirect
19+
github.com/IBM/sarama v1.43.0 // indirect
1920
github.com/Microsoft/go-winio v0.6.1 // indirect
2021
github.com/StackExchange/wmi v1.2.1 // indirect
2122
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
@@ -39,6 +40,9 @@ require (
3940
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
4041
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
4142
github.com/dfuse-io/logging v0.0.0-20201110202154-26697de88c79 // indirect
43+
github.com/eapache/go-resiliency v1.6.0 // indirect
44+
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
45+
github.com/eapache/queue v1.1.0 // indirect
4246
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
4347
github.com/fatih/color v1.16.0 // indirect
4448
github.com/fsnotify/fsnotify v1.6.0 // indirect
@@ -49,11 +53,20 @@ require (
4953
github.com/go-playground/locales v0.14.1 // indirect
5054
github.com/go-playground/universal-translator v0.18.1 // indirect
5155
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
56+
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
5257
github.com/google/uuid v1.3.0 // indirect
5358
github.com/gorilla/rpc v1.2.0 // indirect
5459
github.com/gorilla/securecookie v1.1.2 // indirect
5560
github.com/gorilla/websocket v1.5.0 // indirect
61+
github.com/hashicorp/errwrap v1.1.0 // indirect
62+
github.com/hashicorp/go-multierror v1.1.1 // indirect
63+
github.com/hashicorp/go-uuid v1.0.3 // indirect
5664
github.com/holiman/uint256 v1.2.4 // indirect
65+
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
66+
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
67+
github.com/jcmturner/gofork v1.7.6 // indirect
68+
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
69+
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
5770
github.com/json-iterator/go v1.1.12 // indirect
5871
github.com/klauspost/compress v1.17.7 // indirect
5972
github.com/leodido/go-urn v1.4.0 // indirect
@@ -66,6 +79,8 @@ require (
6679
github.com/modern-go/reflect2 v1.0.2 // indirect
6780
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect
6881
github.com/mr-tron/base58 v1.2.0 // indirect
82+
github.com/pierrec/lz4/v4 v4.1.21 // indirect
83+
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
6984
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
7085
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 // indirect
7186
github.com/supranational/blst v0.3.11 // indirect
@@ -84,8 +99,8 @@ require (
8499
golang.org/x/crypto v0.21.0 // indirect
85100
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
86101
golang.org/x/mod v0.14.0 // indirect
87-
golang.org/x/net v0.21.0 // indirect
88-
golang.org/x/sync v0.5.0 // indirect
102+
golang.org/x/net v0.22.0 // indirect
103+
golang.org/x/sync v0.6.0 // indirect
89104
golang.org/x/sys v0.18.0 // indirect
90105
golang.org/x/term v0.18.0 // indirect
91106
golang.org/x/text v0.14.0 // indirect

backend/mote/go.sum

+54
Large diffs are not rendered by default.

backend/mote/src/config/config.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ var (
2020
BTC_RPC_HOST = "localhost:8332"
2121
BTC_RPC_PORT = "8332"
2222
BTC_POST_MODE = true
23-
JWT_ALG = jose.RS256
2423
AUTH_JWT_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqx+7XJxJR+0Lp8hLFKYr5Gc+0RPIdaZJ18GH8b//oMn7PCVe0gLQDkxjvhKo2ySMgWSOSGaNJkZXLhN4jlot/xaulN3dSbrgQPxvx3ALd3nXJaTLOb7xBODd196r+Ylg1QPICdrBQVi6qAXacq/UBK8K7BWQ0TG2/R9aB5mNSGtY3Ogj9xp2MP5LTi7f2Alj6IwSFRN+9SCmH3NiQzNUPBWJB02Lgx1oxwtfevkQ3BpwIqzkOTTE1G7PXgKbYRBUlUNqwvMIjk89tRf/qHgMbRPGYYNu7XoRt8AOVgNFUcL51Gb9vM75XstWoAh6BwYQsceEXUU7dgIJem9zItFRdwIDAQAB-----END PUBLIC KEY-----"
2524
AUTH_JWT_PRIVATE_KEY = "-----BEGIN PRIVATE KEY-----MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrH7tcnElH7QunyEsUpivkZz7RE8h1pknXwYfxv/+gyfs8JV7SAtAOTGO+EqjbJIyBZI5IZo0mRlcuE3iOWi3/Fq6U3d1JuuBA/G/HcAt3edclpMs5vvEE4N3X3qv5iWDVA8gJ2sFBWLqoBdpyr9QErwrsFZDRMbb9H1oHmY1Ia1jc6CP3GnYw/ktOLt/YCWPojBIVE371IKYfc2JDM1Q8FYkHTYuDHWjHC196+RDcGnAirOQ5NMTUbs9eApthEFSVQ2rC8wiOTz21F/+oeAxtE8Zhg27tehG3wA5WA0VRwvnUZv28zvley1agCHoHBhCxx4RdRTt2Agl6b3Mi0VF3AgMBAAECggEAMom1kN1LOyXDynJ50ghdcCAZyi+YhT5uEn1Cg+AbQ8ZDH3k97rIL9h0TXAAwxD+gC1rCNpmq2AHwH1h6wzfY27w8JRT9FJhPQIINFQ5/JHLkWma36j78+V7bxbQqgBDVezOZsWdcqcrlnVfVMwfAiv2TMTQRR+bxzwGiWho8QoWNq1UcA8GGOE3vzWGrZJbgVwG43xUVDJtMem9w4QwlHLwekP3Q46Lqx1AOtesN39h/HduJtWtYGcw/t2TkIW9UibmBqZy7rkZW+4hCXhGBI6YhAUYnuyP6ZT+r1+J2aPlJeo2yIyjc6YVxoFwUR7QWtINzuBtG9v/YXfmtYCnEIQKBgQDd3E/nX4Vc7xolvoQZzN+0XWnyLqPgtAL63RLFfb8/lhnHHzca2k2eI0+P6I9etDa9c+i4l7/RM5LUkwxd8RGH6S4m8FiUkdKyaiwK1PAGRiUbWaij9WjKVjp7QhEtisQvtyMa9quwpv3C02zbD31/PqgeTmXOH0Aweh162qyeBwKBgQDFdMQ13JCkQe3GNO06EEPE+NjFkLqBVP2leDXmUVZRnHUN4OMpjCb9H0+/4rOAusCRmS9kALoeo6U9ykC5mqUViVzeTqnHXctD4llvEzSngDU/4+cbUQG3obj8JG9lupe/p3r6gRvB13nWiwVzj2wgK2SY0HGG1gaRaIS3K2nVEQKBgQCNhlJ6V9as9+GIHkYKZ0R0u/ovgU0MtAgKmye0T4jGOSvsd58hRAyrSf8g38tFMFSS+fOEfVjhTLLnY35KFtOGDVthf4QiEfuD0HKT3k3W0rws/D61iID2QZdAtV5b3N9VSM/eDWhsYboSo+gWvYTivMdlvcD3gbvisKNJkWD31QKBgCdgIqSPCHUJBK6K7WevyKPl7+xt8RNLbI1rzGvSeoEpzxnmZ8ZoQXomnVOplJwuIaqnPpEVqAfmIFSTGZcppJQH4XIfg7HTHW67G5SP4ucoJPZJr1N+MvZ4lJgLd/90V0CL2HVN+8gK/SvwazThO/GqVZQ3tPvrgEHM8vJIAQHRAoGAEjFMNmitScLogo9Cq5oX88/KpDOfCi+IG19g0HdaepgzreDzallcKf/XnXX9d7wuTuoSRNsq7RfCmLAUlzC+Waw0dpwLkZjgeVvFdrADGOFDKEovesZ0NBQ+Ln0SXJVRaynRxgnrjYINE+1I3uE8XZie4NMh5pybTXBpyx/cIz0=-----END PRIVATE KEY-----"
2625
)
2726

2827
const (
28+
JWT_ALG = jose.RS256
2929
CLIENT_URL = "http://localhost:3000"
3030
PORT = 8085
31+
KAFKA_URL = "localhost:9092"
32+
KAFKA_TXN_TOPIC = "txn4"
3133
)
3234

3335
func init() {

backend/mote/src/routes/txn.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func SignHandler(w http.ResponseWriter, r *http.Request) {
3737
return
3838
}
3939
txn = tx
40-
hash = tx.Signatures[0].String()
40+
hash = tx.Hash
4141
fmt.Println("hash:", hash)
4242

4343
case "eth":
@@ -48,7 +48,7 @@ func SignHandler(w http.ResponseWriter, r *http.Request) {
4848
}
4949
fmt.Println("hash:", tx)
5050
txn = tx
51-
hash = string(tx.Hash().Hex())
51+
hash = string(tx.Hash)
5252

5353
case "btc":
5454
tx, err := sockets.SendBtc(txnSignQuery.From, txnSignQuery.To, txnSignQuery.Amount, txnSignQuery.Wait)

backend/mote/src/sockets/eth.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"mote/src/config"
1212
localType "mote/src/types"
13+
"mote/src/utils"
1314

1415
// "github.com/ethereum/go-ethereum/accounts/abi/bind"
1516
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -20,7 +21,7 @@ import (
2021
// "github.com/ethereum/go-ethereum/rpc"
2122
)
2223

23-
func SendEth(from string, to string, amount float64, wait bool) (*types.Transaction, error) {
24+
func SendEth(from string, to string, amount float64, wait bool) (*localType.Txn, error) {
2425
client, err := ethclient.Dial(config.INFURA_SEPOLIA_URL)
2526
if err != nil {
2627
log.Fatalf("Failed to connect to the Ethereum client: %v", err)
@@ -82,17 +83,38 @@ func SendEth(from string, to string, amount float64, wait bool) (*types.Transact
8283
log.Fatalf("Failed to send transaction: %v", err)
8384
}
8485

86+
var txn = &localType.Txn{
87+
Hash: signedTx.Hash().Hex(),
88+
Network: "eth",
89+
From: fromAddress.Hex(),
90+
To: toAddress.Hex(),
91+
Amt: float64(signedTx.Value().Int64()) / math.Pow10(18),
92+
Fee: float64(signedTx.GasPrice().Int64()) * float64(signedTx.Gas()) / math.Pow10(18),
93+
Status: "pending",
94+
Cluster: "sepolia",
95+
Timestamp: signedTx.Time(),
96+
BlockHash: "null",
97+
Slot: signedTx.Nonce(),
98+
ClientId: "71cc7ca9-6072-4571-99c6-a595132fba2f",
99+
ChainId: chainID,
100+
}
101+
85102
if wait {
86103
receipt, err := bind.WaitMined(context.Background(), client, signedTx)
87104
if err != nil {
88105
log.Fatalf("Failed to get transaction receipt: %v", err)
89106
}
90-
107+
txn.Status = "confirmed"
108+
txn.BlockHash = receipt.BlockHash.Hex()
91109
log.Println("Status: ", receipt.Status)
92110
}
93111

94112
log.Println("txn: ", signedTx)
95-
return signedTx, nil
113+
114+
115+
utils.PublishTxn(*txn)
116+
117+
return txn, nil
96118
}
97119

98120
func GetEthTxn(hash string) (*localType.Txn, error) {
@@ -126,7 +148,7 @@ func GetEthTxn(hash string) (*localType.Txn, error) {
126148
}
127149

128150
var txn = &localType.Txn{
129-
Sig: tx.Hash().Hex(),
151+
Hash: tx.Hash().Hex(),
130152
Network: "eth",
131153
From: from.Hex(),
132154
To: tx.To().Hex(),
@@ -137,6 +159,7 @@ func GetEthTxn(hash string) (*localType.Txn, error) {
137159
Timestamp: tx.Time(),
138160
BlockHash: receipt.BlockHash.Hex(),
139161
Slot: tx.Nonce(),
162+
ClientId: "71cc7ca9-6072-4571-99c6-a595132fba2f",
140163
}
141164
return txn, nil
142165
}

backend/mote/src/sockets/sol.go

+27-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package sockets
22

33
import (
44
"context"
5+
"math/big"
6+
"strings"
57
"time"
8+
69
// "encoding/json"
710
"fmt"
811
"log"
@@ -11,8 +14,8 @@ import (
1114
// "time"
1215

1316
"mote/src/types"
17+
"mote/src/utils"
1418

15-
// "github.com/davecgh/go-spew/spew"
1619
bin "github.com/gagliardetto/binary"
1720
"github.com/gagliardetto/solana-go"
1821
"github.com/gagliardetto/solana-go/programs/system"
@@ -28,7 +31,7 @@ type SendSolResult struct {
2831
Txn solana.Transaction
2932
}
3033

31-
func SendSol(from string, to string, amount float64, wait bool) (*solana.Transaction, error) {
34+
func SendSol(from string, to string, amount float64, wait bool) (*types.Txn, error) {
3235
rpcClient := rpc.New(rpc.DevNet_RPC)
3336

3437
wsClient, err := ws.Connect(context.Background(), rpc.DevNet_WS)
@@ -104,7 +107,6 @@ func SendSol(from string, to string, amount float64, wait bool) (*solana.Transac
104107
panic(err)
105108
}
106109
log.Println("hash: ", sig)
107-
return tx, nil
108110
} else {
109111
sig, err := rpcClient.SendTransactionWithOpts(
110112
context.TODO(),
@@ -117,8 +119,13 @@ func SendSol(from string, to string, amount float64, wait bool) (*solana.Transac
117119
panic(err)
118120
}
119121
log.Println("hash: ", sig)
120-
return tx, nil
121122
}
123+
txn, _ := GetSolTxn(tx.Signatures[0].String())
124+
log.Println("txn: ", txn)
125+
126+
utils.PublishTxn(*txn)
127+
128+
return txn, nil
122129
}
123130

124131
func GetSolTxn(hash string) (*types.Txn, error) {
@@ -135,22 +142,31 @@ func GetSolTxn(hash string) (*types.Txn, error) {
135142
panic(err)
136143
}
137144

138-
decodedTx, err := solana.TransactionFromDecoder(bin.NewBinDecoder(tx.Transaction.GetBinary()))
139-
if err != nil {
140-
panic(err)
141-
}
145+
log.Println("tx: ", tx)
146+
147+
decodedTx, err := solana.TransactionFromDecoder(bin.NewBinDecoder(tx.Transaction.GetBinary()))
148+
if err != nil {
149+
panic(err)
150+
}
151+
142152
var txn = &types.Txn{
143-
Sig: string(decodedTx.Signatures[0].String()),
153+
Hash: string(decodedTx.Signatures[0].String()),
144154
Network: "sol",
145155
Cluster: "devnet",
146156
Timestamp: time.Unix(tx.BlockTime.Time().Unix(), 0),
147157
Slot: tx.Slot,
148158
From: string(decodedTx.Message.AccountKeys[0].String()),
149159
To: string(decodedTx.Message.AccountKeys[1].String()),
150160
BlockHash: decodedTx.Message.RecentBlockhash.String(),
151-
Amt: float64(tx.Meta.PreBalances[0] - tx.Meta.PostBalances[0]),
152-
Fee: float64(tx.Meta.Fee),
161+
Amt: float64(tx.Meta.PostBalances[0] - tx.Meta.PreBalances[0]) / 1e9,
162+
Fee: float64(tx.Meta.Fee) / 1e9,
153163
Status: string("confirmed"),
164+
ClientId: "71cc7ca9-6072-4571-99c6-a595132fba2f",
165+
ChainId: big.NewInt(103),
166+
}
167+
168+
if txn.To == strings.Repeat("1", 32) {
169+
txn.To = txn.From
154170
}
155171

156172
if err != nil {

backend/mote/src/types/txn.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package types
22

33
import (
4+
"math/big"
45
"time"
56

6-
"github.com/go-playground/validator/v10"
7+
"github.com/go-playground/validator/v10"
78
)
89

910
var Validate = validator.New()
@@ -24,15 +25,23 @@ type TxnGet struct {
2425
}
2526

2627
type Txn struct {
27-
Sig string `json:"hash"`
28+
Hash string `json:"hash"`
2829
Network string `json:"network"`
2930
Cluster string `json:"cluster"`
30-
Timestamp time.Time `json:"timestamp"`
31+
Timestamp time.Time `json:"time"`
3132
Slot uint64 `json:"slot"`
3233
From string `json:"from"`
3334
To string `json:"to"`
3435
BlockHash string `json:"blockHash"`
3536
Amt float64 `json:"amount"`
3637
Fee float64 `json:"fee"`
3738
Status string `json:"status"`
38-
}
39+
ClientId string `json:"clientId"`
40+
ChainId *big.Int `json:"chainId,omitempty"`
41+
}
42+
43+
44+
type Message struct {
45+
Key string `json:"key"`
46+
Value Txn `json:"value"`
47+
}

backend/mote/src/utils/utils.go

+37
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package utils
33
import (
44
"crypto/rsa"
55
"crypto/x509"
6+
"encoding/json"
67
"encoding/pem"
78
"errors"
89
"log"
910
"net/http"
1011
"time"
1112

1213
"mote/src/config"
14+
"mote/src/types"
1315

16+
"github.com/IBM/sarama"
1417
jose "github.com/go-jose/go-jose/v4"
1518
"github.com/go-jose/go-jose/v4/jwt"
1619
"github.com/gorilla/securecookie"
@@ -106,3 +109,37 @@ func ImportSPKI(publicKeyPem string) (*rsa.PublicKey, error) {
106109

107110
return rsaPub, nil
108111
}
112+
113+
114+
func PublishTxn(txn types.Txn) {
115+
brokers := []string{config.KAFKA_URL}
116+
117+
kConfig := sarama.NewConfig()
118+
kConfig.Producer.RequiredAcks = sarama.WaitForLocal
119+
kConfig.Producer.Compression = sarama.CompressionGZIP
120+
kConfig.Producer.Flush.Frequency = 500 * time.Millisecond
121+
122+
producer, err := sarama.NewAsyncProducer(brokers, kConfig)
123+
if err != nil {
124+
log.Fatalf("Error creating Kafka producer: %v", err)
125+
}
126+
defer producer.Close()
127+
128+
jsonData, err := json.Marshal(txn)
129+
if err != nil {
130+
log.Fatalf("Error marshalling JSON: %v", err)
131+
}
132+
133+
producer.Input() <- &sarama.ProducerMessage{
134+
Topic: config.KAFKA_TXN_TOPIC,
135+
Key: sarama.StringEncoder("txn:" + txn.Hash),
136+
Value: sarama.ByteEncoder(jsonData),
137+
}
138+
139+
// select {
140+
// case <-producer.Successes():
141+
// log.Println("Message published successfully to Kafka!")
142+
// case err := <-producer.Errors():
143+
// log.Fatalf("Failed to publish message to Kafka: %v", err.Err)
144+
// }
145+
}

packages/common/src/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ export type InsertTxnType = {
8989
slot?: number;
9090
chainId?: number;
9191
cluster?: EthCluster | Cluster | BitcoinCluster | USDCCluster;
92+
status?: string;
9293
};
9394

9495
export type TxnType = InsertTxnType & {

0 commit comments

Comments
 (0)