Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions diskann-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
futures-util = { workspace = true, features = ["alloc"] }
diskann-vector.workspace = true
diskann-wide.workspace = true
diskann-label-filter.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion diskann-benchmark/src/disk_index/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ where
};

let start = std::time::Instant::now();
disk_index.build()?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(params.num_threads.max(1))
.build()?;
runtime.block_on(disk_index.build())?;
let total_time: MicroSeconds = start.elapsed().into();

drop(span);
Expand Down
123 changes: 65 additions & 58 deletions diskann-benchmark/src/disk_index/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* Licensed under the MIT license.
*/

use rayon::prelude::*;
use std::{collections::HashSet, fmt, sync::atomic::AtomicBool, time::Instant};

use opentelemetry::{global, trace::Span, trace::Tracer};
Expand All @@ -20,14 +19,12 @@ use diskann_disk::{
utils::{instrumentation::PerfLogger, statistics, AlignedFileReaderFactory, QueryStatistics},
};
use diskann_providers::storage::StorageReadProvider;
use diskann_providers::{
storage::{
get_compressed_pq_file, get_disk_index_file, get_pq_pivot_file, FileStorageProvider,
},
utils::{create_thread_pool, ParallelIteratorInPool},
use diskann_providers::storage::{
get_compressed_pq_file, get_disk_index_file, get_pq_pivot_file, FileStorageProvider,
};
use diskann_tools::utils::{search_index_utils, KRecallAtN};
use diskann_utils::views::Matrix;
use futures_util::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -227,12 +224,11 @@ where
&index_reader,
vertex_provider_factory,
search_params.distance.into(),
None,
)?;

logger.log_checkpoint("index_loaded");

let pool = create_thread_pool(search_params.num_threads)?;
let runtime = tokio::runtime::Builder::new_current_thread().build()?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CI benchmark shows QPS dropping ~70% on this path (run).
The cause looks like the current_thread runtime + buffer_unordered: the search futures never suspend (get_element is future::ready), so on a single worker thread they run one after another — buffer_unordered(num_threads) gives no real parallelism here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to consider going through benchmark core here; it would manage the tokio parallelism for us.

let mut search_results_per_l = Vec::with_capacity(search_params.search_list.len());
let has_any_search_failed = AtomicBool::new(false);

Expand All @@ -253,59 +249,70 @@ where
tracer.start(span_name)
};

let zipped = queries
.par_row_iter()
.zip(vector_filters.par_iter())
.zip(result_ids.par_chunks_mut(search_params.recall_at as usize))
.zip(result_dists.par_chunks_mut(search_params.recall_at as usize))
.zip(statistics_vec.par_iter_mut())
.zip(result_counts.par_iter_mut());

