Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@

type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error

// maxConsensusMsgSize caps the wire size of an incoming QBFTConsensusMsg, well below
// the 128MB default p2p frame limit. A legitimate message carries at most a handful of
// small justification sub-messages (bounded in handle) plus its values, the largest of
// which is a single block proposal (a few MB on mainnet); 32MB leaves ample margin while
// bounding the receive/decode/allocation cost a malicious peer can inflict per message.
const maxConsensusMsgSize = 32 * 1024 * 1024 // 32 MB.

var supportedCompareDuties = []core.DutyType{core.DutyAttester}

// newDefinition returns a qbft definition (this is constant across all consensus instances).
Expand Down Expand Up @@ -365,7 +372,7 @@
func (c *Consensus) Start(ctx context.Context) {
p2p.RegisterHandler("qbft", c.p2pNode, protocols.QBFTv2ProtocolID,
func() proto.Message { return new(pbv1.QBFTConsensusMsg) },
c.handle)
c.handle, p2p.WithReadLimit(maxConsensusMsgSize))

go func() {
for {
Expand Down Expand Up @@ -659,7 +666,7 @@
}

// handle processes an incoming consensus wire message.
func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (proto.Message, bool, error) {

Check failure on line 669 in core/consensus/qbft/qbft.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ63Mc3PIgwHeUuLnncl&open=AZ63Mc3PIgwHeUuLnncl&pullRequest=4557
t0 := time.Now()

pbMsg, ok := req.(*pbv1.QBFTConsensusMsg)
Expand All @@ -678,7 +685,18 @@
return nil, false, errors.New("invalid duty", z.Any("duty", duty))
}

if err := verifyMsgLimits(pbMsg, len(c.pubkeys)); err != nil {
return nil, false, err
}

for _, justification := range pbMsg.GetJustification() {
// Bail out as soon as the receive deadline fires rather than burning the full
// CPU budget on signature recovery for every justification in a large message.
if ctx.Err() != nil {
return nil, false, errors.Wrap(ctx.Err(), "receive cancelled during justification verification",
z.Any("duty", duty), z.Any("after", time.Since(t0)))
}

if err := verifyMsg(justification, c.pubkeys); err != nil {
return nil, false, errors.Wrap(err, "invalid justification")
}
Expand Down Expand Up @@ -776,6 +794,29 @@
return peerIdx, nil
}

// verifyMsgLimits bounds the justification and value counts of a consensus message
// before any expensive per-element work (each justification requires an ECDSA
// signature recovery, each value a proto unmarshal + hash). Without it a single
// authenticated peer could pack one large message with many sub-messages to exhaust
// CPU/memory on every peer (amplification DoS).
func verifyMsgLimits(pbMsg *pbv1.QBFTConsensusMsg, nodes int) error {
// A legitimate justification set contains at most a quorum of ROUND-CHANGE plus a
// quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded above by 2*nodes.
maxJust := 2 * nodes
if n := len(pbMsg.GetJustification()); n > maxJust {
return errors.New("too many justifications", z.Int("count", n), z.Int("max", maxJust))
}

// Each message (the main message plus each justification) references at most two
// values (value and prepared value), so the values are bounded by 2*(justifications+1).
maxValues := 2 * (len(pbMsg.GetJustification()) + 1)
if n := len(pbMsg.GetValues()); n > maxValues {
return errors.New("too many values", z.Int("count", n), z.Int("max", maxValues))
}

return nil
}

func verifyMsg(msg *pbv1.QBFTMsg, pubkeys map[int64]*k1.PublicKey) error {
if msg == nil || msg.GetDuty() == nil {
return errors.New("invalid consensus message")
Expand Down
123 changes: 123 additions & 0 deletions core/consensus/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,129 @@ func TestQBFTConsensusHandle(t *testing.T) {
}
}

// TestQBFTConsensusHandleAmplificationLimits verifies that handle rejects messages
// carrying more justifications or values than a legitimate consensus message ever
// needs, before doing the expensive per-element signature recovery / unmarshalling.
// This caps the CPU/memory amplification a single authenticated peer can inflict.
func TestQBFTConsensusHandleAmplificationLimits(t *testing.T) {
// newConsensus returns a single-node consensus, so max justifications = 2*nodes = 2.
newConsensus := func(t *testing.T) (*Consensus, *k1.PrivateKey) {
t.Helper()

var c Consensus

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled)
c.deadliner = deadliner
c.gaterFunc = func(core.Duty) bool { return true }
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])

p2pKey := testutil.GenerateInsecureK1Key(t, 0)
c.pubkeys = make(map[int64]*k1.PublicKey)
c.pubkeys[0] = p2pKey.PubKey()

return &c, p2pKey
}

