-
Notifications
You must be signed in to change notification settings - Fork 429
Improve diskann-disk test coverage #1193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
02c3fb6
94cfbf4
6d26e5a
86e0fc8
ad17a52
31bb404
f62eb83
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,6 +104,77 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| /// Checks two properties of freshly created checkpoint managers: | ||
| /// 1. two managers built from the same prefix and identifier report the same resumption point; | ||
| /// 2. advancing one identifier leaves what a different identifier reports unchanged. | ||
| #[test] | ||
| fn test_new_checkpoint_creates_fresh_record() -> ANNResult<()> { | ||
| let temp_dir = tempdir()?; | ||
| let index_prefix = temp_dir | ||
| .path() | ||
| .join("fresh_index") | ||
| .to_str() | ||
| .unwrap() | ||
| .to_string(); | ||
| // Two managers with the same prefix+identifier should see the same checkpoint state | ||
| let mut manager_a = CheckpointRecordManagerWithFileStorage::new(&index_prefix, 42); | ||
| let manager_b = CheckpointRecordManagerWithFileStorage::new(&index_prefix, 42); | ||
| assert_eq!( | ||
| manager_a.get_resumption_point(WorkStage::Start)?, | ||
| manager_b.get_resumption_point(WorkStage::Start)? | ||
| ); | ||
| // Snapshot what a different identifier reports before identifier 42 is advanced | ||
| let manager_c = CheckpointRecordManagerWithFileStorage::new(&index_prefix, 99); | ||
| let untouched = manager_c.get_resumption_point(WorkStage::Start)?; | ||
| // Advance identifier 42 only; identifier 99 must still report its earlier snapshot | ||
| manager_a.update(Progress::Completed, WorkStage::QuantizeFPV)?; | ||
| assert_eq!( | ||
| manager_c.get_resumption_point(WorkStage::Start)?, | ||
| untouched | ||
| ); | ||
| assert!(!manager_c.has_completed()?); | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Builds a manager over a fresh temp-dir prefix, performs no update, and asserts that | ||
| /// `has_completed()` returns `false`. | ||
| #[test] | ||
| fn test_has_completed_false_when_no_file() -> ANNResult<()> { | ||
| let temp_dir = tempdir()?; | ||
| let index_prefix = temp_dir | ||
| .path() | ||
| .join("nonexistent_index") | ||
| .to_str() | ||
| .unwrap() | ||
| .to_string(); | ||
| let manager = CheckpointRecordManagerWithFileStorage::new(&index_prefix, 999); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This appears to be the identical test to the last thing above? |
||
| assert!(!manager.has_completed()?); | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Records progress 42 for identifier 77, confirms a second manager over the same prefix reads | ||
| /// it back as `Some(42)`, then calls `mark_as_invalid()` from a third manager and asserts the | ||
| /// resumption point becomes `Some(0)`. | ||
| #[test] | ||
| fn test_mark_as_invalid() -> ANNResult<()> { | ||
| let temp_dir = tempdir()?; | ||
| let index_prefix = temp_dir | ||
| .path() | ||
| .join("test_invalid") | ||
| .to_str() | ||
| .unwrap() | ||
| .to_string(); | ||
| let identifier = 77; | ||
|
|
||
| let mut manager = CheckpointRecordManagerWithFileStorage::new(&index_prefix, identifier); | ||
| // Advance to a later stage with some progress | ||
| manager.update(Progress::Completed, WorkStage::QuantizeFPV)?; | ||
| manager.update(Progress::Processed(42), WorkStage::InMemIndexBuild)?; | ||
|
|
||
| // Verify we can resume from progress=42 | ||
| let manager2 = CheckpointRecordManagerWithFileStorage::new(&index_prefix, identifier); | ||
| assert_eq!( | ||
| manager2.get_resumption_point(WorkStage::QuantizeFPV)?, | ||
| Some(42) | ||
| ); | ||
|
|
||
| // Mark as invalid - progress resets to 0 (is_valid=false => progress read as 0) | ||
| let mut manager3 = CheckpointRecordManagerWithFileStorage::new(&index_prefix, identifier); | ||
| manager3.mark_as_invalid()?; | ||
| assert_eq!( | ||
| manager3.get_resumption_point(WorkStage::QuantizeFPV)?, | ||
| Some(0) | ||
| ); | ||
|
|
||
| // NOTE(review): explicit cleanup in addition to `temp_dir` going out of scope - confirm it is still needed | ||
| clean_checkpoint_file(&index_prefix, identifier); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not a big deal, but part of point of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_checkpoint_manager_interruption_and_resumption() -> ANNResult<()> { | ||
| let temp_dir = tempdir()?; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -150,6 +150,150 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| /// Test tracker that hands out `Continue` exactly `stop_after` times and `Stop` on every | ||
| /// request after that. | ||
| #[derive(Clone)] | ||
| struct StopAfterTracker { | ||
| // Number of `Continue` grants issued so far; the `Arc` lets clones share one counter. | ||
| count: std::sync::Arc<std::sync::Mutex<usize>>, | ||
| // Upper bound on the number of `Continue` grants. | ||
| stop_after: usize, | ||
| } | ||
|
|
||
| impl ContinuationTrackerTrait for StopAfterTracker { | ||
| fn get_continuation_grant(&self) -> ContinuationGrant { | ||
| let mut granted = self.count.lock().unwrap(); | ||
| // Budget not yet exhausted: consume one grant and keep going. | ||
| if *granted < self.stop_after { | ||
| *granted += 1; | ||
| return ContinuationGrant::Continue; | ||
| } | ||
| ContinuationGrant::Stop | ||
| } | ||
| } | ||
|
|
||
| /// Feeds five items through a tracker that grants three continuations and then stops; expects | ||
| /// `Progress::Processed(3)` with only the first three items seen by the action. | ||
| #[test] | ||
| fn test_process_while_resource_is_available_stops_early() { | ||
| let tracker = StopAfterTracker { | ||
| count: std::sync::Arc::new(std::sync::Mutex::new(0)), | ||
| stop_after: 3, | ||
| }; | ||
| let items = vec![10, 20, 30, 40, 50]; | ||
| let mut processed = Vec::new(); | ||
|
|
||
| let result = process_while_resource_is_available( | ||
| |item| { | ||
| processed.push(item); | ||
| Ok::<(), TestError>(()) | ||
| }, | ||
| items.into_iter(), | ||
| Box::new(tracker), | ||
| ); | ||
|
|
||
| assert!(result.is_ok()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is redundant with the next one. If you match on result.unwrap() it will already panic if it's not ok.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
| match result.unwrap() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably better written as an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Even removed panic!() to increase code coverage number. |
||
| Progress::Processed(idx) => { | ||
| assert_eq!(idx, 3); // stopped before processing item at index 3 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this comment correct? Processed(3) means processed to 3, or processed until 3? I really hope it is the former.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. it is the former. fixed. |
||
| assert_eq!(processed, vec![10, 20, 30]); | ||
| } | ||
| _ => panic!("Expected Processed"), | ||
| } | ||
| } | ||
|
|
||
| /// A tracker that answers its first request with `Yield(Duration::ZERO)` and every later | ||
| /// request with `Continue`. | ||
| #[derive(Clone)] | ||
| struct YieldOnceThenContinueTracker { | ||
| // Set to `true` once the single `Yield` has been handed out. | ||
| yielded: std::sync::Arc<std::sync::Mutex<bool>>, | ||
| } | ||
|
|
||
| impl ContinuationTrackerTrait for YieldOnceThenContinueTracker { | ||
| fn get_continuation_grant(&self) -> ContinuationGrant { | ||
| let mut yielded = self.yielded.lock().unwrap(); | ||
| if !*yielded { | ||
| // First request: flip the flag so the yield happens only once. | ||
| *yielded = true; | ||
| ContinuationGrant::Yield(std::time::Duration::ZERO) | ||
| } else { | ||
|
arrayka marked this conversation as resolved.
|
||
| // After yielding once, always continue | ||
| ContinuationGrant::Continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Runs two items through a tracker that yields once and then continues; expects | ||
| /// `Progress::Completed` with both items seen by the action. | ||
| #[test] | ||
| fn test_process_while_resource_is_available_yield_then_continue() { | ||
| let tracker = YieldOnceThenContinueTracker { | ||
| yielded: std::sync::Arc::new(std::sync::Mutex::new(false)), | ||
| }; | ||
| let items = vec![1, 2]; | ||
| let mut processed = Vec::new(); | ||
|
|
||
| let result = process_while_resource_is_available( | ||
| |item| { | ||
| processed.push(item); | ||
| Ok::<(), TestError>(()) | ||
| }, | ||
| items.into_iter(), | ||
| Box::new(tracker), | ||
| ); | ||
|
|
||
| assert!(result.is_ok()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comments from above apply here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
| // After yielding, it should have continued and processed all items | ||
| match result.unwrap() { | ||
| Progress::Completed => assert_eq!(processed, vec![1, 2]), | ||
| _ => panic!("Expected Completed"), | ||
| } | ||
| } | ||
|
|
||
| /// An `Err` produced by the action closure must make the processing call itself return `Err`. | ||
| #[test] | ||
| fn test_process_while_resource_is_available_action_error() { | ||
| let tracker = Box::new(NaiveContinuationTracker::default()); | ||
| let inputs = vec![1, 2, 3]; | ||
|
|
||
| // The action rejects exactly one value (2) and accepts everything else. | ||
| let outcome = process_while_resource_is_available( | ||
| |item| match item { | ||
| 2 => Err(TestError), | ||
| _ => Ok(()), | ||
| }, | ||
| inputs.into_iter(), | ||
| tracker, | ||
| ); | ||
|
|
||
| assert!(outcome.is_err()); | ||
| } | ||
|
|
||
| /// Async counterpart of the stop-early test: with a tracker that grants two continuations, | ||
| /// expects `Progress::Processed(2)` and only the first two of five items recorded. | ||
| #[tokio::test] | ||
| async fn test_process_while_resource_is_available_async_stops_early() { | ||
| let tracker = StopAfterTracker { | ||
| count: std::sync::Arc::new(std::sync::Mutex::new(0)), | ||
| stop_after: 2, | ||
| }; | ||
| let items = vec![1, 2, 3, 4, 5]; | ||
| // Shared behind an `Arc` so each future built by the closure can own a handle to the list. | ||
| let processed = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new())); | ||
|
|
||
| let result = process_while_resource_is_available_async( | ||
| |item| { | ||
| let processed = processed.clone(); | ||
| async move { | ||
| processed.lock().await.push(item); | ||
| Ok::<(), TestError>(()) | ||
| } | ||
| }, | ||
| items.into_iter(), | ||
| Box::new(tracker), | ||
| ) | ||
| .await; | ||
|
|
||
| assert!(result.is_ok()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And same here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
| match result.unwrap() { | ||
| Progress::Processed(idx) => { | ||
| assert_eq!(idx, 2); | ||
| let processed = processed.lock().await; | ||
| assert_eq!(*processed, vec![1, 2]); | ||
| } | ||
| _ => panic!("Expected Processed"), | ||
| } | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_process_while_resource_is_available_async_completes() { | ||
| let checker = Box::new(NaiveContinuationTracker::default()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unclear to me how this asserts independence from a and b like the comment states.