Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 121 additions & 23 deletions config/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package config
import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
"sync"

"github.com/sei-protocol/sei-load/utils/rng"
)

var (
Expand All @@ -16,19 +21,30 @@ type indexSampler interface {
SampleIndex(n uint64) (uint64, error)
}

// Distribution is a tagged wrapper over a keyspace index distribution, selected
// by a "Name" discriminator on the JSON wire format. The discriminator strings
// ("uniform", "zipfian") and the "theta" parameter name are a frozen
// saved-workload contract; do not rename them.
// Distribution is a tagged keyspace index sampler selected by a "Name"
// discriminator on the JSON wire. The discriminator strings and "theta"
// parameter are a FROZEN saved-workload contract; see package doc.
type Distribution struct {
name string
delegate indexSampler
}

func (d *Distribution) Name() string { return d.name }

// SampleIndex delegates to the selected distribution. A zero-value (no Name)
// Distribution samples nothing and returns 0.
// SetStream binds the sampler to a deterministic sub-stream (nil = unseeded
// global RNG); a zero-value Distribution draws nothing, so it no-ops. See
// package doc for the reproducibility contract.
func (d *Distribution) SetStream(s *rng.Stream) {
switch delegate := d.delegate.(type) {
case *UniformDistribution:
delegate.stream = s
case *ZipfianDistribution:
delegate.stream = s
}
}

// SampleIndex delegates to the selected sampler; a zero-value (no Name)
// Distribution returns 0.
func (d *Distribution) SampleIndex(n uint64) (uint64, error) {
if d.delegate == nil {
return 0, nil
Expand All @@ -48,11 +64,8 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
case "":
return nil
case "uniform":
var uniform UniformDistribution
if err := json.Unmarshal(data, &uniform); err != nil {
return err
}
d.delegate = &uniform
// No JSON parameters; the stream is bound later via SetStream.
d.delegate = &UniformDistribution{}
return nil
case "zipfian":
var zipfian ZipfianDistribution
Expand All @@ -70,22 +83,78 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
}

// UniformDistribution draws each index with equal probability.
type UniformDistribution struct{}
type UniformDistribution struct {
stream *rng.Stream
}

func (UniformDistribution) SampleIndex(n uint64) (uint64, error) {
// PLT-460: implement the seeded uniform draw. Out of scope for PLT-455
// (wire format + validation only).
return 0, nil
func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) {
if n == 0 {
return 0, fmt.Errorf("uniform sample: empty keyspace (n == 0)")
}
if u.stream != nil {
return u.stream.Uint64N(n), nil
}
return rand.Uint64N(n), nil
}

// ZipfianDistribution draws indices with a Zipf-distributed skew controlled by
// theta. theta == 0 is uniform; larger theta concentrates draws on low indices.
// ZipfianDistribution is the YCSB precomputed-zeta generator: zeta(n, theta) is
// computed once per keyspace size n and cached, so each draw is O(1). theta == 0
// is uniform; larger theta hotspots the low indices. See package doc for the math.
//
// not copy-safe: holds a sync.Mutex; use only via *ZipfianDistribution.
type ZipfianDistribution struct {
Theta float64 `json:"theta"`

stream *rng.Stream

mu sync.Mutex
state *zipfState // memoized for state.n; recomputed when n changes.
}

// zipfState holds the O(1) draw constants for one keyspace size n. See package doc.
type zipfState struct {
n uint64
theta float64
zetaN float64 // zeta(n, theta)
alpha float64 // 1 / (1 - theta)
eta float64
halfPowTheta float64 // 0.5^theta, the boundary mass for index 1
}

// zipfianThetaMax bounds theta to the range over which the YCSB precomputed-zeta
// generator (PLT-460) is numerically well-behaved.
// newZipfState precomputes zeta(n, theta) in O(n) and the O(1) draw constants;
// see package doc for the derivation.
func newZipfState(n uint64, theta float64) *zipfState {
zetaN := zeta(n, theta)
zeta2 := zeta(2, theta)

// eta is unread for n <= 2; pin its NaN denominator to 0. See package doc.
denom := 1.0 - zeta2/zetaN
var eta float64
if denom != 0 {
eta = (1.0 - math.Pow(2.0/float64(n), 1.0-theta)) / denom
}

return &zipfState{
n: n,
theta: theta,
zetaN: zetaN,
alpha: 1.0 / (1.0 - theta),
eta: eta,
halfPowTheta: math.Pow(0.5, theta),
}
}

// zeta returns the generalized harmonic number sum_{i=1..n} 1/i^theta, summed
// smallest-term-first for numerical stability; see package doc.
func zeta(n uint64, theta float64) float64 {
var sum float64
for i := n; i >= 1; i-- {
sum += 1.0 / math.Pow(float64(i), theta)
}
return sum
}

// zipfianThetaMax bounds theta to the numerically well-behaved range; see package doc.
const zipfianThetaMax = 1.0

func (z *ZipfianDistribution) validate() error {
Expand All @@ -95,8 +164,37 @@ func (z *ZipfianDistribution) validate() error {
return nil
}

// SampleIndex draws a Zipf-skewed index in [0, n). n must be stable per sampler:
// the zeta cache is keyed on n, so a changing n recomputes O(n) every draw. See
// package doc.
func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) {
// PLT-460: implement the YCSB precomputed-zeta zipfian draw with a seeded
// RNG. Out of scope for PLT-455 (wire format + validation only).
return 0, nil
if n == 0 {
return 0, fmt.Errorf("zipfian sample: empty keyspace (n == 0)")
}

z.mu.Lock()
if z.state == nil || z.state.n != n || z.state.theta != z.Theta {
z.state = newZipfState(n, z.Theta)
}
st := z.state
z.mu.Unlock()

var u float64
if z.stream != nil {
u = z.stream.Float64()
} else {
u = rand.Float64()
}
uz := u * st.zetaN
if uz < 1.0 {
return 0, nil
}
if uz < 1.0+st.halfPowTheta {
return 1, nil
}
idx := uint64(float64(n) * math.Pow(st.eta*u-st.eta+1.0, st.alpha))
if idx >= n { // guard against float rounding at the top of the range.
idx = n - 1
}
return idx, nil
}
157 changes: 157 additions & 0 deletions config/distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package config_test

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/utils/rng"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -44,6 +47,160 @@ func TestDistribution(t *testing.T) {
})
}

