Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: ["1.25"]
go: ["1.25.11"]
steps:
- uses: actions/checkout@v6

Expand Down Expand Up @@ -70,7 +70,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: "1.25"
go-version: "1.25.11"
cache: true

# MinIO can't run as a `services:` container because GitHub Actions
Expand Down Expand Up @@ -115,7 +115,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: "1.25"
go-version: "1.25.11"
cache: true

- name: staticcheck
Expand All @@ -137,7 +137,7 @@ jobs:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: "1.25"
go-version: "1.25.11"
cache: true
- name: Install govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest
Expand Down
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ linters:
- (net/http.ResponseWriter).Write
- (github.com/jackc/pgx/v5.Tx).Rollback
- (github.com/hallelx2/vectorless-engine/pkg/queue.Queue).Close
misspell:
# "strat" is our abbreviation for "strategy" (variable + key names),
# not a misspelling of "start".
ignore-rules:
- strat
exclusions:
rules:
- path: _test\.go
Expand Down
32 changes: 16 additions & 16 deletions cmd/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
ID string `json:"id"`
} `json:"sections"`
}
json.Unmarshal(treeResp, &treeData)
_ = json.Unmarshal(treeResp, &treeData) // benchmark best-effort parse

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Handle tree parse failure explicitly before section benchmark selection.

