Skip to content

Commit

Permalink
oracle: update saved lvldb items in accept job
Browse files Browse the repository at this point in the history
  • Loading branch information
jowenshaw committed Jun 29, 2021
1 parent d4d416f commit e895734
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 78 deletions.
5 changes: 5 additions & 0 deletions dcrm/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func doSignImpl(dcrmNode *NodeInfo, signGroupIndex int64, signPubkey string, msg
return keyID, rsvs, nil
}

// GetSignStatusByKeyID get sign status by keyID
func GetSignStatusByKeyID(keyID string) (rsvs []string, err error) {
return getSignResult(keyID, defaultDcrmNode.dcrmRPCAddress)
}

func getSignResult(keyID, rpcAddr string) (rsvs []string, err error) {
log.Info("start get sign status", "keyID", keyID)
var signStatus *SignStatus
Expand Down
31 changes: 31 additions & 0 deletions tokens/eth/signtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,34 @@ func (b *Bridge) CalcTransactionHash(tx *types.Transaction) (txHash string, err
}
return tx.Hash().Hex(), nil
}

// GetSignedTxHashOfKeyID get signed tx hash by keyID (called by oracle)
func (b *Bridge) GetSignedTxHashOfKeyID(keyID, pairID string, rawTx interface{}) (txHash string, err error) {
tx, ok := rawTx.(*types.Transaction)
if !ok {
return "", errors.New("wrong raw tx of keyID " + keyID)
}
rsvs, err := dcrm.GetSignStatusByKeyID(keyID)
if err != nil {
return "", err
}
if len(rsvs) != 1 {
return "", errors.New("wrong number of rsvs of keyID " + keyID)
}

rsv := rsvs[0]
signature := common.FromHex(rsv)
if len(signature) != crypto.SignatureLength {
return "", errors.New("wrong signature of keyID " + keyID)
}
token := b.GetTokenConfig(pairID)
signedTx, err := b.signTxWithSignature(tx, signature, common.HexToAddress(token.DcrmAddress))
if err != nil {
return "", err
}
txHash, err = b.CalcTransactionHash(signedTx)
if err != nil {
return "", fmt.Errorf("calc signed tx hash failed, %w", err)
}
return txHash, nil
}
83 changes: 52 additions & 31 deletions worker/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var (
errIdentifierMismatch = errors.New("cross chain bridge identifier mismatch")
errInitiatorMismatch = errors.New("initiator mismatch")
errWrongMsgContext = errors.New("wrong msg context")
errNonceMismatch = errors.New("nonce mismatch")
)

// StartAcceptSignJob accept job
Expand Down Expand Up @@ -142,10 +141,7 @@ func processAcceptInfo(info *dcrm.SignInfoData) {
}()

agreeResult := acceptAgree
args, err := getBuildTxArgsFromMsgContext(info)
if err == nil {
err = verifySignInfo(info, args)
}
args, err := verifySignInfo(info)
switch {
case errors.Is(err, tokens.ErrTxNotStable),
errors.Is(err, tokens.ErrTxNotFound),
Expand All @@ -170,26 +166,10 @@ func processAcceptInfo(info *dcrm.SignInfoData) {
logWorkerError("accept", "accept sign job failed", err, "keyID", keyID, "result", res, "pairID", args.PairID, "txid", args.SwapID, "bind", args.Bind, "swaptype", args.SwapType)
} else {
logWorker("accept", "accept sign job finish", "keyID", keyID, "result", agreeResult, "pairID", args.PairID, "txid", args.SwapID, "bind", args.Bind, "swaptype", args.SwapType)
if agreeResult == acceptAgree { // only record agree result
saveAcceptRecord(keyID, args)
}
isProcessed = true
}
}

func saveAcceptRecord(keyID string, args *tokens.BuildTxArgs) {
var err error
for i := 0; i < 3; i++ {
err = AddAcceptRecord(args)
if err == nil {
return
}
}
logWorkerWarn("accept", "save accept record to db failed", err,
"keyID", keyID, "pairID", args.PairID, "txid", args.SwapID,
"bind", args.Bind, "swaptype", args.SwapType.String())
}

