Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 98 additions & 64 deletions src/storage/field_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,17 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;
use tracing::{debug, error, warn};

pub const DATASET_STATS_STREAM_NAME: &str = "pstats";
const DATASET_STATS_CUSTOM_PARTITION: &str = "dataset_name";
const MAX_CONCURRENT_FIELD_STATS: usize = 4;
const PARALLEL_FIELD_STATS_MIN_FIELDS: usize = 16;
const MIN_TRACKED_DISTINCT_VALUES_PER_FIELD: usize = 1024;
const MAX_TRACKED_DISTINCT_VALUES_PER_FIELD: usize = 10_000;
const HLL_PRECISION_BITS: u32 = 12;
const HLL_REGISTER_COUNT: usize = 1 << HLL_PRECISION_BITS;
static FIELD_STATS_QUERY_SEMAPHORE: Lazy<Arc<Semaphore>> =
Lazy::new(|| Arc::new(Semaphore::new(MAX_CONCURRENT_FIELD_STATS)));
static FIELD_STATS_RAYON_POOL: Lazy<ThreadPool> = Lazy::new(|| {
ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FIELD_STATS)
.thread_name(|index| format!("field-stats-{index}"))
.build()
.expect("field stats rayon pool should initialize")
Expand Down Expand Up @@ -139,7 +132,7 @@ pub async fn calculate_field_stats(
tenant_id,
)
.await;
log_stats_calculation_time(stream_name, parquet_path, started_at, result.is_ok());
log_stats_calculation_time(stream_name, parquet_path, started_at, &result);

result
}
Expand Down Expand Up @@ -218,20 +211,21 @@ fn log_stats_calculation_time(
stream_name: &str,
parquet_path: &Path,
started_at: Instant,
success: bool,
result: &Result<bool, PostError>,
) {
let parquet_file = parquet_path
.file_name()
.and_then(|filename| filename.to_str())
.unwrap_or("<unknown>");
let elapsed_ms = started_at.elapsed().as_millis();
if success {
if result.is_ok() {
debug!(
"Field stats calculation completed for parquet file {parquet_file} in {elapsed_ms} ms. stream={stream_name}"
);
} else {
let err = result.as_ref().expect_err("result checked as error");
warn!(
"Field stats calculation failed for parquet file {parquet_file} after {elapsed_ms} ms. stream={stream_name}"
"Field stats calculation failed for parquet file {parquet_file} after {elapsed_ms} ms. stream={stream_name}: {err}"
);
}
}
Expand Down Expand Up @@ -260,22 +254,11 @@ async fn collect_all_field_stats_from_parquet(
schema: &Schema,
max_field_statistics: usize,
) -> Result<Vec<FieldStat>, PostError> {
let permit = FIELD_STATS_QUERY_SEMAPHORE
.clone()
.acquire_owned()
.await
.map_err(|e| {
PostError::Invalid(anyhow::anyhow!(
"Failed to acquire field stats query permit: {}",
e
))
})?;
let stream_name = stream_name.to_string();
let parquet_path = parquet_path.to_path_buf();
let schema = schema.clone();

tokio::task::spawn_blocking(move || {
let _permit = permit;
collect_all_field_stats_from_parquet_blocking(
&stream_name,
&parquet_path,
Expand Down Expand Up @@ -335,14 +318,15 @@ fn collect_all_field_stats_from_parquet_blocking(
}
};

let batch_counts = if field_names.len() >= PARALLEL_FIELD_STATS_MIN_FIELDS {
collect_batch_field_counts_parallel(&batch, &field_names)
} else {
collect_batch_field_counts_serial(&batch, &field_names)
};
let batch_field_counts = collect_batch_field_counts_parallel(
stream_name,
&batch,
&field_names,
max_field_statistics,
);

for (field_name, counts) in batch_counts {
merge_field_counts(&mut field_counts, &field_name, counts);
for (field_name, batch_field_count) in batch_field_counts {
merge_field_counts(&mut field_counts, &field_name, batch_field_count);
}
}

Expand All @@ -364,53 +348,53 @@ fn collect_all_field_stats_from_parquet_blocking(
.collect())
}

fn collect_batch_field_counts_serial(
batch: &arrow_array::RecordBatch,
field_names: &[String],
) -> Vec<(String, HashMap<String, i64>)> {
field_names
.iter()
.filter_map(|field_name| collect_field_counts(batch, field_name))
.collect()
}

fn collect_batch_field_counts_parallel(
stream_name: &str,
batch: &arrow_array::RecordBatch,
field_names: &[String],
) -> Vec<(String, HashMap<String, i64>)> {
max_field_statistics: usize,
) -> Vec<(String, FieldCountState)> {
FIELD_STATS_RAYON_POOL.install(|| {
field_names
.par_iter()
.filter_map(|field_name| collect_field_counts(batch, field_name))
.filter_map(|field_name| {
collect_field_counts(stream_name, batch, field_name, max_field_statistics)
})
.collect()
})
}

fn collect_field_counts(
stream_name: &str,
batch: &arrow_array::RecordBatch,
field_name: &str,
) -> Option<(String, HashMap<String, i64>)> {
max_field_statistics: usize,
) -> Option<(String, FieldCountState)> {
let array = batch.column_by_name(field_name)?;
let mut counts = HashMap::new();
let mut field_count = FieldCountState::new_for_batch(
stream_name.to_string(),
field_name.to_string(),
max_field_statistics,
);

for row_index in 0..array.len() {
let value = format_arrow_value(array.as_ref(), row_index);
*counts.entry(value).or_default() += 1;
field_count.record_value(value);
}

Some((field_name.to_string(), counts))
Some((field_name.to_string(), field_count))
}

fn merge_field_counts(
field_counts: &mut HashMap<String, FieldCountState>,
field_name: &str,
counts: HashMap<String, i64>,
batch_field_count: FieldCountState,
) {
let field_total = field_counts
.get_mut(field_name)
.expect("field_counts initialized for each field");

field_total.merge_counts(counts);
field_total.merge_state(batch_field_count);
}

struct FieldCountState {
Expand All @@ -421,10 +405,24 @@ struct FieldCountState {
hll: HyperLogLog,
max_tracked_values: usize,
approximate: bool,
log_cardinality_cap: bool,
}

impl FieldCountState {
fn new(stream_name: String, field_name: String, max_field_statistics: usize) -> Self {
Self::new_with_logging(stream_name, field_name, max_field_statistics, true)
}

fn new_for_batch(stream_name: String, field_name: String, max_field_statistics: usize) -> Self {
Self::new_with_logging(stream_name, field_name, max_field_statistics, false)
}

fn new_with_logging(
stream_name: String,
field_name: String,
max_field_statistics: usize,
log_cardinality_cap: bool,
) -> Self {
Self {
stream_name,
field_name,
Expand All @@ -433,38 +431,68 @@ impl FieldCountState {
hll: HyperLogLog::new(),
max_tracked_values: tracked_distinct_value_limit(max_field_statistics),
approximate: false,
log_cardinality_cap,
}
}

fn record_value(&mut self, value: String) {
self.hll.add(&value);
self.total_count += 1;
self.track_value(value, 1);
}

#[cfg(test)]
fn merge_counts(&mut self, counts: HashMap<String, i64>) {
for (value, count) in counts {
self.hll.add(&value);
self.total_count += count;
self.track_value(value, count);
}
}

if let Some(existing_count) = self.counts.get_mut(&value) {
*existing_count += count;
continue;
}
fn merge_state(&mut self, state: FieldCountState) {
self.hll.merge(&state.hll);
self.total_count += state.total_count;

if self.counts.len() < self.max_tracked_values {
self.counts.insert(value, count);
continue;
}
if state.approximate {
self.mark_approximate();
}

if !self.approximate {
self.approximate = true;
for (value, count) in state.counts {
self.track_value(value, count);
}
}

fn track_value(&mut self, value: String, count: i64) {
if let Some(existing_count) = self.counts.get_mut(&value) {
*existing_count += count;
return;
}

if self.counts.len() < self.max_tracked_values {
self.counts.insert(value, count);
return;
}

self.mark_approximate();

if let Some((min_value, min_count)) = self.current_min_value()
&& count > min_count
{
self.counts.remove(&min_value);
self.counts.insert(value, count);
}
}

fn mark_approximate(&mut self) {
if !self.approximate {
self.approximate = true;
if self.log_cardinality_cap {
debug!(
"Field stats cardinality cap reached for stream {} field {}. Tracking bounded top-value candidates with max_tracked_values={}",
self.stream_name, self.field_name, self.max_tracked_values
);
}

if let Some((min_value, min_count)) = self.current_min_value()
&& count > min_count
{
self.counts.remove(&min_value);
self.counts.insert(value, count);
}
}
}

Expand Down Expand Up @@ -533,6 +561,12 @@ impl HyperLogLog {
self.registers[register_index] = self.registers[register_index].max(rank);
}

fn merge(&mut self, other: &Self) {
for (register, other_register) in self.registers.iter_mut().zip(other.registers.iter()) {
*register = (*register).max(*other_register);
}
}

fn estimate(&self) -> f64 {
let register_count = HLL_REGISTER_COUNT as f64;
let zero_registers = self
Expand Down
Loading
Loading