If json.Unmarshal fails, sectionID remains empty and “Get Section” benchmarking is silently skipped. Log/track this as a failed setup step so results aren’t misread as complete.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/benchmark/main.go` at line 80, The json.Unmarshal call in
cmd/benchmark/main.go is ignoring errors which can leave sectionID empty and
cause the "Get Section" benchmark to be silently skipped; change the
unmarshalling of treeResp into treeData (the json.Unmarshal(treeResp, &treeData)
call) to check the returned error, and on failure log an explicit setup error
(including the error detail) and mark the benchmark run as failed or abort
further section selection (e.g., return a non-zero exit or set a setupFailure
flag) so the missing sectionID is not treated as a successful run; ensure
downstream code that uses sectionID (section selection / "Get Section"
benchmark) respects this failure signal.

sectionID := ""
if len(treeData.Sections) > 1 {
sectionID = treeData.Sections[1].ID // pick a child section
Expand All @@ -87,9 +87,9 @@ func main() {
fmt.Printf("Using section ID: %s\n\n", sectionID)

type result struct {
name string
rest []time.Duration
grpc []time.Duration
name string
rest []time.Duration
grpc []time.Duration
}

var results []result
Expand All @@ -101,12 +101,12 @@ func main() {
fmt.Print(".")
// REST
start := time.Now()
restGET(base + "/v1/health")
_, _ = restGET(base + "/v1/health")
r.rest = append(r.rest, time.Since(start))

// gRPC (Connect)
start = time.Now()
healthClient.Check(ctx, connect.NewRequest(&v1.HealthCheckRequest{}))
_, _ = healthClient.Check(ctx, connect.NewRequest(&v1.HealthCheckRequest{}))
r.grpc = append(r.grpc, time.Since(start))
Comment on lines +104 to 110

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don’t include failed calls in latency samples.

All benchmark paths discard request errors, so network/protocol failures are still timed and recorded as “successful” latency. This skews median/p95 and can invert REST vs gRPC conclusions. Track failures separately and only append durations for successful calls (or report success-rate alongside latency).

Also applies to: 121-126, 137-144, 156-163, 185-194

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/benchmark/main.go` around lines 104 - 110, Benchmark currently records
durations for failed requests (e.g., the REST call via restGET and gRPC via
healthClient.Check) which skews latency stats; change each measured block
(places using restGET, healthClient.Check and similar REST/gRPC calls that
append to r.rest/r.grpc/r.rerpc etc.) to capture and check the error/result and
only append time.Since(start) when the call succeeded, otherwise increment a
failure counter (e.g., r.restFailures/r.grpcFailures or a shared
successes/failures map) so latency arrays contain only successful-sample
durations and failures are tracked separately; update all similar blocks (the
other ranges mentioned) to follow the same pattern using the existing variables
like ctx, connect.NewRequest, r.rest and r.grpc to locate the spots to change.

}
fmt.Println(" done")
Expand All @@ -118,11 +118,11 @@ func main() {
for i := 0; i < n; i++ {
fmt.Print(".")
start := time.Now()
restGET(base + "/v1/documents")
_, _ = restGET(base + "/v1/documents")
r.rest = append(r.rest, time.Since(start))

start = time.Now()
docsClient.ListDocuments(ctx, connect.NewRequest(&v1.ListDocumentsRequest{Limit: 10}))
_, _ = docsClient.ListDocuments(ctx, connect.NewRequest(&v1.ListDocumentsRequest{Limit: 10}))
r.grpc = append(r.grpc, time.Since(start))
}
fmt.Println(" done")
Expand All @@ -134,11 +134,11 @@ func main() {
for i := 0; i < n; i++ {
fmt.Print(".")
start := time.Now()
restGET(base + "/v1/documents/" + *docID + "/tree")
_, _ = restGET(base + "/v1/documents/" + *docID + "/tree")
r.rest = append(r.rest, time.Since(start))

start = time.Now()
docsClient.GetDocumentTree(ctx, connect.NewRequest(&v1.GetDocumentTreeRequest{
_, _ = docsClient.GetDocumentTree(ctx, connect.NewRequest(&v1.GetDocumentTreeRequest{
DocumentId: *docID,
}))
r.grpc = append(r.grpc, time.Since(start))
Expand All @@ -153,11 +153,11 @@ func main() {
for i := 0; i < n; i++ {
fmt.Print(".")
start := time.Now()
restGET(base + "/v1/sections/" + sectionID)
_, _ = restGET(base + "/v1/sections/" + sectionID)
r.rest = append(r.rest, time.Since(start))

start = time.Now()
docsClient.GetSection(ctx, connect.NewRequest(&v1.GetSectionRequest{
_, _ = docsClient.GetSection(ctx, connect.NewRequest(&v1.GetSectionRequest{
SectionId: sectionID,
}))
r.grpc = append(r.grpc, time.Since(start))
Expand All @@ -182,12 +182,12 @@ func main() {
"query": q,
})
start := time.Now()
restPOST(base+"/v1/query", body)
_, _ = restPOST(base+"/v1/query", body)
r.rest = append(r.rest, time.Since(start))

// gRPC (Connect)
start = time.Now()
queryClient.Query(ctx, connect.NewRequest(&v1.QueryRequest{
_, _ = queryClient.Query(ctx, connect.NewRequest(&v1.QueryRequest{
DocumentId: *docID,
Query: q,
}))
Expand Down Expand Up @@ -246,7 +246,7 @@ func restGET(url string) ([]byte, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer func() { _ = resp.Body.Close() }() // best-effort close
return io.ReadAll(resp.Body)
}

Expand All @@ -255,7 +255,7 @@ func restPOST(url string, body []byte) ([]byte, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer func() { _ = resp.Body.Close() }() // best-effort close
return io.ReadAll(resp.Body)
}

Expand Down
94 changes: 48 additions & 46 deletions cmd/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func run() error {
if err != nil {
return fmt.Errorf("init queue: %w", err)
}
defer q.Close()
defer func() { _ = q.Close() }() // best-effort close

llmClient, err := buildLLM(cfg.LLM)
if err != nil {
Expand Down Expand Up @@ -206,41 +206,41 @@ func run() error {
}
q.Register(queue.KindIngestDocument, pipeline.Handler())

// /v1/answer/pageindex gets its OWN PageIndexStrategy instance,
// /v1/answer/treewalk gets its OWN TreeWalkStrategy instance,
// independent of whatever selection strategy is configured in
// retrieval.strategy. This way the endpoint is always available
// (gated by retrieval.pageindex.enabled), even on a deployment
// (gated by retrieval.treewalk.enabled), even on a deployment
// using chunked-tree as its default selection path.
var pageIndexStrategy *retrieval.PageIndexStrategy
if cfg.Retrieval.PageIndex.Enabled && llmClient != nil {
pageIndexStrategy = buildPageIndexStrategy(cfg.Retrieval, llmClient, store)
logger.Info("retrieval: pageindex answer endpoint enabled",
"max_hops", pageIndexStrategy.MaxHops,
"page_content_limit", pageIndexStrategy.PageContentLimit,
"model_override", cfg.Retrieval.PageIndex.Model,
var treeWalkStrategy *retrieval.TreeWalkStrategy
if cfg.Retrieval.TreeWalk.Enabled && llmClient != nil {
treeWalkStrategy = buildTreeWalkStrategy(cfg.Retrieval, llmClient, store)
logger.Info("retrieval: treewalk answer endpoint enabled",
"max_hops", treeWalkStrategy.MaxHops,
"page_content_limit", treeWalkStrategy.PageContentLimit,
"model_override", cfg.Retrieval.TreeWalk.Model,
)
}

deps := api.Deps{
Logger: logger,
DB: pool,
Storage: store,
Queue: q,
Strategy: strategy,
Version: version,
MultiDoc: multiDoc,
LLM: llmClient,
LLMModel: modelFor(cfg.LLM),
AnswerSpan: cfg.Retrieval.AnswerSpan,
Answer: cfg.Retrieval.Answer,
Planner: planner,
Planning: cfg.Retrieval.Planning,
ReRanker: reRanker,
ReRank: cfg.Retrieval.ReRank,
Replay: replayStore,
Abstain: cfg.Retrieval.Abstain,
PageIndexStrategy: pageIndexStrategy,
PageIndex: cfg.Retrieval.PageIndex,
Logger: logger,
DB: pool,
Storage: store,
Queue: q,
Strategy: strategy,
Version: version,
MultiDoc: multiDoc,
LLM: llmClient,
LLMModel: modelFor(cfg.LLM),
AnswerSpan: cfg.Retrieval.AnswerSpan,
Answer: cfg.Retrieval.Answer,
Planner: planner,
Planning: cfg.Retrieval.Planning,
ReRanker: reRanker,
ReRank: cfg.Retrieval.ReRank,
Replay: replayStore,
Abstain: cfg.Retrieval.Abstain,
TreeWalkStrategy: treeWalkStrategy,
TreeWalk: cfg.Retrieval.TreeWalk,
}

srv := &http.Server{
Expand Down Expand Up @@ -393,36 +393,38 @@ func buildStrategy(c config.RetrievalConfig, client llmgate.Client, store storag
}
a.ModelOverride = c.Agentic.Model
return a
case "pageindex":
return buildPageIndexStrategy(c, client, store)
case "treewalk":
return buildTreeWalkStrategy(c, client, store)
case "auto":
return retrieval.NewAuto(retrieval.NewSinglePass(client), buildTreeWalkStrategy(c, client, store))
default:
return retrieval.NewChunkedTree(client)
}
}

// buildPageIndexStrategy constructs the page-based agentic
// buildTreeWalkStrategy constructs the page-based agentic
// strategy with the storage-backed PageLoader and the configured
// caps. Used by buildStrategy when retrieval.strategy=pageindex AND
// by the /v1/answer/pageindex endpoint setup (which wires its own
// caps. Used by buildStrategy when retrieval.strategy=treewalk AND
// by the /v1/answer/treewalk endpoint setup (which wires its own
// instance regardless of the selection strategy).
//
// The TOCProvider is left nil here. PR-A (toc-tree-builder) adds
// documents.toc_tree + a DB-backed provider; until it lands the
// strategy degrades to its synthesised view, which is the
// documented fallback path.
func buildPageIndexStrategy(c config.RetrievalConfig, client llmgate.Client, store storage.Storage) *retrieval.PageIndexStrategy {
p := retrieval.NewPageIndexStrategy(client)
func buildTreeWalkStrategy(c config.RetrievalConfig, client llmgate.Client, store storage.Storage) *retrieval.TreeWalkStrategy {
p := retrieval.NewTreeWalkStrategy(client)
p.PageLoader = storagePageLoader{s: store}
if c.PageIndex.MaxHops > 0 {
p.MaxHops = c.PageIndex.MaxHops
if c.TreeWalk.MaxHops > 0 {
p.MaxHops = c.TreeWalk.MaxHops
}
if c.PageIndex.PageContentLimit > 0 {
p.PageContentLimit = c.PageIndex.PageContentLimit
if c.TreeWalk.PageContentLimit > 0 {
p.PageContentLimit = c.TreeWalk.PageContentLimit
}
if c.PageIndex.MaxCitations > 0 {
p.MaxCitations = c.PageIndex.MaxCitations
if c.TreeWalk.MaxCitations > 0 {
p.MaxCitations = c.TreeWalk.MaxCitations
}
p.ModelOverride = c.PageIndex.Model
p.ModelOverride = c.TreeWalk.Model
return p
}

Expand All @@ -437,14 +439,14 @@ func (sf storageFetcher) Get(ctx context.Context, ref string) ([]byte, error) {
if err != nil {
return nil, err
}
defer rc.Close()
defer func() { _ = rc.Close() }() // best-effort close
return io.ReadAll(rc)
}

// storagePageLoader adapts a storage.Storage to
// retrieval.PageContentLoader. Mirrors storageFetcher but lives
// behind a separate interface so the two callers (agentic /
// pageindex) can be wired independently. The PageIndex strategy
// treewalk) can be wired independently. The TreeWalk strategy
// materialises section bodies once per get_pages observation, so
// reading the full reader into a []byte is the right shape.
type storagePageLoader struct{ s storage.Storage }
Expand All @@ -454,7 +456,7 @@ func (l storagePageLoader) Load(ctx context.Context, ref string) ([]byte, error)
if err != nil {
return nil, err
}
defer rc.Close()
defer func() { _ = rc.Close() }() // best-effort close
return io.ReadAll(rc)
}

Expand Down
Loading
Loading