Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ require (
github.com/dv-net/xconfig v0.1.0
)

require github.com/dv-net/dv-proto v0.5.5
require github.com/dv-net/dv-proto v0.5.6

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions internal/eproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,50 @@ func (s *Service) AddressBalance(ctx context.Context, address, assetIdentifier s
return balance, nil
}

// AddressBalanceAt returns the balance of the address for the asset on the blockchain at a specific block number.
func (s *Service) AddressBalanceAt(ctx context.Context, address, assetIdentifier string, blockchain wconstants.BlockchainType, blockNumber uint64) (decimal.Decimal, error) {
if address == "" {
return decimal.Zero, ErrAddressRequired
}

if assetIdentifier == "" {
return decimal.Zero, ErrAssetIdentifierRequired
}

if !blockchain.Valid() {
return decimal.Zero, fmt.Errorf("invalid blockchain type: %s", blockchain.String())
}

ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
defer cancel()

var response *connect.Response[addressesv2.BalanceResponse]
if err := retry.New().Do(func() error {
var err error
response, err = s.eproxyClient.AddressesClient.Balance(
ctx, connect.NewRequest(&addressesv2.BalanceRequest{
Address: address,
AssetIdentifier: assetIdentifier,
Blockchain: ConvertBlockchain(blockchain),
BlockNumber: &blockNumber,
}),
)
if err != nil && !strings.Contains(err.Error(), errConnectionResetByPeer) {
return fmt.Errorf("%w: %w", err, retry.ErrExit)
}
return err
}); err != nil {
return decimal.Decimal{}, err
}

balance, err := decimal.NewFromString(response.Msg.GetAmount())
if err != nil {
return decimal.Decimal{}, err
}

return balance, nil
}

// AssetDecimals returns the number of decimals for the asset on the blockchain
func (s *Service) AssetDecimals(ctx context.Context, blockchain wconstants.BlockchainType, assetIdentifier string) (int64, error) {
if assetIdentifier == "" {
Expand Down
52 changes: 52 additions & 0 deletions internal/taskmanager/event_waiting_confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dv-net/mx/logger"
"github.com/google/uuid"
"github.com/riverqueue/river"
"github.com/shopspring/decimal"
)

const JobKindWebhookWaitingConfirmations = "waiting_confirmations"
Expand Down Expand Up @@ -64,6 +65,51 @@ func (s WebhookWaitingConfirmationsArgs) Validate() error {
// Kind
func (WebhookWaitingConfirmationsArgs) Kind() string { return JobKindWebhookWaitingConfirmations }

func (s *WebhookWaitingConfirmationsWorker) verifyEVMDepositBalanceDelta(
ctx context.Context,
job *river.Job[WebhookWaitingConfirmationsArgs],
tx *transactionsv2.Transaction,
event *transactionsv2.Event,
) error {
assetIdentifier := event.GetAssetIdentifier()
if assetIdentifier == "" {
return nil
}

expectedAmount, _ := decimal.NewFromString(event.GetValue())
if !expectedAmount.IsPositive() {
return nil
}

blockHeight := tx.GetBlockHeight()
if blockHeight == 0 {
return nil
}

balanceBefore, err := s.bs.EProxy().AddressBalanceAt(ctx, job.Args.Address, assetIdentifier, job.Args.Blockchain, blockHeight-1)
if err != nil {
return fmt.Errorf("get balance before deposit block %d: %w", blockHeight-1, err)
}

balanceAfter, err := s.bs.EProxy().AddressBalanceAt(ctx, job.Args.Address, assetIdentifier, job.Args.Blockchain, blockHeight)
if err != nil {
return fmt.Errorf("get balance after deposit block %d: %w", blockHeight, err)
}

s.logger.Debugf("deposit balance delta raw: address=%s asset=%s block=%d before=%s after=%s expected=%s",
job.Args.Address, assetIdentifier, blockHeight, balanceBefore.String(), balanceAfter.String(), expectedAmount.String())

delta := balanceAfter.Sub(balanceBefore)
if delta.LessThan(expectedAmount) {
return fmt.Errorf("deposit balance delta mismatch: address=%s asset=%s block=%d expected=%s actual_delta=%s",
job.Args.Address, assetIdentifier, blockHeight, expectedAmount.String(), delta.String())
}
s.logger.Debugf("deposit balance delta verified: address=%s asset=%s block=%d expected=%s actual_delta=%s",
job.Args.Address, assetIdentifier, blockHeight, expectedAmount.String(), delta.String())

return nil
}

type WebhookWaitingConfirmationsWorker struct {
logger logger.Logger
river.WorkerDefaults[WebhookWaitingConfirmationsArgs]
Expand Down Expand Up @@ -105,6 +151,12 @@ func (s *WebhookWaitingConfirmationsWorker) Work(ctx context.Context, job *river
return river.JobSnooze(confirmationsTimeout)
}

if job.Args.Blockchain.IsEVM() && job.Args.WebhookKind == models.WebhookKindDeposit {
if err := s.verifyEVMDepositBalanceDelta(ctx, job, tx, event); err != nil {
return err
}
}

transactionData := webhooks.TransactionData{
Hash: job.Args.Hash,
Confirmations: tx.Confirmations,
Expand Down
Loading