zipped.for_each_in_pool(
pool.as_ref(),
|(((((q, vf), id_chunk), dist_chunk), stats), rc)| {
let vector_filter = if search_params.vector_filters_file.is_none() {
None
} else {
Some(Box::new(move |vid: &u32| vf.contains(vid))
as Box<dyn Fn(&u32) -> bool + Send + Sync>)
};

match searcher.search(
q,
search_params.recall_at,
l,
Some(search_params.beam_width),
vector_filter,
search_params.is_flat_search,
) {
Ok(search_result) => {
*stats = search_result.stats.query_statistics;
*rc = search_result.results.len() as u32;
let actual_results = search_result
.results
.len()
.min(search_params.recall_at as usize);
for (i, result_item) in search_result
.results
.iter()
.take(actual_results)
.enumerate()
{
id_chunk[i] = result_item.vertex_id;
dist_chunk[i] = result_item.distance;
}
// Drive all queries concurrently on the caller-owned runtime, bounding the
// number of in-flight searches to `num_threads`.
let search_results: Vec<_> = runtime.block_on(async {
stream::iter(queries.row_iter().enumerate())
.map(|(query_id, q)| {
let vf = &vector_filters[query_id];
let searcher = &searcher;
async move {
let vector_filter = if search_params.vector_filters_file.is_none() {
None
} else {
Some(Box::new(move |vid: &u32| vf.contains(vid))
as Box<dyn Fn(&u32) -> bool + Send + Sync>)
};

let result = searcher
.search(
q,
search_params.recall_at,
l,
Some(search_params.beam_width),
vector_filter,
search_params.is_flat_search,
)
.await;
(query_id, result)
}
Err(e) => {
eprintln!("Search failed for query: {:?}", e);
*rc = 0;
id_chunk.fill(0);
dist_chunk.fill(0.0);
has_any_search_failed.store(true, std::sync::atomic::Ordering::Release);
})
.buffer_unordered(search_params.num_threads.max(1))
.collect()
.await
});

for (query_id, result) in search_results {
let base = query_id * search_params.recall_at as usize;
let id_chunk = &mut result_ids[base..base + search_params.recall_at as usize];
let dist_chunk = &mut result_dists[base..base + search_params.recall_at as usize];
match result {
Ok(search_result) => {
statistics_vec[query_id] = search_result.stats.query_statistics;
result_counts[query_id] = search_result.results.len() as u32;
let actual_results = search_result
.results
.len()
.min(search_params.recall_at as usize);
for (i, result_item) in search_result
.results
.iter()
.take(actual_results)
.enumerate()
{
id_chunk[i] = result_item.vertex_id;
dist_chunk[i] = result_item.distance;
}
}
},
);
Err(e) => {
eprintln!("Search failed for query: {:?}", e);
result_counts[query_id] = 0;
id_chunk.fill(0);
dist_chunk.fill(0.0);
has_any_search_failed.store(true, std::sync::atomic::Ordering::Release);
}
}
}
let total_time = start.elapsed();

if has_any_search_failed.load(std::sync::atomic::Ordering::Acquire) {
Expand Down
2 changes: 1 addition & 1 deletion diskann-disk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ rand.workspace = true
rayon.workspace = true
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "sync"] }
tracing.workspace = true
vfs = { workspace = true }

Expand Down
47 changes: 22 additions & 25 deletions diskann-disk/src/build/builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use crate::{
},
inmem_builder::{load_inmem_index_builder, new_inmem_index_builder, InmemIndexBuilder},
quantizer::BuildQuantizer,
tokio::create_runtime,
},
chunking::{
checkpoint::{
Expand Down Expand Up @@ -205,22 +204,7 @@ where
)
}

pub fn build(&mut self) -> ANNResult<()> {
let runtime = create_runtime(self.index_configuration.num_threads)?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth noting: build parallelism is governed by two numbers — num_tasks (the tokio tasks spawned via JoinSet) and the runtime's worker-thread count. The old create_runtime(num_threads) derived both from num_threads, so they stayed in lockstep.
Now the runtime is caller-owned, so the two are independent: if the caller sizes worker threads below num_tasks, the build still runs but with reduced parallelism. Might be worth a note on build() about the expected worker-thread count.

runtime.block_on(async {
match self.build_internal().await {
Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => {
info!(
"Index build was interrupted by continuation_checker, progress saved for resumption"
);
Ok(()) // Return success for controlled interruptions
}
result => result, // Pass through any other result (Ok or Err)
}
})
}

async fn build_internal(&mut self) -> ANNResult<()> {
pub async fn build(&mut self) -> ANNResult<()> {
let mut logger = PerfLogger::new_disk_index_build_logger();

let pool = create_thread_pool(self.index_configuration.num_threads)?;
Expand All @@ -233,17 +217,30 @@ where
self.index_configuration.num_threads
);

self.generate_compressed_data(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);
let result: ANNResult<()> = async {
self.generate_compressed_data(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);

self.build_inmem_index(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);
self.build_inmem_index(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);

// Use physical file to pass the memory index to the disk writer
self.create_disk_layout()?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::DiskLayout);
// Use physical file to pass the memory index to the disk writer
self.create_disk_layout()?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::DiskLayout);

Ok(())
Ok(())
}
.await;

match result {
Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => {
info!(
"Index build was interrupted by continuation_checker, progress saved for resumption"
);
Ok(()) // Return success for controlled interruptions
}
result => result, // Pass through any other result (Ok or Err)
}
}

async fn generate_compressed_data(&mut self, pool: RayonThreadPoolRef<'_>) -> ANNResult<()> {
Expand Down
12 changes: 8 additions & 4 deletions diskann-disk/src/build/builder/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,10 @@ pub(crate) mod disk_index_builder_tests {
}?;

let timer = Timer::new();
disk_index.build()?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.build()
.expect("failed to build tokio runtime");
runtime.block_on(disk_index.build())?;
println!("Indexing time: {} seconds", timer.elapsed().as_secs_f64());

Ok(())
Expand Down Expand Up @@ -1054,9 +1057,10 @@ pub(crate) mod disk_index_builder_tests {
&index_reader,
vertex_provider_factory,
params.metric,
None,
)?;

let runtime = tokio::runtime::Builder::new_current_thread().build()?;

let data =
read_bin::<G::VectorDataType>(&mut storage_provider.open_reader(&params.data_path)?)?;
let dim = data.ncols();
Expand All @@ -1080,7 +1084,7 @@ pub(crate) mod disk_index_builder_tests {
let mut distances = vec![0f32; top_k];
let mut associated_data = vec![(); top_k];

_ = search_engine.search_internal(
_ = runtime.block_on(search_engine.search_internal(
query_data,
top_k,
search_l,
Expand All @@ -1091,7 +1095,7 @@ pub(crate) mod disk_index_builder_tests {
&mut associated_data,
&|_| true,
false,
);
));

diskann_providers::test_utils::assert_top_k_exactly_match(
q, &gt, &indices, &distances, top_k,
Expand Down
1 change: 0 additions & 1 deletion diskann-disk/src/build/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod core;
pub mod quantizer;

pub mod inmem_builder;
pub mod tokio;

#[cfg(test)]
mod tests;
Loading
Loading