// signedBase returns a validly-signed main message so verification reaches the limit checks.
signedBase := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTConsensusMsg {
t.Helper()

base := &pbv1.QBFTConsensusMsg{Msg: newRandomQBFTMsg(t)}
base.Msg.PeerIdx = 0
base.Msg.Round = 1
base.Msg.Duty = &pbv1.Duty{Slot: 42, Type: 1}

msgHash, err := hashProto(base.GetMsg())
require.NoError(t, err)

sign, err := k1util.Sign(p2pKey, msgHash[:])
require.NoError(t, err)

base.Msg.Signature = sign

return base
}

// signedJustification returns a validly-signed justification matching the base message's duty.
signedJustification := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTMsg {
t.Helper()

j := newRandomQBFTMsg(t)
j.PeerIdx = 0
j.Round = 1 // verifyMsg requires round > 0, don't rely on the random value.
j.Duty = &pbv1.Duty{Slot: 42, Type: 1}

jHash, err := hashProto(j)
require.NoError(t, err)

j.Signature, err = k1util.Sign(p2pKey, jHash[:])
require.NoError(t, err)

return j
}

t.Run("too many justifications rejected", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 3 justifications > 2*nodes (2). Content is irrelevant since the count
// check runs before any per-justification verification.
for range 3 {
base.Justification = append(base.Justification, &pbv1.QBFTMsg{})
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.ErrorContains(t, err, "too many justifications")
})

t.Run("max justifications accepted", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// Exactly 2*nodes (2) justifications must not be rejected by the count check.
for range 2 {
base.Justification = append(base.Justification, signedJustification(t, p2pKey))
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.NoError(t, err)
})

t.Run("too many values rejected", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 0 justifications => max values = 2*(0+1) = 2. Provide 3.
base.Values = []*anypb.Any{{}, {}, {}}

_, _, err := c.handle(context.Background(), "peerID", base)
require.ErrorContains(t, err, "too many values")
})

t.Run("max values accepted", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 2 justifications => max values = 2*(2+1) = 6. A message carrying exactly
// the maximum must pass the count check and the rest of handle, guarding
// against the bound being tightened below the legitimate maximum.
for range 2 {
base.Justification = append(base.Justification, signedJustification(t, p2pKey))
}

for i := range 6 {
value, err := anypb.New(&pbv1.Duty{Slot: uint64(i + 1)})
require.NoError(t, err)

base.Values = append(base.Values, value)
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.NoError(t, err)
})
}

