C++: optimize batch read/write paths and aligned table null handling#823
C++: optimize batch read/write paths and aligned table null handling#823ColinLeeo wants to merge 1 commit into
Conversation
69e1658 to
7db1a3a
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #823 +/- ##
===========================================
- Coverage 61.57% 58.28% -3.30%
===========================================
Files 731 733 +2
Lines 45874 47652 +1778
Branches 6880 7407 +527
===========================================
- Hits 28249 27774 -475
- Misses 16614 18684 +2070
- Partials 1011 1194 +183 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR consolidates a large optimization branch into develop: it adds batch decode/write APIs across the C++ Decoder/Encoder hierarchy, multi-value aligned read paths with optional parallel decode, columnar tablet write helpers, SIMD fast paths, and a set of correctness fixes for aligned/table null handling (null TAG segments, null FIELD writes, all-null value pages, sparse aligned columns, repeated logical devices, ValuePageWriter::reset state). It also trims the C wrapper API (drops unused metadata export/tag-filter symbols, then re-adds tag-filter helpers in a different section) and removes several regression tests for behaviors it claims to fix.
Changes:
- Add batch decode/encode/write paths through
Decoder/Encoder/page/chunk writers and aMultiAlignedTimeseriesIndexplus single-device aligned fast-path reader. - Several aligned table fixes (null TAG/FIELD, all-null pages, single-device tablet flag,
ValuePageWriter::reset, double-free of first-page buffers viarelease_cur_page_data). - Build/infra: SIMD option, optional
BUILD_EXAMPLES, mem-stat counters widened to 64-bit, newBlockingQueue, removal of several existing regression tests, license-header punctuation churn in multiple CMake files.
Reviewed changes
Copilot reviewed 118 out of 119 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/src/reader/filter/{and,or}_filter.h, filter.h, time_operator.h | Adds satisfy_batch_time (uses fixed 129-element stack buffer — flagged). |
| cpp/src/encoding/{plain,decoder,encoder,plain_decoder,dictionary_encoder}.h | Batch encode/decode API + dictionary index assignment change (flagged). |
| cpp/src/writer/{value_,time_,}{chunk,page}_writer.{h,cc} | Batch write paths, first-page ownership transfer, larger page buffers. |
| cpp/src/writer/tsfile_table_writer.{h,cc}, tsfile_writer.h | Memoized lowercasing, idempotent close, optional parallel write pool. |
| cpp/src/reader/* | Aligned multi-value batch path, bloom-filter contains, table result-set lifecycle. |
| cpp/src/file/tsfile_io_writer.{h,cc}, restorable_tsfile_io_writer.cc | Recovery cleanup simplified; conditional sync_on_close_ (flagged); chunk-group index for O(1) lookup. |
| cpp/src/file/tsfile_io_reader.h | Device-node cache + multi-SSI alloc. |
| cpp/src/common/allocator/byte_stream.h, alloc_base.h, mem_alloc.cc | Page-mask bitwise modulo + power-of-2 rounding for wrapped buffers (flagged), 64-bit stat counters. |
| cpp/src/common/{tablet,schema,path,global,thread_pool}.* | Single-device flag, string-column uint32_t offsets, Path inlined, config knobs reshuffled. |
| cpp/src/common/container/{bit_map,blocking_queue,byte_buffer}.* | New BlockingQueue, BitMap::may_have_set_bits, bounds asserts. |
| cpp/src/compress/{snappy,lz4,uncompressed}_compressor.* | Safer after_compress ownership handling; Uncompressed now copies. |
| cpp/src/cwrapper/{tsfile_cwrapper.h,arrow_c.cc} | Tag-filter API moved, sliced-Arrow handling reverted (loses prior bug-fix paths). |
| cpp/test/** | Deletes several regression tests (deep path, missing measurement, aligned NULL boundary, dictionary RLE run counts, Arrow slice-with-offset, etc.) and adds new batch/page-boundary tests. |
| python/tsfile/dataset/reader.py + tests | Switches row reads to read_arrow_batch(). |
| cpp/{CMakeLists.txt,examples/**,src/CMakeLists.txt,src/common/CMakeLists.txt,test/CMakeLists.txt} | Build flags, SIMD option, Arrow/Parquet-dependent examples, license-header punctuation regressions. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| int satisfy_batch_time(const int64_t* times, int count, bool* mask) { | ||
| bool mask_right[129]; | ||
| left_->satisfy_batch_time(times, count, mask); | ||
| right_->satisfy_batch_time(times, count, mask_right); | ||
| int pass = 0; | ||
| for (int i = 0; i < count; ++i) { | ||
| mask[i] = mask[i] || mask_right[i]; | ||
| if (mask[i]) ++pass; | ||
| } | ||
| return pass; | ||
| } |
| int satisfy_batch_time(const int64_t* times, int count, bool* mask) { | ||
| bool mask_right[129]; | ||
| left_->satisfy_batch_time(times, count, mask); | ||
| right_->satisfy_batch_time(times, count, mask_right); | ||
| int pass = 0; | ||
| for (int i = 0; i < count; ++i) { | ||
| mask[i] = mask[i] && mask_right[i]; | ||
| if (mask[i]) ++pass; | ||
| } | ||
| return pass; | ||
| } |
| index_entry_.push_back(value); | ||
| map_size_ = map_size_ + value.length(); | ||
| entry_index_[value] = static_cast<int>(index_entry_.size()) - 1; | ||
| entry_index_[value] = entry_index_.size(); |
| // page_mask_ is used as a bitmask and only works correctly for | ||
| // power-of-2 page sizes. Round up to the next power-of-2 so that | ||
| // (read_pos_ & page_mask_) gives the correct within-page offset and | ||
| // the page-crossing check doesn't misfire on arbitrary buffer sizes. | ||
| uint32_t ps = 1; | ||
| while (ps < (uint32_t)buf_len) ps <<= 1; | ||
| page_size_ = ps; | ||
| page_mask_ = ps - 1; | ||
| head_.store(&wrapped_page_); |
| } else if (RET_FAIL(write_file_footer())) { | ||
| std::cout << "writer file footer error, ret = " << ret << std::endl; | ||
| } else if (RET_FAIL(sync_file())) { | ||
| } else if (g_config_value_.sync_on_close_ && RET_FAIL(sync_file())) { |
| @@ -1,5 +1,5 @@ | |||
| #[[ | |||
| Licensed to the Apache Software Foundation (ASF) under one | |||
| Licensed to the Apache Software Foundation(ASF) under one | |||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| software distributed under the LICENSE is distributed on an |
| file = write_file_new("test/", &error_no); | ||
| ASSERT_TRUE(error_no == RET_FILRET_OPEN_ERR || | ||
| error_no == RET_ALREADY_EXIST); | ||
| ASSERT_EQ(RET_FILRET_OPEN_ERR, error_no); |
| @@ -1320,7 +1112,7 @@ TEST_F(TreeQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) { | |||
| write_test_file(devices, measurements, num_rows); | |||
|
|
|||
| const int num_iters = 5; | |||
| const double tolerance = 0.2; | |||
| const double tolerance = 0.05; | |||
| uint32_t cur_points = value_page_writer_.get_point_numer(); | ||
| uint32_t page_remaining = | ||
| common::g_config_value_.page_writer_max_point_num_ - cur_points; | ||
| if (page_remaining == 0) { | ||
| if (RET_FAIL(seal_cur_page(false))) { | ||
| return ret; | ||
| } | ||
| page_remaining = | ||
| common::g_config_value_.page_writer_max_point_num_; | ||
| } |
Squashed PR snapshot of the long-lived `final` work, rebased on top of current develop (2a864c5). Combines the original "TsFile C++ batch read/write optimization" (5f12115) snapshot with subsequent build / platform fixes and a follow-up read-path optimization commit (c902b2b). ═════════════════════════════════════════════════════════════════════ Read path ═════════════════════════════════════════════════════════════════════ - Decoder base gains batch APIs (read_batch_int32 / int64 / float / double, skip_*); PLAIN, TS2DIFF, Gorilla decoders implement them. TS2DIFF has block-level peeking so time filters can skip blocks without decoding. Gorilla adds a raw-pointer GorillaBitReader that bypasses ByteStream overhead. - ChunkReader / AlignedChunkReader add *_DECODE_TV_BATCH methods that decode time + value into a TsBlock in one pass, applying batch time filters before append. - AlignedChunkReader supports a multi-value mode: one time chunk + N value chunks decoded in a single pass, sharing the decoded timestamps and filter mask. SingleDeviceTsBlockReader auto-detects same-device measurements via VectorMeasurementColumnContext. - Optional page-level parallel decompression via a DecodeThreadPool when ENABLE_THREADS is set. Page-plan classification (SKIP / FULL_PASS / BOUNDARY) lets a scatter-free memcpy fast path fire when every row passes and no column has nulls. Additional optimizations (from c902b2b, ported from `final`): - Aligned fast path: enable_dense_aligned_fast_path defaults true and compute_dense_row_count falls back to the TimeseriesIndex top-level statistic for single-chunk timeseries (chunk-level stat is omitted during serialization for those). Re-enables the bulk-copy SSI --> caller path that was defensively disabled. - Chunk-level parallel decode: per-column tasks own all that column's pages and write into a per-(col,page) PageDecodedState slot; one wait_all per chunk amortizes thread-pool overhead. Hybrid dispatch in get_next_page_multi — chunk-level for narrow chunks (<= 6 value columns), 4/6 thesis path otherwise to avoid cache thrash. - Per-worker time decoder/compressor pool (via ThreadPool:: current_worker_id) parallelizes the previously-serial time-page decode loop. - Pre-decode int64/float/double values in the parallel worker into ValueColumnState::pending_decoded_values; multi_DECODE_TV_BATCH then memcpys the per-batch slice instead of calling the decoder inline. - Partial-page bulk scatter: bulk-memcpy path now copies min(budget, remaining_in_page) rows from page_time_cursor_ so the tail page of every SSI tsblock takes the memcpy fast path instead of bleeding into the row-by-row scatter loop. - tsblock_max_memory_ 64KB -> 2MB so a 10K-row page fits in one SSI tsblock and bulk_copy_into doesn't fragment into many tiny batches. ═════════════════════════════════════════════════════════════════════ Write path ═════════════════════════════════════════════════════════════════════ - ValuePageWriter gains write_batch / write_string_batch that take timestamp + value + nullness arrays directly, removing the per-value append loop. Tablet exposes set_timestamps / set_column_values / set_column_string_repeated / reset for bulk reuse and switches StringColumn to an Arrow-compatible offset+buffer layout. - TS2DIFFEncoder::flush packs all deltas with a single pack_bits_msb + write_buf instead of per-value write_bits, falling back to the scalar path for the rare bit_width > 56 case. - Int64Statistic::update_batch (NEON-accelerated min/max/sum). ═════════════════════════════════════════════════════════════════════ Encoding / SIMD ═════════════════════════════════════════════════════════════════════ - TS2DIFF batch decode adds AVX2 helpers via SIMDe (already on develop) for both i32 and i64; scalar fallback unchanged. - PLAIN byte-swap path uses ARM NEON (vrev64q_u8 / vrev32q_u8) when available, falling back to __builtin_bswap. - CMakeLists adds ENABLE_SIMD; Release builds turn on -O3 -march=native -flto (off when ASan is on or on Windows/MinGW). ═════════════════════════════════════════════════════════════════════ Allocator / ByteStream / ThreadPool ═════════════════════════════════════════════════════════════════════ - ByteStream caches page_mask_ (= page_size - 1) so the hot path uses a bitmask instead of modulo; wrap_from rounds buffer sizes up to a power of two for correctness. - common::ThreadPool gets a thread_local current_worker_id() accessor (set by worker_loop) and a num_threads() getter, letting callers attach per-worker state without contention. ═════════════════════════════════════════════════════════════════════ Build / platform ═════════════════════════════════════════════════════════════════════ - Linux Release: -march=native + -flto by default, automatically dropped under ASan to keep leak detection accurate. - MSVC / MinGW: replace GCC-only intrinsics, restore lost includes, disable LTO + -march=native there. - Restore tag_filter_create/between, metadata test, and segment behavior; restore cwrapper metadata + tag_filter/batch_size args on table query C APIs that the batch-opt snapshot had dropped. - Disable QueryByRowPerformanceTest and the flaky QueryByRowFasterThanManualNext test. ═════════════════════════════════════════════════════════════════════ Python binding ═════════════════════════════════════════════════════════════════════ - read_series_by_row: pull TsBlocks via Arrow IPC instead of the row-by-row Python loop. Aligns reader query plumbing with develop so the binding sees the same parameter set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
This PR optimizes the C++ TsFile read/write paths for batch and columnar workloads, and fixes several aligned table null-handling issues uncovered while validating the optimized path.
It consolidates the batch decode/write work from the long-lived optimization branch into a reviewable change for
develop.Supersedes #749, #754, and #774.
Main Changes
Decoderhierarchy and implement batch paths for PLAIN, TS2DIFF, and Gorilla.ChunkReaderandAlignedChunkReader, including shared timestamp decoding for aligned multi-value reads.ByteStream, compressor, and page/chunk writer internals used by the optimized paths.Correctness Fixes
ValuePageWriter::reset()so row count and null bitmap state are reset together.Compatibility Notes
cpp/third_party/is left atdevelopstate so existing platform compatibility fixes are preserved.Verification
cmake --build cpp/target/build --target TsFile_Test -j1ctest --test-dir cpp/target/build/test --output-on-failure -R '^TsFileTableReaderTest\.TestNullInTable4$'ctest --test-dir cpp/target/build/test --output-on-failure -j4cd cpp && mvn spotless:checkcd cpp && mvn apache-rat:checkCurrent full C++ test result:
496/496tests pass.