diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b9087c91c..35f1d7eba 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,10 +16,32 @@ * */ +use crate::catalog::{self, snapshot::Snapshot}; +use crate::event::format::LogSource; +use crate::event::format::LogSourceEntry; +use crate::handlers::DatasetTag; +use crate::handlers::http::fetch_schema; +use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; +use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; +use crate::metrics::increment_parquets_stored_by_date; +use crate::metrics::increment_parquets_stored_size_by_date; +use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; +use crate::option::Mode; +use crate::parseable::DEFAULT_TENANT; +use crate::parseable::{LogStream, PARSEABLE, Stream}; +use crate::stats::FullStats; +use crate::storage::SETTINGS_ROOT_DIRECTORY; +use crate::storage::TARGETS_ROOT_DIRECTORY; +use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; +use crate::storage::field_stats::calculate_field_stats; +use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; +use crate::sync::{ACTIVE_OBJECT_STORE_SYNC_FILES, FLUSH_AND_CONVERT_RUNTIME}; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use dashmap::mapref::entry::Entry; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use itertools::Itertools; use object_store::ListResult; @@ -43,29 +65,6 @@ use tokio::task::JoinSet; use tracing::{Instrument, error, info, info_span, warn}; use ulid::Ulid; -use crate::catalog::{self, snapshot::Snapshot}; -use crate::event::format::LogSource; -use crate::event::format::LogSourceEntry; -use crate::handlers::DatasetTag; -use crate::handlers::http::fetch_schema; -use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; -use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; -use crate::metrics::increment_parquets_stored_by_date; -use crate::metrics::increment_parquets_stored_size_by_date; -use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; -use crate::option::Mode; -use crate::parseable::DEFAULT_TENANT; -use crate::parseable::{LogStream, PARSEABLE, Stream}; -use crate::stats::FullStats; -use crate::storage::SETTINGS_ROOT_DIRECTORY; -use crate::storage::TARGETS_ROOT_DIRECTORY; -use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; -use crate::storage::field_stats::calculate_field_stats; -use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; -use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; -use crate::sync::FLUSH_AND_CONVERT_RUNTIME; - use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, @@ -1084,22 +1083,20 @@ async fn process_parquet_files( let object_store = PARSEABLE.storage().get_object_store(); // collect all parquet files to upload - let parquet_paths = { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - - let parquet_paths: Vec = upload_context - .stream - .parquet_files() - .into_iter() - .filter(|p| !guard.contains(p)) - .collect(); - - let mut ret = Vec::with_capacity(parquet_paths.len()); - ret.clone_from(&parquet_paths); - guard.extend(parquet_paths); - tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES); - ret - }; + let parquet_paths: Vec = upload_context + .stream + .parquet_files() + .into_par_iter() + .filter_map( + |path| match ACTIVE_OBJECT_STORE_SYNC_FILES.entry(path.clone()) { + Entry::Vacant(entry) => { + entry.insert(Instant::now()); + Some(path) + } + Entry::Occupied(_) => None, + }, + ) + .collect(); let mut total_size: u64 = 0; let mut min_dt: Option> = None; @@ -1196,18 +1193,12 @@ async fn collect_upload_results( "Parquet file upload size validation failed for {:?}, preserving in staging for retry", upload_result.file_path ); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&upload_result.file_path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&upload_result.file_path); } } Ok(Err((path, e))) => { error!("Error uploading parquet file: {e}"); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path); return Err(e); } Err(e) => { @@ -1272,30 +1263,24 @@ async fn calculate_stats_for_uploaded_files( } async fn cleanup_uploaded_staged_files(uploaded_files: Vec) { - let paths: Vec<_> = uploaded_files - .into_iter() - .map(|uploaded_file| uploaded_file.file_path) - .collect(); - - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - for path in paths.iter() { - guard.remove(path); - } - - // check if file has been in hashset for more than 5 minutes - let now = Utc::now(); - guard.retain(|f| { - !extract_datetime_from_parquet_path_regex(f) - .is_ok_and(|ts| (now - ts).num_minutes() >= 5) - }); + // successfully uploaded files, remove from DashMap + for uploaded_parquet_file in uploaded_files.iter() { + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&uploaded_parquet_file.file_path); } - paths.into_par_iter().for_each(|path| { - if let Err(e) = remove_file(&path) { - warn!("Failed to remove staged file: {e}"); - } + // Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments. + let now = Instant::now(); + ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| { + now.duration_since(*tracked_instant) < Duration::from_secs(300) }); + + uploaded_files + .into_par_iter() + .for_each(|uploaded_parquet_file| { + if let Err(e) = remove_file(&uploaded_parquet_file.file_path) { + warn!("Failed to remove staged file: {e}"); + } + }); } /// Processes schema files diff --git a/src/sync.rs b/src/sync.rs index f6f20be48..b6ffe21db 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,17 +17,16 @@ */ use chrono::{TimeDelta, Timelike}; -use datafusion::common::HashSet; +use dashmap::DashMap; use futures::FutureExt; use once_cell::sync::Lazy; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; use std::path::PathBuf; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::runtime::Runtime; -use tokio::sync::{RwLock, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; @@ -39,8 +38,8 @@ pub static FLUSH_AND_CONVERT_RUNTIME: Lazy = static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); -pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); +pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy> = + Lazy::new(DashMap::new); /// RAII guard that clears a sync-running flag on drop, so a panic inside the /// sync body cannot leave the flag stuck at `true` and wedge future ticks. struct SyncRunningGuard(&'static AtomicBool);