diff --git a/config/distribution.go b/config/distribution.go index 15585bb..d183ec0 100644 --- a/config/distribution.go +++ b/config/distribution.go @@ -3,6 +3,11 @@ package config import ( "encoding/json" "fmt" + "math" + "math/rand/v2" + "sync" + + "github.com/sei-protocol/sei-load/utils/rng" ) var ( @@ -16,10 +21,9 @@ type indexSampler interface { SampleIndex(n uint64) (uint64, error) } -// Distribution is a tagged wrapper over a keyspace index distribution, selected -// by a "Name" discriminator on the JSON wire format. The discriminator strings -// ("uniform", "zipfian") and the "theta" parameter name are a frozen -// saved-workload contract; do not rename them. +// Distribution is a tagged keyspace index sampler selected by a "Name" +// discriminator on the JSON wire. The discriminator strings and "theta" +// parameter are a FROZEN saved-workload contract; see package doc. type Distribution struct { name string delegate indexSampler @@ -27,8 +31,20 @@ type Distribution struct { func (d *Distribution) Name() string { return d.name } -// SampleIndex delegates to the selected distribution. A zero-value (no Name) -// Distribution samples nothing and returns 0. +// SetStream binds the sampler to a deterministic sub-stream (nil = unseeded +// global RNG); a zero-value Distribution draws nothing, so it no-ops. See +// package doc for the reproducibility contract. +func (d *Distribution) SetStream(s *rng.Stream) { + switch delegate := d.delegate.(type) { + case *UniformDistribution: + delegate.stream = s + case *ZipfianDistribution: + delegate.stream = s + } +} + +// SampleIndex delegates to the selected sampler; a zero-value (no Name) +// Distribution returns 0. func (d *Distribution) SampleIndex(n uint64) (uint64, error) { if d.delegate == nil { return 0, nil @@ -48,11 +64,8 @@ func (d *Distribution) UnmarshalJSON(data []byte) error { case "": return nil case "uniform": - var uniform UniformDistribution - if err := json.Unmarshal(data, &uniform); err != nil { - return err - } - d.delegate = &uniform + // No JSON parameters; the stream is bound later via SetStream. + d.delegate = &UniformDistribution{} return nil case "zipfian": var zipfian ZipfianDistribution @@ -70,22 +83,78 @@ func (d *Distribution) UnmarshalJSON(data []byte) error { } // UniformDistribution draws each index with equal probability. -type UniformDistribution struct{} +type UniformDistribution struct { + stream *rng.Stream +} -func (UniformDistribution) SampleIndex(n uint64) (uint64, error) { - // PLT-460: implement the seeded uniform draw. Out of scope for PLT-455 - // (wire format + validation only). - return 0, nil +func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) { + if n == 0 { + return 0, fmt.Errorf("uniform sample: empty keyspace (n == 0)") + } + if u.stream != nil { + return u.stream.Uint64N(n), nil + } + return rand.Uint64N(n), nil } -// ZipfianDistribution draws indices with a Zipf-distributed skew controlled by -// theta. theta == 0 is uniform; larger theta concentrates draws on low indices. +// ZipfianDistribution is the YCSB precomputed-zeta generator: zeta(n, theta) is +// computed once per keyspace size n and cached, so each draw is O(1). theta == 0 +// is uniform; larger theta hotspots the low indices. See package doc for the math. +// +// not copy-safe: holds a sync.Mutex; use only via *ZipfianDistribution. type ZipfianDistribution struct { Theta float64 `json:"theta"` + + stream *rng.Stream + + mu sync.Mutex + state *zipfState // memoized for state.n; recomputed when n changes. +} + +// zipfState holds the O(1) draw constants for one keyspace size n. See package doc. +type zipfState struct { + n uint64 + theta float64 + zetaN float64 // zeta(n, theta) + alpha float64 // 1 / (1 - theta) + eta float64 + halfPowTheta float64 // 0.5^theta, the boundary mass for index 1 } -// zipfianThetaMax bounds theta to the range over which the YCSB precomputed-zeta -// generator (PLT-460) is numerically well-behaved. +// newZipfState precomputes zeta(n, theta) in O(n) and the O(1) draw constants; +// see package doc for the derivation. +func newZipfState(n uint64, theta float64) *zipfState { + zetaN := zeta(n, theta) + zeta2 := zeta(2, theta) + + // eta is unread for n <= 2; pin its NaN denominator to 0. See package doc. + denom := 1.0 - zeta2/zetaN + var eta float64 + if denom != 0 { + eta = (1.0 - math.Pow(2.0/float64(n), 1.0-theta)) / denom + } + + return &zipfState{ + n: n, + theta: theta, + zetaN: zetaN, + alpha: 1.0 / (1.0 - theta), + eta: eta, + halfPowTheta: math.Pow(0.5, theta), + } +} + +// zeta returns the generalized harmonic number sum_{i=1..n} 1/i^theta, summed +// smallest-term-first for numerical stability; see package doc. +func zeta(n uint64, theta float64) float64 { + var sum float64 + for i := n; i >= 1; i-- { + sum += 1.0 / math.Pow(float64(i), theta) + } + return sum +} + +// zipfianThetaMax bounds theta to the numerically well-behaved range; see package doc. const zipfianThetaMax = 1.0 func (z *ZipfianDistribution) validate() error { @@ -95,8 +164,37 @@ func (z *ZipfianDistribution) validate() error { return nil } +// SampleIndex draws a Zipf-skewed index in [0, n). n must be stable per sampler: +// the zeta cache is keyed on n, so a changing n recomputes O(n) every draw. See +// package doc. func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) { - // PLT-460: implement the YCSB precomputed-zeta zipfian draw with a seeded - // RNG. Out of scope for PLT-455 (wire format + validation only). - return 0, nil + if n == 0 { + return 0, fmt.Errorf("zipfian sample: empty keyspace (n == 0)") + } + + z.mu.Lock() + if z.state == nil || z.state.n != n || z.state.theta != z.Theta { + z.state = newZipfState(n, z.Theta) + } + st := z.state + z.mu.Unlock() + + var u float64 + if z.stream != nil { + u = z.stream.Float64() + } else { + u = rand.Float64() + } + uz := u * st.zetaN + if uz < 1.0 { + return 0, nil + } + if uz < 1.0+st.halfPowTheta { + return 1, nil + } + idx := uint64(float64(n) * math.Pow(st.eta*u-st.eta+1.0, st.alpha)) + if idx >= n { // guard against float rounding at the top of the range. + idx = n - 1 + } + return idx, nil } diff --git a/config/distribution_test.go b/config/distribution_test.go index 0dc32b2..b1757a2 100644 --- a/config/distribution_test.go +++ b/config/distribution_test.go @@ -2,11 +2,14 @@ package config_test import ( "encoding/json" + "fmt" "os" "path/filepath" "testing" + "time" "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/utils/rng" "github.com/stretchr/testify/require" ) @@ -44,6 +47,160 @@ func TestDistribution(t *testing.T) { }) } +// distribution unmarshals a fresh Distribution from a JSON fragment. +func distribution(t *testing.T, raw string) *config.Distribution { + t.Helper() + var d config.Distribution + require.NoError(t, d.UnmarshalJSON([]byte(raw))) + return &d +} + +// sample binds d to stream and pulls count draws over keyspace n. +func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count int) []uint64 { + t.Helper() + d.SetStream(s) + out := make([]uint64, count) + for i := range out { + v, err := d.SampleIndex(n) + require.NoError(t, err) + require.Less(t, v, n, "draw out of range [0, n)") + out[i] = v + } + return out +} + +// TestSampleIndexEmptyKeyspace: a zero keyspace is a caller error, not a silent +// zero, for the real samplers (the zero-value Distribution still returns 0). +func TestSampleIndexEmptyKeyspace(t *testing.T) { + t.Parallel() + for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.9}`} { + _, err := distribution(t, raw).SampleIndex(0) + require.Error(t, err, raw) + } +} + +// TestSampleIndexDeterminism: same seed + same stream id => identical draw +// sequence, for both samplers. This is the per-stream reproducibility contract. +func TestSampleIndexDeterminism(t *testing.T) { + t.Parallel() + const seed, n, count = 99, 1000, 256 + for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} { + a := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count) + b := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count) + require.Equal(t, a, b, "same seed must reproduce the draw sequence: %s", raw) + } +} + +// TestSampleIndexSeededDiffersFromUnseeded guards the binding the way +// TestRandomGasPickerStreamSeeds does for gas: a bound sampler draws +// seed-determined values that differ from the unseeded global RNG path. If a +// refactor silently broke the binding, the seeded and unseeded sequences would +// match by accident only with probability ~0. +func TestSampleIndexSeededDiffersFromUnseeded(t *testing.T) { + t.Parallel() + const seed, n, count = 7, 1000, 128 + for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} { + seeded := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count) + unseeded := sample(t, distribution(t, raw), nil, n, count) + require.NotEqual(t, seeded, unseeded, "seeded draws must differ from the unseeded global RNG: %s", raw) + } +} + +// TestUniformIsUniform: a chi-square goodness-of-fit test over evenly-sized +// buckets. With B buckets and N draws the statistic should sit well under the +// upper critical value; a badly skewed "uniform" would blow far past it. +func TestUniformIsUniform(t *testing.T) { + t.Parallel() + const n, buckets, perBucket = 1000, 20, 5000 + const draws = buckets * perBucket // 100k draws, expected 5k per bucket. + + got := sample(t, distribution(t, `{"Name":"uniform"}`), rng.NewSource(1).Stream("x"), n, draws) + counts := make([]float64, buckets) + width := uint64(n / buckets) + for _, v := range got { + counts[v/width]++ + } + expected := float64(draws) / buckets + var chi2 float64 + for _, c := range counts { + d := c - expected + chi2 += d * d / expected + } + // df = 19; chi-square upper ~0.1% critical value is ~43.8. A uniform draw + // clears this comfortably; the loose bound keeps the test non-flaky. + require.Less(t, chi2, 50.0, "uniform draws failed chi-square (chi2=%.2f)", chi2) +} + +// TestZipfianSkewRisesWithTheta: the mass on the hottest top-k indices must +// increase monotonically with theta, and theta->0 must approach the uniform +// baseline. This is the defining property of the generator. +func TestZipfianSkewRisesWithTheta(t *testing.T) { + t.Parallel() + const n, draws, topK = 10000, 100000, 100 // top 1% of the keyspace. + + topKMass := func(theta float64) float64 { + raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta) + got := sample(t, distribution(t, raw), rng.NewSource(5).Stream("x"), n, draws) + var hot int + for _, v := range got { + if v < topK { + hot++ + } + } + return float64(hot) / float64(draws) + } + + uniformBaseline := float64(topK) / float64(n) // 0.01 + m0 := topKMass(0.0) + m5 := topKMass(0.5) + m9 := topKMass(0.9) + + require.InDelta(t, uniformBaseline, m0, 0.01, "theta=0 should approximate uniform") + require.Greater(t, m5, m0, "skew must rise from theta=0 to 0.5 (m0=%.4f m5=%.4f)", m0, m5) + require.Greater(t, m9, m5, "skew must rise from theta=0.5 to 0.9 (m5=%.4f m9=%.4f)", m5, m9) + require.Greater(t, m9, 0.1, "theta=0.9 should concentrate >10%% on the top 1%% (m9=%.4f)", m9) +} + +// TestZipfianInitCostBounded: precomputing zeta for a 1e6 keyspace must finish +// quickly (it is O(n), done once), and subsequent draws must be O(1) — proven +// here by the whole operation, init plus 1000 draws, staying well under budget. +func TestZipfianInitCostBounded(t *testing.T) { + t.Parallel() + const n = 1_000_000 + d := distribution(t, `{"Name":"zipfian","theta":0.99}`) + d.SetStream(rng.NewSource(1).Stream("x")) + + start := time.Now() + for i := 0; i < 1000; i++ { + v, err := d.SampleIndex(n) + require.NoError(t, err) + require.Less(t, v, uint64(n)) + } + elapsed := time.Since(start) + require.Less(t, elapsed, 2*time.Second, "zipfian init+draws for n=1e6 too slow: %s", elapsed) +} + +// TestZipfianNoNaNAcrossThetaRange: across the valid theta range and small +// edge-case keyspaces, every draw must be a valid in-range index — guarding the +// numerical-stability constants (eta, alpha) from producing NaN/overflow. +func TestZipfianNoNaNAcrossThetaRange(t *testing.T) { + t.Parallel() + for _, theta := range []float64{0.0, 0.001, 0.5, 0.9, 0.99, 0.999} { + for _, n := range []uint64{2, 3, 100, 1000} { + raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta) + d := distribution(t, raw) + d.SetStream(rng.NewSource(1).Stream("x")) + for i := 0; i < 100; i++ { + v, err := d.SampleIndex(n) + require.NoError(t, err) + // v is a uint64 index; the in-range check is the real guard that + // the internal zeta/eta math never produced a bad (NaN-derived) draw. + require.Less(t, v, n, "theta=%v n=%d produced out-of-range index", theta, n) + } + } + } +} + // TestScenarioDistributionAdditive proves the new fields are additive: a profile // carrying no distribution fields parses unchanged and round-trips without // introducing any distribution keys. diff --git a/config/doc.go b/config/doc.go new file mode 100644 index 0000000..1a8e7dd --- /dev/null +++ b/config/doc.go @@ -0,0 +1,97 @@ +// Package config holds the load-test workload configuration: the JSON wire +// types (LoadConfig, Scenario, Settings, gas and funding pickers) and the +// keyspace Distribution primitive that scenarios use to skew which key/size +// index they touch. +// +// # Distribution primitive +// +// A Distribution is a tagged sampler that draws an index in [0, n) from some +// keyspace distribution (see distribution.go). It is selected on the JSON wire +// by a "Name" discriminator and bound at run time to a deterministic +// pseudo-random sub-stream (utils/rng) so that two runs at the same seed draw +// the same multiset of indices. +// +// # Wire format (FROZEN one-way door) +// +// The discriminator lives in the "Name" field and selects the delegate: +// +// {"Name": "uniform"} +// {"Name": "zipfian", "theta": 0.9} +// +// The discriminator strings ("uniform", "zipfian") and the "theta" parameter +// name are a FROZEN saved-workload contract: saved configs are keyed by +// config_sha256, so renaming any of them changes how an old config parses (or +// stops it parsing) and silently diverges replays. Treat them as a one-way +// door — add new names, never rename existing ones. A zero-value Distribution +// (empty Name) draws no randomness and samples 0. +// +// # Semantics: uniform vs zipfian(theta) +// +// uniform draws every index in [0, n) with equal probability. +// +// zipfian(theta) draws with a Zipf-distributed skew controlled by theta in +// [0, 1): theta -> 0 approaches uniform, while theta -> 1 concentrates draws on +// the low indices, producing a hotspot. theta is validated to [0, 1) because +// the precomputed-zeta generator below is numerically well-behaved only over +// that range (alpha = 1/(1-theta) diverges at theta = 1). +// +// # YCSB precomputed-zeta math +// +// The zipfian sampler is the YCSB precomputed-zeta generator. It rests on the +// generalized harmonic number +// +// zeta(n, theta) = sum_{i=1..n} 1 / i^theta +// +// which is O(n) to compute. zeta(n, theta) is summed from the largest i (the +// smallest term) down to i = 1: accumulating smallest-term-first keeps the +// running sum from being swamped by its leading terms, which matters for the +// n ~ 1e6 keyspaces this generator targets. +// +// From zeta(n, theta) the generator derives a set of constants that make each +// draw O(1): +// +// alpha = 1 / (1 - theta) // draw exponent +// eta = (1 - (2/n)^(1-theta)) / (1 - zeta(2,theta)/zeta(n,theta)) +// 0.5^theta // boundary mass for index 1 +// +// A draw takes one uniform u in [0, 1), forms uz = u * zeta(n, theta), and +// branches: uz < 1 returns index 0; uz < 1 + 0.5^theta returns index 1; +// otherwise the index is floor(n * (eta*u - eta + 1)^alpha), clamped to n-1 to +// absorb floating-point rounding at the top of the range. +// +// Precompute-once design: zeta(n, theta) and the constants depend only on +// (n, theta), so they are computed once and cached, keyed on n. n arrives at +// sample time (not at unmarshal time), so the cache fills lazily on first draw +// and is recomputed only if a later draw presents a different n. After the +// first draw, every subsequent draw is O(1). +// +// Edge behavior: at n <= 2, zeta(2, theta) == zeta(n, theta) so eta's +// denominator is 0 and eta would be NaN. eta is provably never read for those +// keyspaces (they are fully served by the uz < 1 and uz < 1 + 0.5^theta +// branches), but a NaN cached in state is a refactor hazard, so it is pinned to +// 0. theta = 0 reduces the generator to uniform sampling. +// +// # n must be stable per sampler +// +// The cache is keyed on n, so each sampler must be presented a stable n across +// its draws. A changing n triggers an O(n) zeta recompute on every draw and +// serializes those draws behind the cache mutex. Callers bind one sampler per +// fixed-size keyspace. (ZipfianDistribution holds that mutex and is therefore +// not copy-safe; use it only via pointer.) +// +// # Seeded-stream reproducibility (FROZEN inputs) +// +// Draws go through a bound *rng.Stream (see SetStream): a per-scenario +// substream derived from the run seed. This is what gives the workload its +// reproducibility contract — same seed + same config yields the same per-stream +// draw multiset (see package utils/rng for the precise contract and its limits +// above one worker). +// +// The stream ids feeding the samplers — "dist:%d:key" and "dist:%d:size" — are +// FROZEN, append-only inputs: a stream id is hashed to seed its sub-stream, so +// renaming one reseeds that stream and invalidates every saved replay for the +// same config_sha256. New ids may be added (they hash to their own sub-streams +// and do not perturb existing ones); existing ids must never be renamed. The +// canonical list and the full frozen-derivation note live in +// utils/rng/streams.go and utils/rng/rng.go — this is a one-way door. +package config diff --git a/generator/generator.go b/generator/generator.go index b55c59a..7a8b04d 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -62,6 +62,7 @@ func (g *configBasedGenerator) createScenarios() error { // Create scenario instance using factory scenario := scenarios.CreateScenario(scenarioCfg) g.bindGasStreams(i, scenarioCfg) + g.bindDistributionStreams(i, scenarioCfg) // Determine account pool to use var accountPool types.AccountPool @@ -139,6 +140,20 @@ func (g *configBasedGenerator) bindGasStreams(i int, cfg config.Scenario) { } } +// bindDistributionStreams binds each configured keyspace distribution for a +// scenario to its own deterministic sub-stream, keyed by the scenario's config +// index. The pointer-aliasing reasoning in bindGasStreams applies verbatim: cfg +// is a value copy but its *Distribution fields are pointers shared with the +// scenario's copy, so SetStream reaches the live sampler. +func (g *configBasedGenerator) bindDistributionStreams(i int, cfg config.Scenario) { + if cfg.KeyDistribution != nil { + cfg.KeyDistribution.SetStream(g.rng.Stream(rng.KeyDistributionStream(i))) + } + if cfg.SizeDistribution != nil { + cfg.SizeDistribution.SetStream(g.rng.Stream(rng.SizeDistributionStream(i))) + } +} + // mockDeployAll deploys all scenario instances that require deployment (for unit tests). func (g *configBasedGenerator) mockDeployAll() error { for _, instance := range g.instances { diff --git a/utils/rng/rng.go b/utils/rng/rng.go index 2ff9bf5..24ea692 100644 --- a/utils/rng/rng.go +++ b/utils/rng/rng.go @@ -41,6 +41,9 @@ import ( // 1. The derivation formula above (hash, diffusion, PCG argument order). // 2. The set of stream-id strings (defined as constants in streams.go). The // streamID feeds fnv1a64, so renaming "gas:0:base" reseeds that stream. +// Additions are append-only and do not perturb existing streams (a new id +// hashes to its own sub-stream); PLT-460 added "dist:%d:key" and +// "dist:%d:size" for the per-scenario distribution index samplers. // 3. The per-stream draw order. Each stream is a sequence; drawing base before // tip before feecap is part of the contract — reordering draws within a // stream shifts every downstream value. diff --git a/utils/rng/streams.go b/utils/rng/streams.go index 6c3b547..4f4aa60 100644 --- a/utils/rng/streams.go +++ b/utils/rng/streams.go @@ -28,3 +28,11 @@ func GasTipStream(i int) string { return fmt.Sprintf("gas:%d:tip", i) } // GasFeeCapStream is the stream id for scenario i's fee-cap gas picker. func GasFeeCapStream(i int) string { return fmt.Sprintf("gas:%d:feecap", i) } + +// KeyDistributionStream is the stream id for scenario i's key-distribution +// index sampler (PLT-460). +func KeyDistributionStream(i int) string { return fmt.Sprintf("dist:%d:key", i) } + +// SizeDistributionStream is the stream id for scenario i's size-distribution +// index sampler (PLT-460). +func SizeDistributionStream(i int) string { return fmt.Sprintf("dist:%d:size", i) }