// distribution unmarshals a fresh Distribution from a JSON fragment.
func distribution(t *testing.T, raw string) *config.Distribution {
t.Helper()
var d config.Distribution
require.NoError(t, d.UnmarshalJSON([]byte(raw)))
return &d
}

// sample binds d to stream and pulls count draws over keyspace n.
func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count int) []uint64 {
t.Helper()
d.SetStream(s)
out := make([]uint64, count)
for i := range out {
v, err := d.SampleIndex(n)
require.NoError(t, err)
require.Less(t, v, n, "draw out of range [0, n)")
out[i] = v
}
return out
}

// TestSampleIndexEmptyKeyspace: a zero keyspace is a caller error, not a silent
// zero, for the real samplers (the zero-value Distribution still returns 0).
func TestSampleIndexEmptyKeyspace(t *testing.T) {
t.Parallel()
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.9}`} {
_, err := distribution(t, raw).SampleIndex(0)
require.Error(t, err, raw)
}
}

// TestSampleIndexDeterminism: same seed + same stream id => identical draw
// sequence, for both samplers. This is the per-stream reproducibility contract.
func TestSampleIndexDeterminism(t *testing.T) {
t.Parallel()
const seed, n, count = 99, 1000, 256
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
a := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
b := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
require.Equal(t, a, b, "same seed must reproduce the draw sequence: %s", raw)
}
}

// TestSampleIndexSeededDiffersFromUnseeded guards the binding the way
// TestRandomGasPickerStreamSeeds does for gas: a bound sampler draws
// seed-determined values that differ from the unseeded global RNG path. If a
// refactor silently broke the binding, the seeded and unseeded sequences would
// match by accident only with probability ~0.
func TestSampleIndexSeededDiffersFromUnseeded(t *testing.T) {
t.Parallel()
const seed, n, count = 7, 1000, 128
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
seeded := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
unseeded := sample(t, distribution(t, raw), nil, n, count)
require.NotEqual(t, seeded, unseeded, "seeded draws must differ from the unseeded global RNG: %s", raw)
}
}

// TestUniformIsUniform: a chi-square goodness-of-fit test over evenly-sized
// buckets. With B buckets and N draws the statistic should sit well under the
// upper critical value; a badly skewed "uniform" would blow far past it.
func TestUniformIsUniform(t *testing.T) {
t.Parallel()
const n, buckets, perBucket = 1000, 20, 5000
const draws = buckets * perBucket // 100k draws, expected 5k per bucket.

got := sample(t, distribution(t, `{"Name":"uniform"}`), rng.NewSource(1).Stream("x"), n, draws)
counts := make([]float64, buckets)
width := uint64(n / buckets)
for _, v := range got {
counts[v/width]++
}
expected := float64(draws) / buckets
var chi2 float64
for _, c := range counts {
d := c - expected
chi2 += d * d / expected
}
// df = 19; chi-square upper ~0.1% critical value is ~43.8. A uniform draw
// clears this comfortably; the loose bound keeps the test non-flaky.
require.Less(t, chi2, 50.0, "uniform draws failed chi-square (chi2=%.2f)", chi2)
}

// TestZipfianSkewRisesWithTheta: the mass on the hottest top-k indices must
// increase monotonically with theta, and theta->0 must approach the uniform
// baseline. This is the defining property of the generator.
func TestZipfianSkewRisesWithTheta(t *testing.T) {
t.Parallel()
const n, draws, topK = 10000, 100000, 100 // top 1% of the keyspace.

topKMass := func(theta float64) float64 {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
got := sample(t, distribution(t, raw), rng.NewSource(5).Stream("x"), n, draws)
var hot int
for _, v := range got {
if v < topK {
hot++
}
}
return float64(hot) / float64(draws)
}

uniformBaseline := float64(topK) / float64(n) // 0.01
m0 := topKMass(0.0)
m5 := topKMass(0.5)
m9 := topKMass(0.9)

require.InDelta(t, uniformBaseline, m0, 0.01, "theta=0 should approximate uniform")
require.Greater(t, m5, m0, "skew must rise from theta=0 to 0.5 (m0=%.4f m5=%.4f)", m0, m5)
require.Greater(t, m9, m5, "skew must rise from theta=0.5 to 0.9 (m5=%.4f m9=%.4f)", m5, m9)
require.Greater(t, m9, 0.1, "theta=0.9 should concentrate >10%% on the top 1%% (m9=%.4f)", m9)
}

// TestZipfianInitCostBounded: precomputing zeta for a 1e6 keyspace must finish
// quickly (it is O(n), done once), and subsequent draws must be O(1) — proven
// here by the whole operation, init plus 1000 draws, staying well under budget.
func TestZipfianInitCostBounded(t *testing.T) {
t.Parallel()
const n = 1_000_000
d := distribution(t, `{"Name":"zipfian","theta":0.99}`)
d.SetStream(rng.NewSource(1).Stream("x"))

start := time.Now()
for i := 0; i < 1000; i++ {
v, err := d.SampleIndex(n)
require.NoError(t, err)
require.Less(t, v, uint64(n))
}
elapsed := time.Since(start)
require.Less(t, elapsed, 2*time.Second, "zipfian init+draws for n=1e6 too slow: %s", elapsed)
}

// TestZipfianNoNaNAcrossThetaRange: across the valid theta range and small
// edge-case keyspaces, every draw must be a valid in-range index — guarding the
// numerical-stability constants (eta, alpha) from producing NaN/overflow.
func TestZipfianNoNaNAcrossThetaRange(t *testing.T) {
t.Parallel()
for _, theta := range []float64{0.0, 0.001, 0.5, 0.9, 0.99, 0.999} {
for _, n := range []uint64{2, 3, 100, 1000} {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
d := distribution(t, raw)
d.SetStream(rng.NewSource(1).Stream("x"))
for i := 0; i < 100; i++ {
v, err := d.SampleIndex(n)
require.NoError(t, err)
// v is a uint64 index; the in-range check is the real guard that
// the internal zeta/eta math never produced a bad (NaN-derived) draw.
require.Less(t, v, n, "theta=%v n=%d produced out-of-range index", theta, n)
}
}
}
}

// TestScenarioDistributionAdditive proves the new fields are additive: a profile
// carrying no distribution fields parses unchanged and round-trips without
// introducing any distribution keys.
Expand Down
Loading
Loading