func getBuildTxArgsFromMsgContext(signInfo *dcrm.SignInfoData) (*tokens.BuildTxArgs, error) {
msgContext := signInfo.MsgContext
if len(msgContext) != 1 {
Expand All @@ -203,9 +183,13 @@ func getBuildTxArgsFromMsgContext(signInfo *dcrm.SignInfoData) (*tokens.BuildTxA
return &args, nil
}

func verifySignInfo(signInfo *dcrm.SignInfoData, args *tokens.BuildTxArgs) error {
func verifySignInfo(signInfo *dcrm.SignInfoData) (args *tokens.BuildTxArgs, err error) {
if !params.IsDcrmInitiator(signInfo.Account) {
return errInitiatorMismatch
return nil, errInitiatorMismatch
}
args, err = getBuildTxArgsFromMsgContext(signInfo)
if err != nil {
return args, err
}
msgHash := signInfo.MsgHash
msgContext := signInfo.MsgContext
Expand All @@ -214,22 +198,32 @@ func verifySignInfo(signInfo *dcrm.SignInfoData, args *tokens.BuildTxArgs) error
case params.GetReplaceIdentifier():
case tokens.AggregateIdentifier:
if btc.BridgeInstance == nil {
return tokens.ErrNoBtcBridge
return args, tokens.ErrNoBtcBridge
}
logWorker("accept", "verifySignInfo", "msgHash", msgHash, "msgContext", msgContext)
return btc.BridgeInstance.VerifyAggregateMsgHash(msgHash, args)
err = btc.BridgeInstance.VerifyAggregateMsgHash(msgHash, args)
if err != nil {
return args, err
}
return args, nil
default:
return errIdentifierMismatch
return args, errIdentifierMismatch
}
logWorker("accept", "verifySignInfo", "keyID", signInfo.Key, "msgHash", msgHash, "msgContext", msgContext)
err := CheckAcceptRecord(args)
if args.GetTxNonce() > 0 { // only for eth like chain
err = CheckAcceptRecord(args)
if err != nil {
return args, err
}
}
err = rebuildAndVerifyMsgHash(signInfo.Key, msgHash, args)
if err != nil {
return err
return args, err
}
return rebuildAndVerifyMsgHash(msgHash, args)
return args, nil
}

func rebuildAndVerifyMsgHash(msgHash []string, args *tokens.BuildTxArgs) error {
func rebuildAndVerifyMsgHash(keyID string, msgHash []string, args *tokens.BuildTxArgs) error {
var srcBridge, dstBridge tokens.CrossChainBridge
switch args.SwapType {
case tokens.SwapinType:
Expand Down Expand Up @@ -263,5 +257,32 @@ func rebuildAndVerifyMsgHash(msgHash []string, args *tokens.BuildTxArgs) error {
if err != nil {
return err
}
return dstBridge.VerifyMsgHash(rawTx, msgHash)
err = dstBridge.VerifyMsgHash(rawTx, msgHash)
if err != nil {
return err
}
if args.GetTxNonce() > 0 { // only for eth like chain
go saveAcceptRecord(dstBridge, keyID, buildTxArgs, rawTx)
}
return nil
}

func saveAcceptRecord(bridge tokens.CrossChainBridge, keyID string, args *tokens.BuildTxArgs, rawTx interface{}) {
impl, ok := bridge.(interface {
GetSignedTxHashOfKeyID(keyID, pairID string, rawTx interface{}) (txHash string, err error)
})
if !ok {
return
}
swapTx, err := impl.GetSignedTxHashOfKeyID(keyID, args.PairID, rawTx)
if err != nil {
logWorkerError("accept", "get signed tx hash failed", err, "keyID", keyID, "pairID", args.PairID, "txid", args.SwapID, "bind", args.Bind, "swaptype", args.SwapType.String())
return
}
err = AddAcceptRecord(args, swapTx)
if err != nil {
logWorkerError("accept", "save accept record to db failed", err, "keyID", keyID, "pairID", args.PairID, "txid", args.SwapID, "bind", args.Bind, "swaptype", args.SwapType.String(), "swaptx", swapTx)
return
}
logWorker("accept", "save accept record to db sucess", "keyID", keyID, "pairID", args.PairID, "txid", args.SwapID, "bind", args.Bind, "swaptype", args.SwapType.String(), "swaptx", swapTx)
}
106 changes: 59 additions & 47 deletions worker/acceptdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,21 @@ import (
"github.com/anyswap/CrossChain-Bridge/log"
"github.com/anyswap/CrossChain-Bridge/params"
"github.com/anyswap/CrossChain-Bridge/tokens"
"github.com/anyswap/CrossChain-Bridge/types"
)

const (
identifierKey = "bridge-identifier"

allowMismatchNonceTimeInterval = 1800 // seconds
allowReswapTimeInterval = 1800 // seconds
)

var (
lvldbHandle *leveldb.Database
)

func getEthAcceptRecordKey(args *tokens.BuildTxArgs, isPrefix bool) string {
if isPrefix {
return fmt.Sprintf("%s:%d", args.SwapID, args.SwapType)
}
return fmt.Sprintf("%s:%d:%d", args.SwapID, args.SwapType, args.GetTxNonce())
func getSwapKeyPrefix(args *tokens.BuildTxArgs) string {
return strings.ToLower(fmt.Sprintf("%s:%d:%s:%s:", args.SwapID, args.SwapType, args.PairID, args.Bind))
}

func int64ToBytes(i int64) []byte {
Expand All @@ -39,70 +37,84 @@ func bytesToInt64(buf []byte) int64 {
}

// AddAcceptRecord add accept record
func AddAcceptRecord(args *tokens.BuildTxArgs) (err error) {
func AddAcceptRecord(args *tokens.BuildTxArgs, swapTx string) (err error) {
if lvldbHandle == nil {
return nil
}
swapNonce := args.GetTxNonce()
if swapNonce == 0 {
return nil
}
key := []byte(getEthAcceptRecordKey(args, false))
err = lvldbHandle.Put(key, int64ToBytes(now()))
if err != nil {
log.Warn("add accept record failed", "key", string(key), "err", err)
} else {
log.Info("add accept record success", "key", string(key))
}
return err
}

// GetAcceptRecord get accept record
func GetAcceptRecord(args *tokens.BuildTxArgs) (int64, error) {
key := []byte(getEthAcceptRecordKey(args, false))
bs, err := lvldbHandle.Get(key)
if err != nil {
return 0, err
}
return bytesToInt64(bs), nil
key := []byte(getSwapKeyPrefix(args) + swapTx)
return lvldbHandle.Put(key, int64ToBytes(now()))
}

// FindAcceptRecords find accept records
func FindAcceptRecords(args *tokens.BuildTxArgs) (result map[string]int64, err error) {
result = make(map[string]int64)
prefix := []byte(getEthAcceptRecordKey(args, true))
func FindAcceptRecords(args *tokens.BuildTxArgs) map[string]int64 {
if lvldbHandle == nil {
return nil
}
result := make(map[string]int64)
prefix := []byte(getSwapKeyPrefix(args))
iter := lvldbHandle.NewIterator(prefix, nil)
for iter.Next() {
result[string(iter.Key())] = bytesToInt64(iter.Value())
}
iter.Release()
err = iter.Error()
return result, err
return result
}

// CheckAcceptRecord check accept record
func CheckAcceptRecord(args *tokens.BuildTxArgs) (err error) {
if lvldbHandle == nil {
return nil
}
swapNonce := args.GetTxNonce()
if swapNonce == 0 {
return nil
}
_, err = GetAcceptRecord(args)
if err == nil {
return nil
}
isSwapin := args.SwapType == tokens.SwapinType
resBridge := tokens.GetCrossChainBridge(!isSwapin)
alreadySwapped := false
nowTime := now()
prefix := []byte(getEthAcceptRecordKey(args, true))

prefix := []byte(getSwapKeyPrefix(args))
prefixLen := len(prefix)
iter := lvldbHandle.NewIterator(prefix, nil)
for iter.Next() {
lastTime := bytesToInt64(iter.Value())
if lastTime+allowMismatchNonceTimeInterval > nowTime {
log.Warn("find record with nonce mismatch recently", "args", args, "oldKey", string(iter.Key()), "oldTime", lastTime, "nowTime", nowTime)
return errNonceMismatch
key := string(iter.Key())
value := bytesToInt64(iter.Value())
oldSwapTx := key[prefixLen:]
log.Info("[accept] check saved record", "key", key, "value", value)
txStatus := resBridge.GetTransactionStatus(oldSwapTx)
if txStatus.Receipt != nil {
receipt, ok := txStatus.Receipt.(*types.RPCTxReceipt)
if ok && *receipt.Status == 1 {
log.Warn("[accept] found already swapped tx", "key", key, "value", value)
alreadySwapped = true
break
}
} else if txStatus != nil && txStatus.BlockHeight > 0 {
log.Warn("[accept] found already swapped tx", "key", key, "value", value)
alreadySwapped = true
break
} else if tx, _ := resBridge.GetTransaction(oldSwapTx); tx != nil {
etx, ok := tx.(*types.Transaction)
if !ok {
log.Warn("[accept] find already swapped tx in pool", "key", key, "value", value)
alreadySwapped = true
break
}

if value+allowReswapTimeInterval <= nowTime {
continue // allow reswap old enough
}

if etx.Nonce() == args.GetTxNonce() {
continue // allow replace always
}

log.Warn("[accept] find already swapped tx in pool", "key", key, "value", value)
alreadySwapped = true
break
}
}
iter.Release()
if alreadySwapped {
return errAlreadySwapped
}
return nil
}

Expand Down

0 comments on commit e895734

Please sign in to comment.