diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index a68c5f357..0b7e9fa7d 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -76,24 +76,17 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::fs::File; use std::path::Path; -use std::sync::Arc; use std::time::Instant; -use tokio::sync::Semaphore; use tracing::{debug, error, warn}; pub const DATASET_STATS_STREAM_NAME: &str = "pstats"; const DATASET_STATS_CUSTOM_PARTITION: &str = "dataset_name"; -const MAX_CONCURRENT_FIELD_STATS: usize = 4; -const PARALLEL_FIELD_STATS_MIN_FIELDS: usize = 16; const MIN_TRACKED_DISTINCT_VALUES_PER_FIELD: usize = 1024; const MAX_TRACKED_DISTINCT_VALUES_PER_FIELD: usize = 10_000; const HLL_PRECISION_BITS: u32 = 12; const HLL_REGISTER_COUNT: usize = 1 << HLL_PRECISION_BITS; -static FIELD_STATS_QUERY_SEMAPHORE: Lazy> = - Lazy::new(|| Arc::new(Semaphore::new(MAX_CONCURRENT_FIELD_STATS))); static FIELD_STATS_RAYON_POOL: Lazy = Lazy::new(|| { ThreadPoolBuilder::new() - .num_threads(MAX_CONCURRENT_FIELD_STATS) .thread_name(|index| format!("field-stats-{index}")) .build() .expect("field stats rayon pool should initialize") @@ -139,7 +132,7 @@ pub async fn calculate_field_stats( tenant_id, ) .await; - log_stats_calculation_time(stream_name, parquet_path, started_at, result.is_ok()); + log_stats_calculation_time(stream_name, parquet_path, started_at, &result); result } @@ -218,20 +211,21 @@ fn log_stats_calculation_time( stream_name: &str, parquet_path: &Path, started_at: Instant, - success: bool, + result: &Result, ) { let parquet_file = parquet_path .file_name() .and_then(|filename| filename.to_str()) .unwrap_or(""); let elapsed_ms = started_at.elapsed().as_millis(); - if success { + if result.is_ok() { debug!( "Field stats calculation completed for parquet file {parquet_file} in {elapsed_ms} ms. stream={stream_name}" ); } else { + let err = result.as_ref().expect_err("result checked as error"); warn!( - "Field stats calculation failed for parquet file {parquet_file} after {elapsed_ms} ms. stream={stream_name}" + "Field stats calculation failed for parquet file {parquet_file} after {elapsed_ms} ms. stream={stream_name}: {err}" ); } } @@ -260,22 +254,11 @@ async fn collect_all_field_stats_from_parquet( schema: &Schema, max_field_statistics: usize, ) -> Result, PostError> { - let permit = FIELD_STATS_QUERY_SEMAPHORE - .clone() - .acquire_owned() - .await - .map_err(|e| { - PostError::Invalid(anyhow::anyhow!( - "Failed to acquire field stats query permit: {}", - e - )) - })?; let stream_name = stream_name.to_string(); let parquet_path = parquet_path.to_path_buf(); let schema = schema.clone(); tokio::task::spawn_blocking(move || { - let _permit = permit; collect_all_field_stats_from_parquet_blocking( &stream_name, &parquet_path, @@ -335,14 +318,15 @@ fn collect_all_field_stats_from_parquet_blocking( } }; - let batch_counts = if field_names.len() >= PARALLEL_FIELD_STATS_MIN_FIELDS { - collect_batch_field_counts_parallel(&batch, &field_names) - } else { - collect_batch_field_counts_serial(&batch, &field_names) - }; + let batch_field_counts = collect_batch_field_counts_parallel( + stream_name, + &batch, + &field_names, + max_field_statistics, + ); - for (field_name, counts) in batch_counts { - merge_field_counts(&mut field_counts, &field_name, counts); + for (field_name, batch_field_count) in batch_field_counts { + merge_field_counts(&mut field_counts, &field_name, batch_field_count); } } @@ -364,53 +348,53 @@ fn collect_all_field_stats_from_parquet_blocking( .collect()) } -fn collect_batch_field_counts_serial( - batch: &arrow_array::RecordBatch, - field_names: &[String], -) -> Vec<(String, HashMap)> { - field_names - .iter() - .filter_map(|field_name| collect_field_counts(batch, field_name)) - .collect() -} - fn collect_batch_field_counts_parallel( + stream_name: &str, batch: &arrow_array::RecordBatch, field_names: &[String], -) -> Vec<(String, HashMap)> { + max_field_statistics: usize, +) -> Vec<(String, FieldCountState)> { FIELD_STATS_RAYON_POOL.install(|| { field_names .par_iter() - .filter_map(|field_name| collect_field_counts(batch, field_name)) + .filter_map(|field_name| { + collect_field_counts(stream_name, batch, field_name, max_field_statistics) + }) .collect() }) } fn collect_field_counts( + stream_name: &str, batch: &arrow_array::RecordBatch, field_name: &str, -) -> Option<(String, HashMap)> { + max_field_statistics: usize, +) -> Option<(String, FieldCountState)> { let array = batch.column_by_name(field_name)?; - let mut counts = HashMap::new(); + let mut field_count = FieldCountState::new_for_batch( + stream_name.to_string(), + field_name.to_string(), + max_field_statistics, + ); for row_index in 0..array.len() { let value = format_arrow_value(array.as_ref(), row_index); - *counts.entry(value).or_default() += 1; + field_count.record_value(value); } - Some((field_name.to_string(), counts)) + Some((field_name.to_string(), field_count)) } fn merge_field_counts( field_counts: &mut HashMap, field_name: &str, - counts: HashMap, + batch_field_count: FieldCountState, ) { let field_total = field_counts .get_mut(field_name) .expect("field_counts initialized for each field"); - field_total.merge_counts(counts); + field_total.merge_state(batch_field_count); } struct FieldCountState { @@ -421,10 +405,24 @@ struct FieldCountState { hll: HyperLogLog, max_tracked_values: usize, approximate: bool, + log_cardinality_cap: bool, } impl FieldCountState { fn new(stream_name: String, field_name: String, max_field_statistics: usize) -> Self { + Self::new_with_logging(stream_name, field_name, max_field_statistics, true) + } + + fn new_for_batch(stream_name: String, field_name: String, max_field_statistics: usize) -> Self { + Self::new_with_logging(stream_name, field_name, max_field_statistics, false) + } + + fn new_with_logging( + stream_name: String, + field_name: String, + max_field_statistics: usize, + log_cardinality_cap: bool, + ) -> Self { Self { stream_name, field_name, @@ -433,38 +431,68 @@ impl FieldCountState { hll: HyperLogLog::new(), max_tracked_values: tracked_distinct_value_limit(max_field_statistics), approximate: false, + log_cardinality_cap, } } + fn record_value(&mut self, value: String) { + self.hll.add(&value); + self.total_count += 1; + self.track_value(value, 1); + } + + #[cfg(test)] fn merge_counts(&mut self, counts: HashMap) { for (value, count) in counts { self.hll.add(&value); self.total_count += count; + self.track_value(value, count); + } + } - if let Some(existing_count) = self.counts.get_mut(&value) { - *existing_count += count; - continue; - } + fn merge_state(&mut self, state: FieldCountState) { + self.hll.merge(&state.hll); + self.total_count += state.total_count; - if self.counts.len() < self.max_tracked_values { - self.counts.insert(value, count); - continue; - } + if state.approximate { + self.mark_approximate(); + } - if !self.approximate { - self.approximate = true; + for (value, count) in state.counts { + self.track_value(value, count); + } + } + + fn track_value(&mut self, value: String, count: i64) { + if let Some(existing_count) = self.counts.get_mut(&value) { + *existing_count += count; + return; + } + + if self.counts.len() < self.max_tracked_values { + self.counts.insert(value, count); + return; + } + + self.mark_approximate(); + + if let Some((min_value, min_count)) = self.current_min_value() + && count > min_count + { + self.counts.remove(&min_value); + self.counts.insert(value, count); + } + } + + fn mark_approximate(&mut self) { + if !self.approximate { + self.approximate = true; + if self.log_cardinality_cap { debug!( "Field stats cardinality cap reached for stream {} field {}. Tracking bounded top-value candidates with max_tracked_values={}", self.stream_name, self.field_name, self.max_tracked_values ); } - - if let Some((min_value, min_count)) = self.current_min_value() - && count > min_count - { - self.counts.remove(&min_value); - self.counts.insert(value, count); - } } } @@ -533,6 +561,12 @@ impl HyperLogLog { self.registers[register_index] = self.registers[register_index].max(rank); } + fn merge(&mut self, other: &Self) { + for (register, other_register) in self.registers.iter_mut().zip(other.registers.iter()) { + *register = (*register).max(*other_register); + } + } + fn estimate(&self) -> f64 { let register_count = HLL_REGISTER_COUNT as f64; let zero_registers = self diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 9c9089faa..b9087c91c 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -98,10 +98,15 @@ pub(crate) struct UploadResult { manifest_file: Option, } +struct UploadedParquetFile { + file_path: std::path::PathBuf, + manifest_file: catalog::manifest::File, +} + /// Handles the upload of a single parquet file #[tracing::instrument( name = "object_store.upload_single_parquet", - skip(store, schema), + skip(store), fields(stream = %stream_name, tenant = ?tenant_id, path = %path.display(), key = %stream_relative_path) )] async fn upload_single_parquet_file( @@ -109,7 +114,6 @@ async fn upload_single_parquet_file( path: std::path::PathBuf, stream_relative_path: String, stream_name: String, - schema: Arc, tenant_id: Option, ) -> Result { let filename = path @@ -185,11 +189,6 @@ async fn upload_single_parquet_file( let manifest = catalog::create_from_parquet_file(absolute_path, &path) .map_err(|e| (path.clone(), ObjectStorageError::from(e)))?; - if PARSEABLE.options.collect_dataset_stats { - // collect field stats if enabled - calculate_stats_if_enabled(&stream_name, &path, &schema, tenant_id).await; - } - Ok(UploadResult { file_path: path, manifest_file: Some(manifest), @@ -1038,11 +1037,22 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let upload_context = UploadContext::new(stream); // Process parquet files concurrently and collect results - let manifest_files = + let uploaded_files = process_parquet_files(&upload_context, stream_name, tenant_id.clone()).await?; // Update snapshot with collected manifest files - update_snapshot_with_manifests(stream_name, manifest_files, &tenant_id).await?; + update_snapshot_with_manifests(stream_name, &uploaded_files, &tenant_id).await?; + + // Calculate stats after snapshot update so uploads are visible before stats work starts. + calculate_stats_for_uploaded_files( + &uploaded_files, + stream_name, + &upload_context, + &tenant_id, + ) + .await; + + cleanup_uploaded_staged_files(uploaded_files).await; // Process schema files process_schema_files(&upload_context, stream_name, &tenant_id).await?; @@ -1068,7 +1078,7 @@ async fn process_parquet_files( upload_context: &UploadContext, stream_name: &str, tenant_id: Option, -) -> Result, ObjectStorageError> { +) -> Result, ObjectStorageError> { let semaphore = Arc::new(tokio::sync::Semaphore::new(100)); let mut join_set = JoinSet::new(); let object_store = PARSEABLE.storage().get_object_store(); @@ -1154,21 +1164,13 @@ async fn spawn_parquet_upload_task( ); let stream_name = stream_name.to_string(); - let schema = upload_context.schema.clone(); let handle = FLUSH_AND_CONVERT_RUNTIME.handle(); join_set.spawn_on( async move { let _permit = semaphore.acquire().await.expect("semaphore is not closed"); - upload_single_parquet_file( - store, - path, - stream_relative_path, - stream_name, - schema, - tenant_id, - ) - .await + upload_single_parquet_file(store, path, stream_relative_path, stream_name, tenant_id) + .await }, handle, ); @@ -1177,14 +1179,17 @@ async fn spawn_parquet_upload_task( /// Collects results from all upload tasks async fn collect_upload_results( mut join_set: JoinSet>, -) -> Result, ObjectStorageError> { +) -> Result, ObjectStorageError> { let mut uploaded_files = Vec::new(); while let Some(result) = join_set.join_next().await { match result { Ok(Ok(upload_result)) => { if let Some(manifest_file) = upload_result.manifest_file { - uploaded_files.push((upload_result.file_path, manifest_file)); + uploaded_files.push(UploadedParquetFile { + file_path: upload_result.file_path, + manifest_file, + }); } else { // File failed in upload size validation, preserve staging file for retry error!( @@ -1212,10 +1217,69 @@ async fn collect_upload_results( } } - // successfully uploaded files, remove from in-mem hashset + Ok(uploaded_files) +} + +/// Updates snapshot with collected manifest files +async fn update_snapshot_with_manifests( + stream_name: &str, + uploaded_files: &[UploadedParquetFile], + tenant_id: &Option, +) -> Result<(), ObjectStorageError> { + let manifest_files: Vec<_> = uploaded_files + .iter() + .map(|uploaded_file| uploaded_file.manifest_file.clone()) + .collect(); + + if !manifest_files.is_empty() { + catalog::update_snapshot(stream_name, manifest_files, tenant_id).await?; + } + Ok(()) +} + +async fn calculate_stats_for_uploaded_files( + uploaded_files: &[UploadedParquetFile], + stream_name: &str, + upload_context: &UploadContext, + tenant_id: &Option, +) { + if !PARSEABLE.options.collect_dataset_stats { + return; + } + + let mut join_set = JoinSet::new(); + let handle = FLUSH_AND_CONVERT_RUNTIME.handle(); + + for uploaded_file in uploaded_files { + let stream_name = stream_name.to_string(); + let path = uploaded_file.file_path.clone(); + let schema = upload_context.schema.clone(); + let tenant_id = tenant_id.clone(); + + join_set.spawn_on( + async move { + calculate_stats_if_enabled(&stream_name, &path, &schema, tenant_id).await; + }, + handle, + ); + } + + while let Some(result) = join_set.join_next().await { + if let Err(err) = result { + warn!("Field stats task join failed after parquet upload: {err}"); + } + } +} + +async fn cleanup_uploaded_staged_files(uploaded_files: Vec) { + let paths: Vec<_> = uploaded_files + .into_iter() + .map(|uploaded_file| uploaded_file.file_path) + .collect(); + { let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - for (path, _) in uploaded_files.iter() { + for path in paths.iter() { guard.remove(path); } @@ -1226,29 +1290,12 @@ async fn collect_upload_results( .is_ok_and(|ts| (now - ts).num_minutes() >= 5) }); } - let manifest_files: Vec<_> = uploaded_files - .into_par_iter() - .map(|(path, manifest_file)| { - if let Err(e) = remove_file(&path) { - warn!("Failed to remove staged file: {e}"); - } - manifest_file - }) - .collect(); - - Ok(manifest_files) -} -/// Updates snapshot with collected manifest files -async fn update_snapshot_with_manifests( - stream_name: &str, - manifest_files: Vec, - tenant_id: &Option, -) -> Result<(), ObjectStorageError> { - if !manifest_files.is_empty() { - catalog::update_snapshot(stream_name, manifest_files, tenant_id).await?; - } - Ok(()) + paths.into_par_iter().for_each(|path| { + if let Err(e) = remove_file(&path) { + warn!("Failed to remove staged file: {e}"); + } + }); } /// Processes schema files