Make diskann-disk runtime-agnostic with async APIs #1184
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,7 +42,6 @@ use crate::{ | |
| }, | ||
| inmem_builder::{load_inmem_index_builder, new_inmem_index_builder, InmemIndexBuilder}, | ||
| quantizer::BuildQuantizer, | ||
| tokio::create_runtime, | ||
| }, | ||
| chunking::{ | ||
| checkpoint::{ | ||
|
|
@@ -205,22 +204,7 @@ where | |
| ) | ||
| } | ||
|
|
||
| pub fn build(&mut self) -> ANNResult<()> { | ||
| let runtime = create_runtime(self.index_configuration.num_threads)?; | ||
|
Contributor
Worth noting: build parallelism is governed by two numbers…
||
| runtime.block_on(async { | ||
| match self.build_internal().await { | ||
| Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => { | ||
| info!( | ||
| "Index build was interrupted by continuation_checker, progress saved for resumption" | ||
| ); | ||
| Ok(()) // Return success for controlled interruptions | ||
| } | ||
| result => result, // Pass through any other result (Ok or Err) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| async fn build_internal(&mut self) -> ANNResult<()> { | ||
| pub async fn build(&mut self) -> ANNResult<()> { | ||
| let mut logger = PerfLogger::new_disk_index_build_logger(); | ||
|
|
||
| let pool = create_thread_pool(self.index_configuration.num_threads)?; | ||
|
|
@@ -233,17 +217,30 @@ where | |
| self.index_configuration.num_threads | ||
| ); | ||
|
|
||
| self.generate_compressed_data(pool.as_ref()).await?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction); | ||
| let result: ANNResult<()> = async { | ||
| self.generate_compressed_data(pool.as_ref()).await?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction); | ||
|
|
||
| self.build_inmem_index(pool.as_ref()).await?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild); | ||
| self.build_inmem_index(pool.as_ref()).await?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild); | ||
|
|
||
| // Use physical file to pass the memory index to the disk writer | ||
| self.create_disk_layout()?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::DiskLayout); | ||
| // Use physical file to pass the memory index to the disk writer | ||
| self.create_disk_layout()?; | ||
| logger.log_checkpoint(DiskIndexBuildCheckpoint::DiskLayout); | ||
|
|
||
| Ok(()) | ||
| Ok(()) | ||
| } | ||
| .await; | ||
|
|
||
| match result { | ||
| Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => { | ||
| info!( | ||
| "Index build was interrupted by continuation_checker, progress saved for resumption" | ||
| ); | ||
| Ok(()) // Return success for controlled interruptions | ||
| } | ||
| result => result, // Pass through any other result (Ok or Err) | ||
| } | ||
| } | ||
|
|
||
| async fn generate_compressed_data(&mut self, pool: RayonThreadPoolRef<'_>) -> ANNResult<()> { | ||
|
|
||
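The change above moves runtime ownership from `build()` to its caller: `build()` becomes `async`, no longer calls `create_runtime`, and still maps a controlled interruption to `Ok(())`. A minimal std-only sketch of that shape follows. `Builder`, `BuildError`, `run_stages`, and the hand-rolled `block_on` are hypothetical stand-ins for illustration, not the crate's types; a real caller would presumably drive `build()` from whatever executor it already uses.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical error type standing in for the crate's error kinds.
#[derive(Debug, Clone, PartialEq)]
enum BuildError {
    Interrupted,
    Failed(String),
}

/// Hypothetical builder; `stage_result` simulates the outcome of the build stages.
struct Builder {
    stage_result: Result<(), BuildError>,
}

impl Builder {
    async fn run_stages(&mut self) -> Result<(), BuildError> {
        self.stage_result.clone()
    }

    /// Async entry point: no runtime is created here, the caller supplies one.
    async fn build(&mut self) -> Result<(), BuildError> {
        match self.run_stages().await {
            // A controlled interruption is reported as success.
            Err(BuildError::Interrupted) => Ok(()),
            // Any other result (Ok or Err) passes through unchanged.
            other => other,
        }
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-threaded executor, used here only to keep the sketch std-only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let mut builder = Builder { stage_result: Err(BuildError::Interrupted) };
    // The caller, not the builder, decides how the future is driven.
    println!("{:?}", block_on(builder.build())); // prints: Ok(())
}
```

The design consequence is that thread count and scheduling policy for the async part are now the caller's decision, while the builder itself no longer creates a runtime.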
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,6 @@ pub mod core; | |
| pub mod quantizer; | ||
|
|
||
| pub mod inmem_builder; | ||
| pub mod tokio; | ||
|
|
||
| #[cfg(test)] | ||
| mod tests; | ||
The CI benchmark shows QPS dropping ~70% on this path (run).
The cause looks like the `current_thread` runtime + `buffer_unordered`: the search futures never suspend (`get_element` is `future::ready`), so on a single worker thread they run one after another; `buffer_unordered(num_threads)` gives no real parallelism here.
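To illustrate the point about futures that never suspend, here is a small std-only sketch. It does not use tokio or `buffer_unordered`; it polls a set of futures once each on the calling thread, which is roughly what a single-threaded executor does with a buffered set. `fake_search` and `poll_each_once` are hypothetical names, and `std::future::ready` stands in for a data access that is always immediately available.

```rust
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, ThreadId};

/// Waker that does nothing; sufficient for polling futures by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type BoxFut = Pin<Box<dyn Future<Output = (usize, ThreadId)>>>;

/// Hypothetical stand-in for a search whose data access is always ready:
/// it contains an `.await`, but never yields back to the executor.
async fn fake_search(id: usize) -> (usize, ThreadId) {
    let element = std::future::ready(id * 2).await; // resolves immediately
    (element, thread::current().id())
}

/// Polls each future once, in order, on the calling thread.
/// Returns the outputs of the futures that finished on that first poll.
fn poll_each_once(n: usize) -> Vec<(usize, ThreadId)> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut futures: Vec<BoxFut> = (0..n)
        .map(|i| Box::pin(fake_search(i)) as BoxFut)
        .collect();

    let mut done = Vec::new();
    for fut in futures.iter_mut() {
        // Each future runs to completion inside this single poll call,
        // so the next one cannot start until this one has finished.
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            done.push(out);
        }
    }
    done
}

fn main() {
    let results = poll_each_once(4);
    let threads: HashSet<ThreadId> = results.iter().map(|r| r.1).collect();
    println!(
        "completed on first poll: {}, distinct threads: {}",
        results.len(),
        threads.len()
    );
    // prints: completed on first poll: 4, distinct threads: 1
}
```

Every future completes inside its first `poll`, so holding several of them "in flight" only changes the order in which they are started; the work itself is still serialized on one thread.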
We may want to consider going through benchmark core. It will manage the `tokio` parallelism.
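For context on what "managing the parallelism" has to achieve when the per-query work never suspends: the work needs to be placed on separate worker threads. The sketch below shows that idea with plain `std::thread::scope` rather than tokio or the benchmark core mentioned above, whose API is not shown in this thread. `search` and `run_parallel` are hypothetical names.

```rust
use std::collections::HashSet;
use std::thread::{self, ThreadId};

/// Hypothetical CPU-bound query with no suspension points.
fn search(query: u64) -> u64 {
    (0..1000u64).fold(query, |acc, x| acc.wrapping_mul(31).wrapping_add(x))
}

/// Splits the queries into contiguous chunks and runs each chunk on its own
/// scoped thread. Returns results in input order plus the worker thread ids.
fn run_parallel(queries: &[u64], num_threads: usize) -> (Vec<u64>, HashSet<ThreadId>) {
    let workers = num_threads.max(1);
    // Ceiling division, kept at a minimum of 1 so `chunks` never panics.
    let chunk = ((queries.len() + workers - 1) / workers).max(1);

    thread::scope(|s| {
        let handles: Vec<_> = queries
            .chunks(chunk)
            .map(|part| {
                s.spawn(move || {
                    let out: Vec<u64> = part.iter().map(|&q| search(q)).collect();
                    (out, thread::current().id())
                })
            })
            .collect();

        let mut results = Vec::with_capacity(queries.len());
        let mut ids = HashSet::new();
        // Joining in spawn order keeps results aligned with the input order.
        for handle in handles {
            let (out, id) = handle.join().expect("worker thread panicked");
            results.extend(out);
            ids.insert(id);
        }
        (results, ids)
    })
}

fn main() {
    let queries: Vec<u64> = (0..8).collect();
    let (results, ids) = run_parallel(&queries, 4);
    println!("{} results from {} worker threads", results.len(), ids.len());
    // prints: 8 results from 4 worker threads
}
```

With a multi-threaded async runtime the equivalent step would be spawning each search as its own task so the scheduler can place it on a different worker; the thread-based version here only demonstrates the principle.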