func TestInstanceIO_MaybeStart(t *testing.T) {
t.Run("MaybeStart for new instance", func(t *testing.T) {
inst1 := instance.NewIO[Msg]()
Expand Down
42 changes: 40 additions & 2 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ type dedupKey struct {
Round int64
}

// maxDecidedResends bounds the number of MsgDecided rebroadcasts that
// post-decision ROUND-CHANGE messages from a single peer can trigger.
// A lagging peer re-sends ROUND-CHANGE with an increasing round on each
// timeout until it learns the decided value, so a handful of resends is
// ample for liveness, while the cap stops a malicious peer minting
// ever-higher rounds from extracting unlimited large rebroadcasts.
const maxDecidedResends = 16

// decidedResend tracks the MsgDecided rebroadcasts triggered by a peer's
// post-decision ROUND-CHANGE messages.
type decidedResend struct {
Round int64 // Highest round a rebroadcast was triggered for.
Count int // Total rebroadcasts triggered by the peer.
}

// errors
var (
errCompare = errors.New("compare leader value with local value failed")
Expand Down Expand Up @@ -211,6 +226,7 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
qCommit []Msg[I, V, C]
buffer = make(map[int64][]Msg[I, V, C])
dedupRules = make(map[dedupKey]bool)
decidedResends = make(map[int64]decidedResend) // Bounds MsgDecided rebroadcasts by peer source.
timerChan <-chan time.Time
stopTimer func()
)
Expand Down Expand Up @@ -273,6 +289,25 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
return true
}

// allowDecidedResend reports whether a post-decision ROUND-CHANGE from source at
// round may trigger a MsgDecided rebroadcast, recording it when it does. It permits
// at most one rebroadcast per source per strictly-increasing round, capped at
// maxDecidedResends per source, so duplicate, replayed or maliciously
// round-incremented messages can't repeatedly trigger a large rebroadcast
// (amplification DoS), while a peer advancing to a genuinely new round still gets
// served. Sources are authenticated by the transport, so the tracked set stays
// naturally bounded by the cluster size.
allowDecidedResend := func(incomingSource, incomingRound int64) bool {
resend := decidedResends[incomingSource]
if incomingRound <= resend.Round || resend.Count >= maxDecidedResends {
return false
}

decidedResends[incomingSource] = decidedResend{Round: incomingRound, Count: resend.Count + 1}

return true
}

// changeRound updates round and clears the rule dedup state.
changeRound := func(newRound int64, rule UponRule) {
if round == newRound {
Expand Down Expand Up @@ -316,9 +351,12 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
inputValueCh = nil // Don't read from this channel again.

case msg := <-t.Receive:
// Just send Qcommit if consensus already decided
// Just send Qcommit if consensus already decided. The resend is rate-limited
// (see allowDecidedResend) to bound amplification; note this runs before the
// isJustified check, so the ROUND-CHANGE need not even be justified.
if len(qCommit) > 0 {
if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17
if msg.Source() != process && msg.Type() == MsgRoundChange && // Algorithm 3:17
allowDecidedResend(msg.Source(), msg.Round()) {
err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
}

Expand Down
112 changes: 112 additions & 0 deletions core/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,118 @@ func (m msg) Justification() []Msg[int64, int64, int64] {
return resp
}

// TestDecidedRebroadcastLimits verifies that once consensus has decided, post-decision
// ROUND-CHANGE messages trigger at most one MsgDecided rebroadcast per source per
// (strictly increasing) round, capped at maxDecidedResends per source. This bounds
// amplification while still serving lagging peers that advance to new rounds.
func TestDecidedRebroadcastLimits(t *testing.T) {
const (
n = 4
process = 0
value = 42
)

// Build a justified MsgDecided: quorum (3) commits for round 1, value 42.
commits := []msg{
{msgType: MsgCommit, peerIdx: 1, round: 1, value: value},
{msgType: MsgCommit, peerIdx: 2, round: 1, value: value},
{msgType: MsgCommit, peerIdx: 3, round: 1, value: value},
}
decided := msg{msgType: MsgDecided, peerIdx: 1, round: 1, value: value, justify: commits}

rc := func(source, round int64) msg {
return msg{msgType: MsgRoundChange, peerIdx: source, round: round}
}

// runDecidedInstance starts a qbft instance, sends it the decided message and
// returns a synchronous send function plus the channel collecting MsgDecided
// broadcasts. The receive channel is unbuffered, so the instance only accepts
// a send once it has fully processed all earlier messages (the just-sent
// message may still be in flight, hence tests end with an inert flush send).
// This makes broadcast-count assertions deterministic.
runDecidedInstance := func(t *testing.T) (func(msg), chan MsgType) {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

recv := make(chan Msg[int64, int64, int64])
decidedBroadcasts := make(chan MsgType, 100)

def := noopDef
def.Nodes = n
def.FIFOLimit = 100
def.Decide = func(context.Context, int64, int64, []Msg[int64, int64, int64]) {}

trans := Transport[int64, int64, int64]{
Broadcast: func(_ context.Context, typ MsgType, _ int64, _ int64, _ int64, _ int64,
_ int64, _ int64, _ []Msg[int64, int64, int64],
) error {
if typ == MsgDecided {
decidedBroadcasts <- typ
}

return nil
},
Receive: recv,
}

// Never-delivering input channels (this process is not a leader and proposes nothing).
go func() {
_ = Run(ctx, def, trans, 0, process, make(chan int64), make(chan int64))
}()

send := func(m msg) {
select {
case recv <- m:
case <-time.After(5 * time.Second):
require.Fail(t, "timeout sending message to qbft instance")
}
}

send(decided)

return send, decidedBroadcasts
}

t.Run("dedup duplicates and stale rounds", func(t *testing.T) {
send, broadcasts := runDecidedInstance(t)

for _, m := range []msg{
rc(2, 2), // Rebroadcast #1.
rc(2, 2), // Duplicate, no rebroadcast.
rc(2, 2), // Duplicate, no rebroadcast.
rc(3, 2), // Rebroadcast #2 (other source).
rc(3, 2), // Duplicate, no rebroadcast.
rc(2, 1), // Stale round (already rebroadcast for round 2), no rebroadcast.
rc(2, 3), // Rebroadcast #3 (source advanced to a new round).
} {
send(m)
}

// Flush with an inert message: once this send returns, all messages above
// have been fully processed, so the broadcast count is final.
send(rc(2, 1))

require.Len(t, broadcasts, 3)
})

t.Run("resend cap per source", func(t *testing.T) {
send, broadcasts := runDecidedInstance(t)

// One peer keeps advancing rounds: only the first maxDecidedResends
// ROUND-CHANGE messages may trigger a rebroadcast.
for round := int64(2); round < 2+maxDecidedResends+5; round++ {
send(rc(2, round))
}

// Flush with an inert message (stale round, never rebroadcast).
send(rc(2, 1))

require.Len(t, broadcasts, maxDecidedResends)
})
}

func TestIsJustifiedPrePrepare(t *testing.T) {
const (
n = 4
Expand Down
Loading
Loading