diff --git a/include/splinterdb/splinterdb.h b/include/splinterdb/splinterdb.h index fdb35d4e..abb36455 100644 --- a/include/splinterdb/splinterdb.h +++ b/include/splinterdb/splinterdb.h @@ -146,6 +146,13 @@ typedef struct splinterdb_config { // work to be performed on foreground threads, increasing tail // latencies. uint64 queue_scale_percent; + + // Soft byte budget for extent read-ahead in range scans and compactions, + // divided across the branches being merged. Roughly the storage's + // bandwidth-delay product (bandwidth x latency); raise it for higher-latency + // devices such as networked/cloud volumes. Zero selects a default suited to + // local SSDs. + uint64 prefetch_budget; } splinterdb_config; /////////////////////////////////////// diff --git a/src/btree.c b/src/btree.c index 5209bcbc..6299f92b 100644 --- a/src/btree.c +++ b/src/btree.c @@ -2661,13 +2661,14 @@ static inline uint64 btree_iterator_curr_live_prev_addr(btree_iterator *itor) { if (!btree_iterator_curr_is_copy(itor)) { - return itor->curr.hdr->prev_addr; + return btree_node_prev_addr(itor->cfg, &itor->curr, itor->page_type); } btree_node live_curr; live_curr.addr = itor->curr.addr; btree_node_get(itor->cc, itor->cfg, &live_curr, itor->page_type); - uint64 prev_addr = live_curr.hdr->prev_addr; + uint64 prev_addr = + btree_node_prev_addr(itor->cfg, &live_curr, itor->page_type); btree_node_unget(itor->cc, itor->cfg, &live_curr); return prev_addr; } @@ -2686,16 +2687,18 @@ btree_iterator_end_key_beyond_curr(btree_iterator *itor) uint64 num_entries = btree_num_entries(itor->curr.hdr); if (key_is_positive_infinity(itor->max_key)) { - return itor->curr.hdr->next_addr != 0; + return btree_node_next_addr(itor->cfg, &itor->curr, itor->page_type) != 0; } if (num_entries == 0 || itor->height > btree_height(itor->curr.hdr)) { - return num_entries == 0 && itor->curr.hdr->next_addr != 0; + return num_entries == 0 + && btree_node_next_addr(itor->cfg, &itor->curr, itor->page_type) + != 0; } key last_key = btree_iterator_get_node_key(itor, itor->curr.hdr, num_entries - 1); return btree_key_compare(itor->cfg, itor->max_key, last_key) > 0 - && itor->curr.hdr->next_addr != 0; + && btree_node_next_addr(itor->cfg, &itor->curr, itor->page_type) != 0; } static void @@ -2841,6 +2844,237 @@ btree_iterator_find_end_addr_async(btree_iterator_async_state *state, async_return(state); } +/* + * ---------------------------------------------------------------------------- + * btree_prefetch_cursor -- + * + * Bidirectional extent-prefetcher for a btree_iterator. It reads extent + * addresses ahead of (or behind) the iterator from the branch's + * mini_allocator (via a mini_meta_cursor) and issues cache_prefetch for + * them, keeping ~lookahead leaf extents of IO in flight. Within the + * iterator's level (batch), the extents are in key order, so the cursor + * advances in lockstep with consumption. Internal-node extents are skipped; + * blob extents are prefetched for height-0 scans. See btree_prefetch_cursor + * in btree.h. + * ---------------------------------------------------------------------------- + */ +static inline uint64 +btree_extent_base_addr(cache *cc, uint64 addr) +{ + allocator *al = cache_get_allocator(cc); + return allocator_config_extent_base_addr(allocator_get_config(al), addr); +} + +static inline mini_meta_cursor_status +btree_prefetch_cursor_step(btree_prefetch_cursor *pf, + uint64 *extent_addr, + uint64 *batch) +{ + mini_meta_cursor_status status = + pf->going_forward ? mini_meta_cursor_next(&pf->meta_cursor) + : mini_meta_cursor_prev(&pf->meta_cursor); + if (status == MINI_META_CURSOR_ENTRY) { + mini_meta_cursor_curr(&pf->meta_cursor, extent_addr, batch); + } + return status; +} + +/* + * Issue prefetches until ~depth leaf extents are in flight, or the stream is + * exhausted. Leaf extents count toward the depth; blob extents in the window + * are prefetched but not counted; internal-node extents are skipped. Forward + * fill stops at end_addr. Backward fill stops at the beginning of the extent + * stream. Non-blocking: if a meta page needed to read further ahead isn't + * resident yet, fill stops early (a single-page prefetch was issued) and + * resumes later. + */ +static void +btree_prefetch_cursor_fill(btree_iterator *itor) +{ + btree_prefetch_cursor *pf = &itor->prefetch; + while (!pf->at_end && pf->prefetched_ahead < pf->depth) { + uint64 extent_addr; + uint64 batch; + mini_meta_cursor_status status = + btree_prefetch_cursor_step(pf, &extent_addr, &batch); + if (status == MINI_META_CURSOR_WOULD_BLOCK) { + break; + } + if (status == MINI_META_CURSOR_END) { + pf->at_end = TRUE; + break; + } + if (batch == pf->leaf_batch) { + cache_prefetch(itor->cc, extent_addr, itor->page_type); + pf->prefetched_ahead++; + if (pf->going_forward + && btree_addrs_share_extent(itor->cc, extent_addr, itor->end_addr)) + { + pf->at_end = TRUE; + } + } else if (pf->prefetch_blobs && batch < NUM_BLOB_BATCHES) { + cache_prefetch(itor->cc, extent_addr, PAGE_TYPE_BLOB); + } + } +} + +/* + * Try to position the (PRIMING) cursor on the iterator's current leaf extent. + * Non-blocking: kicks off meta-page IO and leaves the cursor PRIMING when the + * page is not resident yet. Reads the current leaf's meta_page_addr every call, + * so it positions correctly even if the iterator advanced while priming. Fill + * starts by moving in the scan direction, so the current extent is not + * prefetched. Returns TRUE iff the cursor just became ACTIVE. + */ +static bool32 +btree_prefetch_cursor_pump(btree_iterator *itor) +{ + btree_prefetch_cursor *pf = &itor->prefetch; + + uint64 meta_page_addr = itor->curr.hdr->meta_page_addr; + if (meta_page_addr == 0) { + // No extent list is available; use the header extent links. + pf->state = BTREE_PREFETCH_DISABLED; + return FALSE; + } + + mini_meta_cursor_deinit(&pf->meta_cursor); + uint64 cur_extent = btree_extent_base_addr(itor->cc, itor->curr.addr); + mini_meta_cursor_status status = mini_meta_cursor_init( + &pf->meta_cursor, itor->cc, itor->page_type, meta_page_addr, cur_extent); + if (status == MINI_META_CURSOR_WOULD_BLOCK) { + return FALSE; + } + platform_assert(status == MINI_META_CURSOR_ENTRY); + + pf->state = BTREE_PREFETCH_ACTIVE; + pf->at_end = FALSE; + pf->prefetched_ahead = 0; + pf->depth = BTREE_PREFETCH_RAMP_MIN; + btree_prefetch_cursor_fill(itor); + return TRUE; +} + +/* + * Called when the iterator crosses into a new leaf extent while the cursor is + * ACTIVE: account for the consumed extent, ramp the depth toward the configured + * cap, and refill the lookahead window. + */ +static void +btree_prefetch_cursor_on_boundary(btree_iterator *itor) +{ + btree_prefetch_cursor *pf = &itor->prefetch; + if (pf->prefetched_ahead > 0) { + pf->prefetched_ahead--; + } + if (pf->depth < pf->lookahead) { + pf->depth *= 2; + if (pf->depth > pf->lookahead) { + pf->depth = pf->lookahead; + } + } + btree_prefetch_cursor_fill(itor); +} + +static inline bool32 +btree_iterator_prefetch_enabled(btree_iterator *itor) +{ + return itor->prefetch.lookahead > 0; +} + +/* + * (Re)start deep prefetch at the iterator's current leaf. Non-blocking: kicks + * off meta-page IO and leaves the cursor PRIMING unless the meta page is + * already resident. Leaves the deep cursor DISABLED when deep prefetch does not + * apply; the header extent links may still prefetch one extent. + */ +static void +btree_prefetch_cursor_start(btree_iterator *itor, bool32 going_forward) +{ + btree_prefetch_cursor *pf = &itor->prefetch; + + mini_meta_cursor_deinit(&pf->meta_cursor); + pf->state = BTREE_PREFETCH_DISABLED; + pf->going_forward = going_forward; + pf->at_end = FALSE; + pf->prefetched_ahead = 0; + pf->depth = BTREE_PREFETCH_RAMP_MIN; + pf->leaf_batch = NUM_BLOB_BATCHES + itor->height; + pf->prefetch_blobs = (itor->height == 0); + + if (!btree_iterator_prefetch_enabled(itor) || pf->lookahead <= 1 + || itor->page_type != PAGE_TYPE_BRANCH || itor->curr.page == NULL) + { + return; + } + + pf->state = BTREE_PREFETCH_PRIMING; + btree_prefetch_cursor_pump(itor); +} + +/* Release the cursor's resources and turn it off. */ +static void +btree_prefetch_cursor_deinit(btree_iterator *itor) +{ + btree_prefetch_cursor *pf = &itor->prefetch; + mini_meta_cursor_deinit(&pf->meta_cursor); + pf->state = BTREE_PREFETCH_DISABLED; +} + +/* + * Drive prefetching after the iterator advances one leaf, in either direction. + * On a direction change, restarts the cursor in the new direction (resetting + * the ramp). Otherwise pumps the non-blocking prime while PRIMING. Deep + * prefetch refills after an extent-boundary crossing; header-link prefetch runs + * after a crossing or direction restart, when the adjacent extent link is + * useful and not just a duplicate prefetch for the current extent. + */ +static void +btree_iterator_prefetch_on_advance(btree_iterator *itor, + uint64 last_addr, + bool32 going_forward) +{ + cache *cc = itor->cc; + btree_prefetch_cursor *pf = &itor->prefetch; + bool32 restarted = FALSE; + + // Direction change: restart cursor in the new direction, resetting ramp. + if (pf->state != BTREE_PREFETCH_DISABLED + && pf->going_forward != going_forward) + { + btree_prefetch_cursor_start(itor, going_forward); + restarted = TRUE; + } + + bool32 positioned_now = FALSE; + if (pf->state == BTREE_PREFETCH_PRIMING) { + positioned_now = btree_prefetch_cursor_pump(itor); + } + bool32 crossed_extent = + !btree_addrs_share_extent(cc, last_addr, itor->curr.addr); + if (pf->state == BTREE_PREFETCH_ACTIVE) { + if (crossed_extent && !positioned_now && !restarted) { + btree_prefetch_cursor_on_boundary(itor); + } + } else if (btree_iterator_prefetch_enabled(itor) + && (crossed_extent || restarted)) + { + uint64 extent_addr = going_forward + ? btree_node_next_extent_addr( + itor->cfg, &itor->curr, itor->page_type) + : btree_node_prev_extent_addr( + itor->cfg, &itor->curr, itor->page_type); + + if (extent_addr != 0 + && (!going_forward + || !btree_addrs_share_extent( + cc, itor->curr.addr, itor->end_addr))) + { + cache_prefetch(cc, extent_addr, itor->page_type); + } + } +} + /* * ---------------------------------------------------------------------------- * Move to the next leaf when we've reached the end of one leaf but @@ -2850,26 +3084,15 @@ btree_iterator_find_end_addr_async(btree_iterator_async_state *state, static void btree_iterator_next_leaf(btree_iterator *itor) { - cache *cc = itor->cc; - uint64 last_addr = itor->curr.addr; - uint64 next_addr = itor->curr.hdr->next_addr; + uint64 next_addr = + btree_node_next_addr(itor->cfg, &itor->curr, itor->page_type); btree_iterator_release_curr(itor); btree_iterator_get_curr_addr(itor, next_addr); itor->idx = 0; itor->curr_min_idx = -1; - // To prefetch: - // 1. we just moved from one extent to the next - // 2. this can't be the last extent - if (itor->do_prefetch - && !btree_addrs_share_extent(cc, last_addr, itor->curr.addr) - && itor->curr.hdr->next_extent_addr != 0 - && !btree_addrs_share_extent(cc, itor->curr.addr, itor->end_addr)) - { - // IO prefetch the next extent - cache_prefetch(cc, itor->curr.hdr->next_extent_addr, itor->page_type); - } + btree_iterator_prefetch_on_advance(itor, last_addr, TRUE); } static async_status @@ -2878,7 +3101,8 @@ btree_iterator_next_leaf_async(btree_iterator_async_state *state, uint64 depth) async_begin(state, depth); state->last_addr = state->itor->curr.addr; - state->next_addr = state->itor->curr.hdr->next_addr; + state->next_addr = btree_node_next_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type); btree_iterator_release_curr(state->itor); state->itor->curr.addr = state->next_addr; @@ -2898,21 +3122,8 @@ btree_iterator_next_leaf_async(btree_iterator_async_state *state, uint64 depth) state->itor->idx = 0; state->itor->curr_min_idx = -1; - // To prefetch: - // 1. we just moved from one extent to the next - // 2. this can't be the last extent - if (state->itor->do_prefetch - && !btree_addrs_share_extent( - state->itor->cc, state->last_addr, state->itor->curr.addr) - && state->itor->curr.hdr->next_extent_addr != 0 - && !btree_addrs_share_extent( - state->itor->cc, state->itor->curr.addr, state->itor->end_addr)) - { - // IO prefetch the next extent - cache_prefetch(state->itor->cc, - state->itor->curr.hdr->next_extent_addr, - state->itor->page_type); - } + // Prefetching is all non-blocking, so it needs no awaits here. + btree_iterator_prefetch_on_advance(state->itor, state->last_addr, TRUE); async_return(state); } @@ -2927,7 +3138,7 @@ btree_iterator_prev_leaf(btree_iterator *itor) { const btree_config *cfg = itor->cfg; - debug_only uint64 curr_addr = itor->curr.addr; + uint64 last_addr = itor->curr.addr; /* * Copied nodes can have stale prev_addr values. Read the live current node * before moving backward so predecessor splits are not skipped. @@ -2941,8 +3152,10 @@ btree_iterator_prev_leaf(btree_iterator *itor) * old curr node and the new one. In this case, we can just walk * forward until we find the leaf whose successor is our old leaf. */ - while (itor->curr.hdr->next_addr != curr_addr) { - uint64 next_addr = itor->curr.hdr->next_addr; + while (btree_node_next_addr(cfg, &itor->curr, itor->page_type) != last_addr) + { + uint64 next_addr = + btree_node_next_addr(cfg, &itor->curr, itor->page_type); btree_iterator_release_curr(itor); btree_iterator_get_curr_addr(itor, next_addr); } @@ -2958,24 +3171,13 @@ btree_iterator_prev_leaf(btree_iterator *itor) itor->curr_min_idx = find_key_in_node( itor, itor->curr.hdr, itor->min_key, itor->min_key_comparison, NULL); } - if (itor->curr.hdr->prev_addr == 0 && itor->curr_min_idx == -1) { + if (btree_node_prev_addr(cfg, &itor->curr, itor->page_type) == 0 + && itor->curr_min_idx == -1) + { itor->curr_min_idx = 0; } - // FIXME: To prefetch: - // 1. we just moved from one extent to the next - // 2. this can't be the last extent - /* if (itor->do_prefetch */ - /* && !btree_addrs_share_extent(cc, last_addr, itor->curr.addr) */ - /* && itor->curr.hdr->next_extent_addr != 0 */ - /* && !btree_addrs_share_extent(cc, itor->curr.addr, itor->end_addr)) - */ - /* { */ - /* // IO prefetch the next extent */ - /* cache_prefetch(cc, itor->curr.hdr->next_extent_addr, - * itor->page_type); - */ - /* } */ + btree_iterator_prefetch_on_advance(itor, last_addr, FALSE); } static async_status @@ -2997,10 +3199,12 @@ btree_iterator_prev_leaf_async(btree_iterator_async_state *state, uint64 depth) state->live_curr.page = cache_get_async_state_result(&state->cache_get_state); state->live_curr.hdr = (btree_hdr *)state->live_curr.page->data; - state->prev_addr = state->live_curr.hdr->prev_addr; + state->prev_addr = btree_node_prev_addr( + state->itor->cfg, &state->live_curr, state->itor->page_type); btree_node_unget(state->itor->cc, state->itor->cfg, &state->live_curr); } else { - state->prev_addr = state->itor->curr.hdr->prev_addr; + state->prev_addr = btree_node_prev_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type); } btree_iterator_release_curr(state->itor); state->itor->curr.addr = state->prev_addr; @@ -3023,8 +3227,12 @@ btree_iterator_prev_leaf_async(btree_iterator_async_state *state, uint64 depth) * old curr node and the new one. In this case, we can just walk * forward until we find the leaf whose successor is our old leaf. */ - while (state->itor->curr.hdr->next_addr != state->curr_addr) { - state->next_addr = state->itor->curr.hdr->next_addr; + while (btree_node_next_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type) + != state->curr_addr) + { + state->next_addr = btree_node_next_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type); btree_iterator_release_curr(state->itor); state->itor->curr.addr = state->next_addr; @@ -3062,25 +3270,16 @@ btree_iterator_prev_leaf_async(btree_iterator_async_state *state, uint64 depth) state->itor->min_key_comparison, NULL); } - if (state->itor->curr.hdr->prev_addr == 0 && state->itor->curr_min_idx == -1) + if (btree_node_prev_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type) + == 0 + && state->itor->curr_min_idx == -1) { state->itor->curr_min_idx = 0; } - // FIXME: To prefetch: - // 1. we just moved from one extent to the next - // 2. this can't be the last extent - /* if (itor->do_prefetch */ - /* && !btree_addrs_share_extent(cc, last_addr, itor->curr.addr) */ - /* && itor->curr.hdr->next_extent_addr != 0 */ - /* && !btree_addrs_share_extent(cc, itor->curr.addr, itor->end_addr)) - */ - /* { */ - /* // IO prefetch the next extent */ - /* cache_prefetch(cc, itor->curr.hdr->next_extent_addr, - * itor->page_type); - */ - /* } */ + // Prefetching is non-blocking, so no awaits needed here. + btree_iterator_prefetch_on_advance(state->itor, state->curr_addr, FALSE); async_return(state); } @@ -3239,7 +3438,9 @@ find_btree_node_and_get_idx_bounds(btree_iterator *itor, itor->curr_min_idx = !found && tmp == 0 ? tmp - 1 : tmp; // if min_key is not within the current node but there is no previous node // then set curr_min_idx to 0 - if (itor->curr_min_idx == -1 && itor->curr.hdr->prev_addr == 0) { + if (itor->curr_min_idx == -1 + && btree_node_prev_addr(itor->cfg, &itor->curr, itor->page_type) == 0) + { itor->curr_min_idx = 0; } @@ -3329,7 +3530,10 @@ find_btree_node_and_get_idx_bounds_async(btree_iterator_async_state *state, !state->found && state->tmp == 0 ? state->tmp - 1 : state->tmp; // if min_key is not within the current node but there is no previous node // then set curr_min_idx to 0 - if (state->itor->curr_min_idx == -1 && state->itor->curr.hdr->prev_addr == 0) + if (state->itor->curr_min_idx == -1 + && btree_node_prev_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type) + == 0) { state->itor->curr_min_idx = 0; } @@ -3390,6 +3594,10 @@ btree_iterator_seek(iterator *base_itor, comparison seek_type, key seek_key) find_btree_node_and_get_idx_bounds(itor, seek_key, seek_type); } + // The iterator may have repositioned; re-anchor the prefetch cursor so a + // subsequent forward scan prefetches from the new location. + btree_prefetch_cursor_start(itor, TRUE); + return STATUS_OK; } @@ -3443,9 +3651,9 @@ btree_iterator_init_common(cache *cc, comparison max_key_comparison, key max_key, key start_key, - bool32 do_prefetch, bool32 copy_nodes, uint32 height, + uint32 prefetch_lookahead, key *normalized_start_key) { platform_assert(root_addr != 0); @@ -3475,7 +3683,6 @@ btree_iterator_init_common(cache *cc, itor->cc = cc; itor->cfg = cfg; itor->root_addr = root_addr; - itor->do_prefetch = do_prefetch; itor->height = height; itor->copy_nodes = copy_nodes; itor->min_key_comparison = min_key_comparison; @@ -3484,6 +3691,7 @@ btree_iterator_init_common(cache *cc, itor->max_key = max_key; itor->page_type = page_type; itor->super.ops = &btree_iterator_ops; + itor->prefetch.lookahead = prefetch_lookahead; if (copy_nodes) { itor->node_copy = TYPED_MANUAL_MALLOC( PROCESS_PRIVATE_HEAP_ID, itor->node_copy, btree_page_size(itor->cfg)); @@ -3509,9 +3717,9 @@ btree_iterator_init(cache *cc, key max_key, comparison start_type, key start_key, - bool32 do_prefetch, bool32 copy_nodes, - uint32 height) + uint32 height, + uint32 prefetch_lookahead) { platform_status rc = btree_iterator_init_common(cc, cfg, @@ -3523,9 +3731,9 @@ btree_iterator_init(cache *cc, max_key_comparison, max_key, start_key, - do_prefetch, copy_nodes, height, + prefetch_lookahead, &start_key); if (!SUCCESS(rc)) { return rc; @@ -3533,11 +3741,16 @@ btree_iterator_init(cache *cc, find_btree_node_and_get_idx_bounds(itor, start_key, start_type); - if (itor->do_prefetch && itor->curr.hdr->next_extent_addr != 0 + btree_prefetch_cursor_start(itor, TRUE); + // While the deep cursor is priming or disabled, keep the next forward extent + // warm when the leaf header names one. + uint64 next_extent_addr = + btree_node_next_extent_addr(itor->cfg, &itor->curr, itor->page_type); + if (itor->prefetch.state != BTREE_PREFETCH_ACTIVE + && btree_iterator_prefetch_enabled(itor) && next_extent_addr != 0 && !btree_addrs_share_extent(cc, itor->curr.addr, itor->end_addr)) { - // IO prefetch the next extent - cache_prefetch(cc, itor->curr.hdr->next_extent_addr, itor->page_type); + cache_prefetch(cc, next_extent_addr, itor->page_type); } debug_assert(!iterator_can_curr((iterator *)itor) @@ -3561,9 +3774,9 @@ btree_iterator_init_async(btree_iterator_async_state *state) state->max_key_comparison, state->max_key, state->start_key, - state->do_prefetch, state->copy_nodes, state->height, + state->prefetch_lookahead, &state->target); if (!SUCCESS(rc)) { async_return(state, rc); @@ -3573,14 +3786,17 @@ btree_iterator_init_async(btree_iterator_async_state *state) async_await_subroutine(state, find_btree_node_and_get_idx_bounds_async); btree_iterator_copy_curr_if_needed(state->itor); - if (state->itor->do_prefetch && state->itor->curr.hdr->next_extent_addr != 0 + btree_prefetch_cursor_start(state->itor, TRUE); + // While the deep cursor is priming or disabled, keep the next forward extent + // warm when the leaf header names one. + uint64 next_extent_addr = btree_node_next_extent_addr( + state->itor->cfg, &state->itor->curr, state->itor->page_type); + if (state->itor->prefetch.state != BTREE_PREFETCH_ACTIVE + && btree_iterator_prefetch_enabled(state->itor) && next_extent_addr != 0 && !btree_addrs_share_extent( state->cc, state->itor->curr.addr, state->itor->end_addr)) { - // IO prefetch the next extent - cache_prefetch(state->cc, - state->itor->curr.hdr->next_extent_addr, - state->itor->page_type); + cache_prefetch(state->cc, next_extent_addr, state->itor->page_type); } debug_assert(!iterator_can_curr((iterator *)state->itor) @@ -3599,6 +3815,7 @@ void btree_iterator_deinit(btree_iterator *itor) { debug_assert(itor != NULL); + btree_prefetch_cursor_deinit(itor); btree_iterator_release_curr(itor); if (itor->node_copy != NULL) { platform_free(PROCESS_PRIVATE_HEAP_ID, itor->node_copy); @@ -3610,16 +3827,12 @@ btree_iterator_deinit(btree_iterator *itor) * B-tree packing functions ****************************/ -// generation number isn't used in packed btrees +// Branch nodes use the header union for meta_page_addr. static inline void -btree_pack_node_init_hdr(const btree_config *cfg, - btree_hdr *hdr, - uint64 next_extent, - uint8 height) +btree_pack_node_init_hdr(const btree_config *cfg, btree_hdr *hdr, uint8 height) { btree_init_hdr(cfg, hdr); - hdr->next_extent_addr = next_extent; - hdr->height = height; + hdr->height = height; } static inline void @@ -3629,6 +3842,7 @@ btree_pack_setup_start(btree_pack_req *req) ZERO_ARRAY(req->edge); ZERO_ARRAY(req->edge_stats); ZERO_ARRAY(req->num_edges); + ZERO_ARRAY(req->level_has_nodes); // we create a root here, but we won't build it with the rest // of the tree, we'll copy into it at the end @@ -3665,16 +3879,12 @@ btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot); * Add the specified node to its parent. Creates a parent if necessary. */ static inline void -btree_pack_link_node(btree_pack_req *req, - uint64 height, - uint64 offset, - uint64 next_extent_addr) +btree_pack_link_node(btree_pack_req *req, uint64 height, uint64 offset) { btree_node *edge = &req->edge[height][offset]; btree_pivot_stats *edge_stats = &req->edge_stats[height][offset]; key pivot = height ? btree_get_pivot(req->cfg, edge->hdr, 0) : btree_get_tuple_key(req->cfg, edge->hdr, 0); - edge->hdr->next_extent_addr = next_extent_addr; btree_node_unlock(req->cc, req->cfg, edge); btree_node_unclaim(req->cc, req->cfg, edge); // Cannot fully unlock edge yet because the key "pivot" may point into it. @@ -3706,10 +3916,17 @@ btree_pack_link_node(btree_pack_req *req, static inline void btree_pack_link_extent(btree_pack_req *req, uint64 height, + bool32 last_extent_in_level, uint64 next_extent_addr) { + debug_assert(0 < req->num_edges[height]); + if (last_extent_in_level) { + btree_hdr_set_last_in_level( + req->edge[height][req->num_edges[height] - 1].hdr); + } for (int i = 0; i < req->num_edges[height]; i++) { - btree_pack_link_node(req, height, i, next_extent_addr); + req->edge[height][i].hdr->next_addr = next_extent_addr; + btree_pack_link_node(req, height, i); } req->num_edges[height] = 0; } @@ -3725,14 +3942,26 @@ btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot) &node_next_extent, PAGE_TYPE_BRANCH, &new_node); - btree_pack_node_init_hdr(req->cfg, new_node.hdr, 0, height); + btree_pack_node_init_hdr(req->cfg, new_node.hdr, height); + // Record where this node's extent is listed in the mini_allocator meta + // stream, so an iterator can position a prefetch cursor in O(1). Must come + // after init_hdr, which zeroes the header. + new_node.hdr->meta_page_addr = + mini_current_extent_meta_page(&req->mini, NUM_BLOB_BATCHES + height); + + if (!req->level_has_nodes[height]) { + btree_hdr_set_first_in_level(new_node.hdr); + } if (0 < req->num_edges[height]) { - btree_node *old_node = btree_pack_get_current_node(req, height); - old_node->hdr->next_addr = new_node.addr; - new_node.hdr->prev_addr = old_node->addr; + btree_node *old_node = btree_pack_get_current_node(req, height); + new_node.hdr->prev_addr = old_node->hdr->prev_addr; if (!btree_addrs_share_extent(req->cc, old_node->addr, new_node.addr)) { - btree_pack_link_extent(req, height, new_node.addr); + debug_assert(btree_page_is_last_in_extent(req->cfg, old_node->addr)); + debug_assert(btree_page_is_first_in_extent(req->cfg, new_node.addr)); + new_node.hdr->prev_addr = + btree_extent_base_addr(req->cc, old_node->addr); + btree_pack_link_extent(req, height, FALSE, new_node.addr); } } @@ -3742,6 +3971,7 @@ btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot) req->edge[height][req->num_edges[height]] = new_node; req->num_edges[height]++; + req->level_has_nodes[height] = TRUE; debug_assert(btree_pack_get_current_node_stats(req, height)->num_kvs == 0); return &req->edge[height][req->num_edges[height] - 1]; } @@ -3829,18 +4059,23 @@ btree_pack_post_loop(btree_pack_req *req, key last_key) int h = 0; while (h < req->height || 1 < req->num_edges[h]) { - btree_pack_link_extent(req, h, 0); + btree_pack_link_extent(req, h, TRUE, 0); h++; } + btree_hdr_set_last_in_level(req->edge[req->height][0].hdr); root.addr = req->root_addr; btree_node_get(cc, cfg, &root, PAGE_TYPE_BRANCH); debug_only bool32 success = btree_node_claim(cc, cfg, &root); debug_assert(success); btree_node_lock(cc, cfg, &root); memmove(root.hdr, req->edge[req->height][0].hdr, btree_page_size(cfg)); - // fix the root next extent - root.hdr->next_extent_addr = 0; + // The root is allocated outside the mini allocator's extent stream. + root.hdr->meta_page_addr = 0; + root.hdr->prev_addr = 0; + root.hdr->next_addr = 0; + btree_hdr_set_first_in_level(root.hdr); + btree_hdr_set_last_in_level(root.hdr); btree_node_full_unlock(cc, cfg, &root); btree_node_full_unlock(cc, cfg, &req->edge[req->height][0]); @@ -3993,9 +4228,9 @@ btree_count_in_range_by_iterator(cache *cc, max_key, greater_than_or_equal, min_key, - TRUE, FALSE, - 0); + 0, + 1); platform_assert_status_ok(rc); memset(stats, 0, sizeof(*stats)); @@ -4098,10 +4333,25 @@ btree_print_index_node(platform_log_handle *log_handle, log_handle, "** Page type: %s, INDEX NODE \n", page_type_str[type]); platform_log(log_handle, "** Header ptr: %p\n", hdr); platform_log(log_handle, "** addr: %lu \n", addr); - platform_log(log_handle, "** next_addr: %lu \n", hdr->next_addr); - platform_log( - log_handle, "** next_extent_addr: %lu \n", hdr->next_extent_addr); - platform_log(log_handle, "** generation: %lu \n", hdr->generation); + platform_log(log_handle, + "** prev_addr: %lu \n", + btree_hdr_prev_addr(cfg, hdr, addr, type)); + platform_log(log_handle, + "** next_addr: %lu \n", + btree_hdr_next_addr(cfg, hdr, addr, type)); + if (type == PAGE_TYPE_BRANCH) { + platform_log(log_handle, "** flags: %u \n", hdr->flags); + platform_log( + log_handle, "** meta_page_addr: %lu \n", hdr->meta_page_addr); + platform_log(log_handle, + "** prev_extent_addr: %lu \n", + btree_hdr_prev_extent_addr(cfg, hdr, addr, type)); + platform_log(log_handle, + "** next_extent_addr: %lu \n", + btree_hdr_next_extent_addr(cfg, hdr, addr, type)); + } else { + platform_log(log_handle, "** generation: %lu \n", hdr->generation); + } platform_log(log_handle, "** height: %u \n", btree_height(hdr)); platform_log(log_handle, "** next_entry: %u \n", hdr->next_entry); platform_log(log_handle, "** num_entries: %u \n", btree_num_entries(hdr)); @@ -4145,10 +4395,25 @@ btree_print_leaf_node(platform_log_handle *log_handle, log_handle, "** Page type: %s, LEAF NODE \n", page_type_str[type]); platform_log(log_handle, "** hdrptr: %p\n", hdr); platform_log(log_handle, "** addr: %lu \n", addr); - platform_log(log_handle, "** next_addr: %lu \n", hdr->next_addr); - platform_log( - log_handle, "** next_extent_addr: %lu \n", hdr->next_extent_addr); - platform_log(log_handle, "** generation: %lu \n", hdr->generation); + platform_log(log_handle, + "** prev_addr: %lu \n", + btree_hdr_prev_addr(cfg, hdr, addr, type)); + platform_log(log_handle, + "** next_addr: %lu \n", + btree_hdr_next_addr(cfg, hdr, addr, type)); + if (type == PAGE_TYPE_BRANCH) { + platform_log(log_handle, "** flags: %u \n", hdr->flags); + platform_log( + log_handle, "** meta_page_addr: %lu \n", hdr->meta_page_addr); + platform_log(log_handle, + "** prev_extent_addr: %lu \n", + btree_hdr_prev_extent_addr(cfg, hdr, addr, type)); + platform_log(log_handle, + "** next_extent_addr: %lu \n", + btree_hdr_next_extent_addr(cfg, hdr, addr, type)); + } else { + platform_log(log_handle, "** generation: %lu \n", hdr->generation); + } platform_log(log_handle, "** height: %u \n", btree_height(hdr)); platform_log(log_handle, "** next_entry: %u \n", hdr->next_entry); platform_log(log_handle, "** num_entries: %u \n", btree_num_entries(hdr)); diff --git a/src/btree.h b/src/btree.h index 1aa06a33..48795fb0 100644 --- a/src/btree.h +++ b/src/btree.h @@ -130,6 +130,45 @@ typedef struct ONDISK btree_pivot_data { btree_pivot_stats stats; } btree_pivot_data; +/* + * Drives extent prefetching for a btree_iterator in either direction. Reads + * extent addresses ahead of (or behind) the iterator from the branch's + * mini_allocator (via a mini_meta_cursor, exploiting that extents within a + * batch are in key order) and issues cache_prefetch for them, keeping up to + * ~depth leaf extents of IO in flight. Internal-node extents are skipped; blob + * extents are prefetched (for height-0 scans). + * + * Priming is non-blocking: the cursor's meta page is fetched lazily (PRIMING + * state) so the iterator's async init never waits on it and the first tuple is + * not delayed. The leaf header's extent links are used for prefetching until + * the cursor becomes ACTIVE. + * + * Depth ramps up (slow-start) from BTREE_PREFETCH_RAMP_MIN toward `lookahead` + * as the scan proves long, so short scans don't waste bandwidth reading far + * ahead. On a direction change the ramp resets so both forward and backward + * scans get the same slow-start treatment. + */ +typedef enum btree_prefetch_state { + BTREE_PREFETCH_DISABLED = 0, // deep prefetch inactive + BTREE_PREFETCH_PRIMING, // meta-page IO kicked off; not yet positioned + BTREE_PREFETCH_ACTIVE, // positioned; issuing deep prefetches +} btree_prefetch_state; + +// Initial (and minimum) ramp-up depth; depth doubles toward `lookahead`. +#define BTREE_PREFETCH_RAMP_MIN (1) + +typedef struct btree_prefetch_cursor { + btree_prefetch_state state; + bool32 at_end; // prefetched through the last in-range extent + bool32 going_forward; // current scan direction; reset resets ramp + uint32 lookahead; // K: max leaf extents in flight (the cap) + uint32 depth; // current ramp-up depth (<= lookahead) + uint64 leaf_batch; // mini batch of this iterator's level + bool32 prefetch_blobs; // also prefetch blob extents (height 0) + uint64 prefetched_ahead; // leaf extents prefetched, not yet consumed + mini_meta_cursor meta_cursor; +} btree_prefetch_cursor; + /* * A BTree iterator: */ @@ -137,7 +176,6 @@ typedef struct btree_iterator { iterator super; cache *cc; const btree_config *cfg; - bool32 do_prefetch; uint32 height; page_type page_type; // Active memtable iterators copy nodes here and release page locks. @@ -155,6 +193,8 @@ typedef struct btree_iterator { uint64 end_addr; int64 end_idx; bool32 end_idx_valid; + + btree_prefetch_cursor prefetch; } btree_iterator; typedef struct btree_pack_req { @@ -171,6 +211,7 @@ typedef struct btree_pack_req { btree_node edge[BTREE_MAX_HEIGHT][MAX_PAGES_PER_EXTENT]; btree_pivot_stats edge_stats[BTREE_MAX_HEIGHT][MAX_PAGES_PER_EXTENT]; uint32 num_edges[BTREE_MAX_HEIGHT]; + bool32 level_has_nodes[BTREE_MAX_HEIGHT]; merge_accumulator blob_buffer; mini_allocator mini; @@ -300,6 +341,11 @@ btree_lookup_and_merge_async(btree_lookup_async_state *state); async_status btree_lookup_async(btree_lookup_async_state *state); +/* + * prefetch_lookahead is measured in leaf extents. 0 disables prefetch, 1 + * prefetches at most the next extent, and values >= 2 enable deep extent + * prefetch. + */ platform_status btree_iterator_init(cache *cc, const btree_config *cfg, @@ -312,9 +358,9 @@ btree_iterator_init(cache *cc, key max_key, comparison start_type, key start_key, - bool32 do_prefetch, bool32 copy_nodes, - uint32 height); + uint32 height, + uint32 prefetch_lookahead); // clang-format off DEFINE_ASYNC_STATE(btree_iterator_async_state, 5, @@ -329,9 +375,9 @@ DEFINE_ASYNC_STATE(btree_iterator_async_state, 5, param, key, max_key, param, comparison, start_type, param, key, start_key, - param, bool32, do_prefetch, param, bool32, copy_nodes, param, uint32, height, + param, uint32, prefetch_lookahead, param, async_callback_fn, callback, param, void *, callback_arg, local, platform_status, __async_result, @@ -341,7 +387,6 @@ DEFINE_ASYNC_STATE(btree_iterator_async_state, 5, local, key, target, local, comparison, position_rule, local, bool32, found, - local, bool32, forward, local, int64, tmp, local, uint64, curr_addr, local, uint64, last_addr, diff --git a/src/btree_private.h b/src/btree_private.h index 5b8995b2..3b40a92d 100644 --- a/src/btree_private.h +++ b/src/btree_private.h @@ -32,16 +32,31 @@ typedef node_offset table_entry; * ************************************************************************* */ struct ONDISK btree_hdr { - uint64 prev_addr; - uint64 next_addr; - uint64 next_extent_addr; - uint64 generation; + /* + * Memtables store literal previous/next node addresses. Branches compute + * same-extent neighbors from the page address; every branch page in an + * extent stores the adjacent previous/next extent base addresses for its + * level. FIRST_IN_LEVEL and LAST_IN_LEVEL stop logical neighbor computation + * at level edges. + */ + uint64 prev_addr; + uint64 next_addr; + union { + // Branch: mini_allocator meta page that lists this node's extent. + uint64 meta_page_addr; + // Memtable: generation used to detect stale copied nodes. + uint64 generation; + }; uint8 height; + uint8 flags; node_offset next_entry; table_index num_entries; table_entry offsets[]; }; +#define BTREE_HDR_FIRST_IN_LEVEL (1 << 0) +#define BTREE_HDR_LAST_IN_LEVEL (1 << 1) + /* * ************************************************************************* * BTree Node index entries: Disk-resident structure @@ -167,6 +182,162 @@ btree_extent_size(const btree_config *cfg) return cache_config_extent_size(cfg->cache_cfg); } +static inline bool32 +btree_page_is_first_in_extent(const btree_config *cfg, uint64 addr) +{ + return addr % btree_extent_size(cfg) == 0; +} + +static inline bool32 +btree_page_is_last_in_extent(const btree_config *cfg, uint64 addr) +{ + return (addr + btree_page_size(cfg)) % btree_extent_size(cfg) == 0; +} + +static inline void +btree_hdr_set_first_in_level(btree_hdr *hdr) +{ + hdr->flags |= BTREE_HDR_FIRST_IN_LEVEL; +} + +static inline void +btree_hdr_set_last_in_level(btree_hdr *hdr) +{ + hdr->flags |= BTREE_HDR_LAST_IN_LEVEL; +} + +static inline bool32 +btree_hdr_is_first_in_level(const btree_hdr *hdr) +{ + return hdr->flags & BTREE_HDR_FIRST_IN_LEVEL; +} + +static inline bool32 +btree_hdr_is_last_in_level(const btree_hdr *hdr) +{ + return hdr->flags & BTREE_HDR_LAST_IN_LEVEL; +} + +static inline uint64 +btree_hdr_next_addr(const btree_config *cfg, + const btree_hdr *hdr, + uint64 addr, + page_type type) +{ + if (type == PAGE_TYPE_MEMTABLE) { + return hdr->next_addr; + } + platform_assert(type == PAGE_TYPE_BRANCH); + + if (btree_hdr_is_last_in_level(hdr)) { + return 0; + } + if (!btree_page_is_last_in_extent(cfg, addr)) { + return addr + btree_page_size(cfg); + } + debug_assert(hdr->next_addr != 0); + return hdr->next_addr; +} + +static inline uint64 +btree_hdr_prev_addr(const btree_config *cfg, + const btree_hdr *hdr, + uint64 addr, + page_type type) +{ + if (type == PAGE_TYPE_MEMTABLE) { + return hdr->prev_addr; + } + platform_assert(type == PAGE_TYPE_BRANCH); + + if (btree_hdr_is_first_in_level(hdr)) { + return 0; + } + if (!btree_page_is_first_in_extent(cfg, addr)) { + return addr - btree_page_size(cfg); + } + debug_assert(hdr->prev_addr != 0); + return hdr->prev_addr + btree_extent_size(cfg) - btree_page_size(cfg); +} + +static inline uint64 +btree_hdr_next_extent_addr(const btree_config *cfg, + const btree_hdr *hdr, + uint64 addr, + page_type type) +{ + (void)cfg; + (void)addr; + + if (type != PAGE_TYPE_BRANCH || btree_hdr_is_last_in_level(hdr) + || hdr->next_addr == 0) + { + return 0; + } + return hdr->next_addr; +} + +static inline uint64 +btree_hdr_prev_extent_addr(const btree_config *cfg, + const btree_hdr *hdr, + uint64 addr, + page_type type) +{ + (void)cfg; + (void)addr; + + if (type != PAGE_TYPE_BRANCH || btree_hdr_is_first_in_level(hdr) + || hdr->prev_addr == 0) + { + return 0; + } + return hdr->prev_addr; +} + +static inline uint64 +btree_node_next_addr(const btree_config *cfg, + const btree_node *node, + page_type type) +{ + platform_assert(node != NULL); + platform_assert(node->hdr != NULL); + + return btree_hdr_next_addr(cfg, node->hdr, node->addr, type); +} + +static inline uint64 +btree_node_prev_addr(const btree_config *cfg, + const btree_node *node, + page_type type) +{ + platform_assert(node != NULL); + platform_assert(node->hdr != NULL); + + return btree_hdr_prev_addr(cfg, node->hdr, node->addr, type); +} + +static inline uint64 +btree_node_next_extent_addr(const btree_config *cfg, + const btree_node *node, + page_type type) +{ + platform_assert(node != NULL); + platform_assert(node->hdr != NULL); + + return btree_hdr_next_extent_addr(cfg, node->hdr, node->addr, type); +} + +static inline uint64 +btree_node_prev_extent_addr(const btree_config *cfg, + const btree_node *node, + page_type type) +{ + platform_assert(node != NULL); + platform_assert(node->hdr != NULL); + + return btree_hdr_prev_extent_addr(cfg, node->hdr, node->addr, type); +} + static inline void btree_init_hdr(const btree_config *cfg, btree_hdr *hdr) { diff --git a/src/cache.h b/src/cache.h index 9e638aea..bc239b7a 100644 --- a/src/cache.h +++ b/src/cache.h @@ -169,6 +169,7 @@ typedef struct cache_ops { page_generic_fn page_lock; page_generic_fn page_unlock; page_prefetch_fn page_prefetch; + page_prefetch_fn page_prefetch_page; page_generic_fn page_mark_dirty; page_generic_fn page_pin; page_generic_fn page_unpin; @@ -408,6 +409,24 @@ cache_prefetch(cache *cc, uint64 addr, page_type type) return cc->ops->page_prefetch(cc, addr, type); } +/* + *---------------------------------------------------------------------- + * cache_prefetch_page + * + * Like cache_prefetch, but loads only the single page at addr rather than the + * whole extent that contains it. Use this for sparse reads (e.g. a single + * mini_allocator meta page) where pulling in the surrounding extent would waste + * bandwidth. No notification is provided to the calling thread; it may call + * cache_get when it's ready to block on the arrival of the page. + * + *---------------------------------------------------------------------- + */ +static inline void +cache_prefetch_page(cache *cc, uint64 addr, page_type type) +{ + return cc->ops->page_prefetch_page(cc, addr, type); +} + /* *---------------------------------------------------------------------- * cache_mark_dirty diff --git a/src/clockcache.c b/src/clockcache.c index d9585733..2fe92b3e 100644 --- a/src/clockcache.c +++ b/src/clockcache.c @@ -1805,7 +1805,7 @@ page_handle * clockcache_get(clockcache *cc, uint64 addr, bool32 blocking, page_type type) { bool32 retry; - page_handle *handle; + page_handle *handle = NULL; debug_assert(cc->per_thread[platform_get_tid()].enable_sync_get || type == PAGE_TYPE_MEMTABLE); @@ -2517,25 +2517,59 @@ clockcache_prefetch_callback(void *pfs) platform_free(PROCESS_PRIVATE_HEAP_ID, state); } -/* - *----------------------------------------------------------------------------- - * clockcache_prefetch -- - * - * prefetch asynchronously loads the extent with given base address - *----------------------------------------------------------------------------- - */ -void -clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) +static void +clockcache_prefetch_issue(clockcache *cc, + async_io_state **state, + uint64 *state_num_pages, + page_type type) { - async_io_state *state = NULL; - uint64 state_num_pages = 0; - uint64 pages_per_extent = cc->cfg->pages_per_extent; - threadid tid = platform_get_tid(); + platform_assert(cc != NULL); + platform_assert(state != NULL); + platform_assert(*state != NULL); + platform_assert(state_num_pages != NULL); + platform_assert(*state_num_pages > 0); + platform_assert(PAGE_TYPE_FIRST <= type && type < NUM_PAGE_TYPES); + + if (cc->cfg->use_stats) { + threadid tid = platform_get_tid(); + cc->stats[tid].page_reads[type] += *state_num_pages; + cc->stats[tid].prefetches_issued[type]++; + } - debug_assert(base_addr % clockcache_extent_size(cc) == 0); + io_async_run((*state)->iostate); + *state = NULL; + *state_num_pages = 0; +} - for (uint64 page_off = 0; page_off < pages_per_extent; page_off++) { - uint64 addr = base_addr + clockcache_multiply_by_page_size(cc, page_off); +static void +clockcache_prefetch_discard_empty(async_io_state **state) +{ + platform_assert(state != NULL); + platform_assert(*state != NULL); + + io_async_state_deinit((*state)->iostate); + platform_free(PROCESS_PRIVATE_HEAP_ID, *state); + *state = NULL; +} + +static void +clockcache_prefetch_pages(clockcache *cc, + uint64 first_addr, + uint64 num_pages, + page_type type) +{ + async_io_state *state = NULL; + uint64 state_num_pages = 0; + threadid tid = platform_get_tid(); + + platform_assert(cc != NULL); + platform_assert(PAGE_TYPE_FIRST <= type && type < NUM_PAGE_TYPES); + platform_assert(num_pages > 0); + platform_assert(num_pages <= cc->cfg->pages_per_extent); + platform_assert(first_addr % clockcache_page_size(cc) == 0); + + for (uint64 page_off = 0; page_off < num_pages; page_off++) { + uint64 addr = first_addr + clockcache_multiply_by_page_size(cc, page_off); uint32 entry_no = clockcache_lookup(cc, addr); get_rc get_read_rc; if (entry_no != CC_UNMAPPED_ENTRY) { @@ -2549,42 +2583,34 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) clockcache_dec_ref(cc, entry_no, tid); // fallthrough case GET_RC_CONFLICT: - // in cache, issue IO req if pages have been queued if (state != NULL) { - platform_assert(state_num_pages > 0); - if (cc->cfg->use_stats) { - threadid tid = platform_get_tid(); - cc->stats[tid].page_reads[type] += state_num_pages; - cc->stats[tid].prefetches_issued[type]++; - } - io_async_run(state->iostate); - state = NULL; - state_num_pages = 0; + clockcache_prefetch_issue(cc, &state, &state_num_pages, type); } clockcache_log(addr, entry_no, - "prefetch (cached): entry %u addr %lu\n", + "prefetch_pages (cached): entry %u addr %lu\n", entry_no, addr); break; case GET_RC_EVICTED: { - // need to prefetch uint32 free_entry_no = clockcache_get_free_page( cc, CC_READ_LOADING_STATUS, type, FALSE, TRUE); clockcache_entry *entry = &cc->entry[free_entry_no]; entry->page.disk_addr = addr; entry->type = type; uint64 lookup_no = clockcache_divide_by_page_size(cc, addr); + if (state == NULL) { - // start a new IO req before publishing the loading entry state = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, state); if (state == NULL) { - platform_error_log("clockcache_prefetch: async_io_state " - "allocation failed for base addr %lu, " - "page addr %lu, type %u\n", - base_addr, + platform_error_log("clockcache_prefetch_pages: " + "async_io_state allocation failed for " + "first addr %lu, page addr %lu, " + "num pages %lu, type %u\n", + first_addr, addr, + num_pages, type); clockcache_release_unpublished_entry(entry); return; @@ -2598,11 +2624,13 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) clockcache_prefetch_callback, state); if (!SUCCESS(rc)) { - platform_error_log("clockcache_prefetch: " - "io_async_state_init failed for base addr " - "%lu, page addr %lu, type %u: %s\n", - base_addr, + platform_error_log("clockcache_prefetch_pages: " + "io_async_state_init failed for first " + "addr %lu, page addr %lu, num pages %lu, " + "type %u: %s\n", + first_addr, addr, + num_pages, type, platform_status_to_string(rc)); clockcache_release_unpublished_entry(entry); @@ -2611,41 +2639,38 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) return; } } + if (__sync_bool_compare_and_swap( &cc->lookup[lookup_no], CC_UNMAPPED_ENTRY, free_entry_no)) { platform_status rc = io_async_state_append_page(state->iostate, entry->page.data); if (!SUCCESS(rc)) { - platform_error_log("clockcache_prefetch: " + platform_error_log("clockcache_prefetch_pages: " "io_async_state_append_page failed for " - "base addr %lu, page addr %lu, entry %u, " - "type %u: %s\n", - base_addr, + "first addr %lu, page addr %lu, " + "entry %u, num pages %lu, type %u: %s\n", + first_addr, addr, free_entry_no, + num_pages, type, platform_status_to_string(rc)); } platform_assert_status_ok(rc); state_num_pages++; clockcache_log(addr, - entry_no, - "prefetch (load): entry %u addr %lu\n", - entry_no, + free_entry_no, + "prefetch_pages (load): entry %u addr %lu\n", + free_entry_no, addr); } else { - /* - * someone else is already loading this page, release the free - * entry and retry - */ clockcache_release_unpublished_entry(entry); - if (state_num_pages == 0) { - io_async_state_deinit(state->iostate); - platform_free(PROCESS_PRIVATE_HEAP_ID, state); - state = NULL; + if (state_num_pages > 0) { + clockcache_prefetch_issue(cc, &state, &state_num_pages, type); + } else { + clockcache_prefetch_discard_empty(&state); } - page_off--; } break; } @@ -2653,20 +2678,41 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) platform_assert(0); } } - // issue IO req if pages have been queued + if (state != NULL) { - platform_assert(state_num_pages > 0); - if (cc->cfg->use_stats) { - threadid tid = platform_get_tid(); - cc->stats[tid].page_reads[type] += state_num_pages; - cc->stats[tid].prefetches_issued[type]++; - } - io_async_run(state->iostate); - state = NULL; - state_num_pages = 0; + clockcache_prefetch_issue(cc, &state, &state_num_pages, type); } } +/* + *----------------------------------------------------------------------------- + * clockcache_prefetch -- + * + * Prefetch asynchronously loads the extent with given base address. + *----------------------------------------------------------------------------- + */ +void +clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) +{ + platform_assert(cc != NULL); + platform_assert(base_addr % clockcache_extent_size(cc) == 0); + + clockcache_prefetch_pages(cc, base_addr, cc->cfg->pages_per_extent, type); +} + +/* + *----------------------------------------------------------------------------- + * clockcache_prefetch_page -- + * + * Prefetch asynchronously loads the single page at addr. + *----------------------------------------------------------------------------- + */ +void +clockcache_prefetch_page(clockcache *cc, uint64 addr, page_type type) +{ + clockcache_prefetch_pages(cc, addr, 1, type); +} + /* *---------------------------------------------------------------------- * clockcache_print -- @@ -3044,6 +3090,13 @@ clockcache_prefetch_virtual(cache *c, uint64 addr, page_type type) clockcache_prefetch(cc, addr, type); } +void +clockcache_prefetch_page_virtual(cache *c, uint64 addr, page_type type) +{ + clockcache *cc = (clockcache *)c; + clockcache_prefetch_page(cc, addr, type); +} + void clockcache_mark_dirty_virtual(cache *c, page_handle *page) { @@ -3239,34 +3292,35 @@ static cache_ops clockcache_ops = { .page_get_async = clockcache_get_async_virtual, .page_get_async_result = clockcache_get_async_state_result_virtual, - .page_unget = clockcache_unget_virtual, - .page_try_claim = clockcache_try_claim_virtual, - .page_unclaim = clockcache_unclaim_virtual, - .page_lock = clockcache_lock_virtual, - .page_unlock = clockcache_unlock_virtual, - .page_prefetch = clockcache_prefetch_virtual, - .page_mark_dirty = clockcache_mark_dirty_virtual, - .page_pin = clockcache_pin_virtual, - .page_unpin = clockcache_unpin_virtual, - .page_sync = clockcache_page_sync_virtual, - .extent_sync = clockcache_extent_sync_virtual, - .flush = clockcache_flush_virtual, - .evict = clockcache_evict_all_virtual, - .cleanup = clockcache_wait_virtual, - .in_use = clockcache_in_use_virtual, - .assert_ungot = clockcache_assert_ungot_virtual, - .assert_free = clockcache_assert_no_locks_held_virtual, - .print = clockcache_print_virtual, - .print_stats = clockcache_print_stats_virtual, - .io_stats = clockcache_io_stats_virtual, - .reset_stats = clockcache_reset_stats_virtual, - .validate_page = clockcache_validate_page_virtual, - .count_dirty = clockcache_count_dirty_virtual, - .page_get_read_ref = clockcache_get_read_ref_virtual, - .cache_present = clockcache_present_virtual, - .enable_sync_get = clockcache_enable_sync_get_virtual, - .get_allocator = clockcache_get_allocator_virtual, - .get_config = clockcache_get_config_virtual, + .page_unget = clockcache_unget_virtual, + .page_try_claim = clockcache_try_claim_virtual, + .page_unclaim = clockcache_unclaim_virtual, + .page_lock = clockcache_lock_virtual, + .page_unlock = clockcache_unlock_virtual, + .page_prefetch = clockcache_prefetch_virtual, + .page_prefetch_page = clockcache_prefetch_page_virtual, + .page_mark_dirty = clockcache_mark_dirty_virtual, + .page_pin = clockcache_pin_virtual, + .page_unpin = clockcache_unpin_virtual, + .page_sync = clockcache_page_sync_virtual, + .extent_sync = clockcache_extent_sync_virtual, + .flush = clockcache_flush_virtual, + .evict = clockcache_evict_all_virtual, + .cleanup = clockcache_wait_virtual, + .in_use = clockcache_in_use_virtual, + .assert_ungot = clockcache_assert_ungot_virtual, + .assert_free = clockcache_assert_no_locks_held_virtual, + .print = clockcache_print_virtual, + .print_stats = clockcache_print_stats_virtual, + .io_stats = clockcache_io_stats_virtual, + .reset_stats = clockcache_reset_stats_virtual, + .validate_page = clockcache_validate_page_virtual, + .count_dirty = clockcache_count_dirty_virtual, + .page_get_read_ref = clockcache_get_read_ref_virtual, + .cache_present = clockcache_present_virtual, + .enable_sync_get = clockcache_enable_sync_get_virtual, + .get_allocator = clockcache_get_allocator_virtual, + .get_config = clockcache_get_config_virtual, }; /* diff --git a/src/core.c b/src/core.c index cbfb0868..b30c01cb 100644 --- a/src/core.c +++ b/src/core.c @@ -13,6 +13,7 @@ #include "platform_sleep.h" #include "platform_time.h" #include "platform_util.h" +#include "prefetch.h" #include "poison.h" #define LATENCYHISTO_SIZE 15 @@ -45,14 +46,6 @@ static const int64 latency_histo_buckets[LATENCYHISTO_SIZE] = { _Static_assert(CORE_NUM_MEMTABLES <= MAX_MEMTABLES, "CORE_NUM_MEMTABLES <= MAX_MEMTABLES"); -/* - * For a "small" range query, you don't want to prefetch pages. - * This is the minimal # of items requested before we turn ON prefetching. - * (Empirically established through past experiments, for small key-value - * pairs. So, _may_ be less efficient in general cases. Needs a revisit.) - */ -#define CORE_PREFETCH_MIN (16384) - /* Some randomly chosen Splinter super-block checksum seed. */ #define CORE_SUPER_CSUM_SEED (42) @@ -389,7 +382,7 @@ core_memtable_iterator_init(core_handle *spl, start_key_comparison, start_key, FALSE, - FALSE, + 0, 0); } @@ -877,8 +870,8 @@ core_start_btree_iterator_init_async( key max_key, comparison start_key_comparison, key start_key, - bool32 do_prefetch, - bool32 copy_nodes) + bool32 copy_nodes, + uint32 prefetch_lookahead) { btree_iterator_async_state_init(&ctxt->state, spl->cc, @@ -892,9 +885,9 @@ core_start_btree_iterator_init_async( max_key, start_key_comparison, start_key, - do_prefetch, copy_nodes, 0, + prefetch_lookahead, core_btree_iterator_init_async_callback, ctxt); ctxt->ready = FALSE; @@ -1004,8 +997,7 @@ core_range_iterator_init(core_handle *spl, comparison max_key_comparison, key max_key, comparison start_key_comparison, - key start_key, - uint64 num_tuples) + key start_key) { platform_status rc; @@ -1020,7 +1012,6 @@ core_range_iterator_init(core_handle *spl, range_itor->spl = spl; range_itor->super.ops = &core_range_iterator_ops; range_itor->num_branches = 0; - range_itor->num_tuples = num_tuples; range_itor->merge_itor = NULL; range_itor->can_prev = TRUE; range_itor->can_next = TRUE; @@ -1186,18 +1177,29 @@ core_range_iterator_init(core_handle *spl, return STATUS_NO_MEMORY; } + // Deep extent-prefetch for the scan: count compacted branches and give each + // a soft share of the prefetch budget. + uint64 n_prefetch_branches = 0; + for (uint64 branch_no = 0; branch_no < range_itor->num_branches; branch_no++) + { + if (range_itor->compacted[branch_no]) { + n_prefetch_branches++; + } + } + uint32 deep_lookahead = + prefetch_budget_to_extent_lookahead(cache_extent_size(spl->cc), + spl->cfg.prefetch_budget, + n_prefetch_branches); + uint64 started_inits = 0; for (uint64 i = 0; i < range_itor->num_branches; i++) { - uint64 branch_no = range_itor->num_branches - i - 1; - btree_iterator *btree_itor = &range_itor->btree_itor[branch_no]; - uint64 branch_addr = range_itor->branch[branch_no].addr; - page_type page_type = range_itor->branch[branch_no].type; - bool32 do_prefetch = FALSE; + uint64 branch_no = range_itor->num_branches - i - 1; + btree_iterator *btree_itor = &range_itor->btree_itor[branch_no]; + uint64 branch_addr = range_itor->branch[branch_no].addr; + page_type page_type = range_itor->branch[branch_no].type; + uint32 prefetch_lookahead = 0; if (range_itor->compacted[branch_no]) { - do_prefetch = - range_itor->compacted[branch_no] && num_tuples > CORE_PREFETCH_MIN - ? TRUE - : FALSE; + prefetch_lookahead = deep_lookahead; } rc = core_start_btree_iterator_init_async( spl, @@ -1211,8 +1213,8 @@ core_range_iterator_init(core_handle *spl, key_buffer_key(&range_itor->local_max_key), start_key_comparison, start_key, - do_prefetch, - branch_no == 0 ? first_memtable_copy_nodes : FALSE); + branch_no == 0 ? first_memtable_copy_nodes : FALSE, + prefetch_lookahead); started_inits++; if (!SUCCESS(rc)) { break; @@ -1263,7 +1265,6 @@ core_range_iterator_init(core_handle *spl, key_buffer local_max_buffer; rc = key_buffer_init_from_key( &local_max_buffer, PROCESS_PRIVATE_HEAP_ID, local_max); - uint64 num_tuples = range_itor->num_tuples; core_range_iterator_deinit(range_itor); if (!SUCCESS(rc)) { return rc; @@ -1276,8 +1277,7 @@ core_range_iterator_init(core_handle *spl, max_key_comparison, max_key, greater_than_or_equal, - local_max, - num_tuples); + local_max); key_buffer_deinit(&local_max_buffer); if (!SUCCESS(rc)) { return rc; @@ -1295,7 +1295,6 @@ core_range_iterator_init(core_handle *spl, key_buffer local_min_buffer; rc = key_buffer_init_from_key( &local_min_buffer, PROCESS_PRIVATE_HEAP_ID, local_min); - uint64 num_tuples = range_itor->num_tuples; core_range_iterator_deinit(range_itor); if (!SUCCESS(rc)) { return rc; @@ -1308,8 +1307,7 @@ core_range_iterator_init(core_handle *spl, max_key_comparison, max_key, less_than, - local_min, - num_tuples); + local_min); key_buffer_deinit(&local_min_buffer); if (!SUCCESS(rc)) { return rc; @@ -1343,7 +1341,6 @@ core_range_iterator_next(iterator *itor) if (!SUCCESS(rc)) { return rc; } - range_itor->num_tuples++; range_itor->can_prev = TRUE; range_itor->can_next = iterator_can_next(&range_itor->merge_itor->super); if (!range_itor->can_next) { @@ -1372,7 +1369,6 @@ core_range_iterator_next(iterator *itor) // if there is more data to get, rebuild the iterator for next leaf if (core_range_iterator_has_next_leaf(range_itor)) { core_handle *spl = range_itor->spl; - uint64 temp_tuples = range_itor->num_tuples; comparison min_key_comparison = range_itor->min_key_comparison; comparison max_key_comparison = range_itor->max_key_comparison; core_range_iterator_deinit(range_itor); @@ -1383,8 +1379,7 @@ core_range_iterator_next(iterator *itor) max_key_comparison, max_key, greater_than_or_equal, - local_max_key, - temp_tuples); + local_max_key); if (!SUCCESS(rc)) { return rc; } @@ -1407,7 +1402,6 @@ core_range_iterator_prev(iterator *itor) if (!SUCCESS(rc)) { return rc; } - range_itor->num_tuples++; range_itor->can_next = TRUE; range_itor->can_prev = iterator_can_prev(&range_itor->merge_itor->super); if (!range_itor->can_prev) { @@ -1436,7 +1430,6 @@ core_range_iterator_prev(iterator *itor) // if there is more data to get, rebuild the iterator for prev leaf if (core_key_compare(range_itor->spl, local_min_key, min_key) > 0) { core_handle *spl = range_itor->spl; - uint64 temp_tuples = range_itor->num_tuples; comparison min_key_comparison = range_itor->min_key_comparison; comparison max_key_comparison = range_itor->max_key_comparison; core_range_iterator_deinit(range_itor); @@ -1447,8 +1440,7 @@ core_range_iterator_prev(iterator *itor) max_key_comparison, max_key, less_than, - local_min_key, - temp_tuples); + local_min_key); if (!SUCCESS(rc)) { return rc; } @@ -1753,8 +1745,7 @@ core_apply_to_range(core_handle *spl, less_than, POSITIVE_INFINITY_KEY, greater_than_or_equal, - start_key, - num_tuples); + start_key); if (!SUCCESS(rc)) { platform_error_log("core_apply_to_range: range iterator init failed: " "%s\n", @@ -2483,6 +2474,7 @@ core_config_init(core_config *core_cfg, log_config *log_cfg, trunk_config *trunk_node_cfg, uint64 queue_scale_percent, + uint64 prefetch_budget, bool32 use_log, bool32 use_stats, bool32 verbose_logging, @@ -2499,6 +2491,7 @@ core_config_init(core_config *core_cfg, core_cfg->log_cfg = log_cfg; core_cfg->queue_scale_percent = queue_scale_percent; + core_cfg->prefetch_budget = prefetch_budget; core_cfg->use_log = use_log; core_cfg->use_stats = use_stats; core_cfg->verbose_logging_enabled = verbose_logging; diff --git a/src/core.h b/src/core.h index db90f982..94c10f97 100644 --- a/src/core.h +++ b/src/core.h @@ -28,6 +28,10 @@ * Splinter Configuration structure *---------------------------------------------------------------------- */ +// Default range-scan prefetch budget (total extent read-ahead kept in flight), +// ~1 MiB == 8 extents at the default 128 KiB extent size. +#define CORE_DEFAULT_PREFETCH_BUDGET (1024UL * 1024) + typedef struct core_config { cache_config *cache_cfg; @@ -35,6 +39,11 @@ typedef struct core_config { uint64 queue_scale_percent; // Governs when inserters perform bg tasks. See // task.h + // Soft byte budget for range-scan extent read-ahead, divided across the + // branches being merged. Roughly the storage's bandwidth-delay product; + // raise it for higher-latency devices. + uint64 prefetch_budget; + bool32 use_stats; // stats memtable_config mt_cfg; btree_config *btree_cfg; @@ -118,7 +127,6 @@ struct core_handle { typedef struct core_range_iterator { iterator super; core_handle *spl; - uint64 num_tuples; uint64 num_branches; uint64 num_memtable_branches; uint64 memtable_start_gen; @@ -197,8 +205,7 @@ core_range_iterator_init(core_handle *spl, comparison max_key_comparison, key max_key, comparison start_key_comparison, - key start_key, - uint64 num_tuples); + key start_key); void core_range_iterator_deinit(core_range_iterator *range_itor); @@ -291,6 +298,7 @@ core_config_init(core_config *trunk_cfg, log_config *log_cfg, trunk_config *trunk_node_cfg, uint64 queue_scale_percent, + uint64 prefetch_budget, bool32 use_log, bool32 use_stats, bool32 verbose_logging, diff --git a/src/mini_allocator.c b/src/mini_allocator.c index 4a4a6170..1099641f 100644 --- a/src/mini_allocator.c +++ b/src/mini_allocator.c @@ -33,6 +33,7 @@ */ typedef struct ONDISK mini_meta_hdr { uint64 next_meta_addr; + uint64 prev_meta_addr; uint64 pos; uint32 num_entries; char entry_buffer[]; @@ -45,14 +46,79 @@ typedef struct ONDISK mini_meta_hdr { * meta_entry -- Disk-resident structure * * Metadata for each extent stored in the extent list for a - * mini_allocator. Currently, this is just the extent address itself. + * mini_allocator. *----------------------------------------------------------------------------- */ +/* + * A meta_entry is packed into a single 8-byte word to keep the extent list + * dense (it avoids ONDISK padding and wastes no space). Extents are at least + * 128 KiB, so an extent address has at least 17 always-zero low bits; we store + * the extent *number* (extent_addr / extent_size) instead of the address, + * which frees more than enough bits to also record the page type and the + * originating mini_allocator batch: + * + * bits [ 0: 7] batch (one of MINI_MAX_BATCHES batches) + * bits [ 8:15] type (page_type) + * bits [16:63] extent number (extent_addr / extent_size) + */ typedef struct ONDISK meta_entry { - uint64 extent_addr; - uint8 type; + uint64 packed; } meta_entry; +#define META_ENTRY_BATCH_BITS (8) +#define META_ENTRY_TYPE_BITS (8) +#define META_ENTRY_EXTENT_BITS \ + (64 - META_ENTRY_BATCH_BITS - META_ENTRY_TYPE_BITS) + +_Static_assert(MINI_MAX_BATCHES <= (1 << META_ENTRY_BATCH_BITS), + "mini_allocator batch number does not fit in a meta_entry"); +_Static_assert(NUM_PAGE_TYPES <= (1 << META_ENTRY_TYPE_BITS), + "page_type does not fit in a meta_entry"); + +static inline uint64 +meta_entry_batch(const meta_entry *entry) +{ + return entry->packed & ((1 << META_ENTRY_BATCH_BITS) - 1); +} + +static inline page_type +meta_entry_type(const meta_entry *entry) +{ + return (page_type)((entry->packed >> META_ENTRY_BATCH_BITS) + & ((1 << META_ENTRY_TYPE_BITS) - 1)); +} + +static inline uint64 +meta_entry_extent_addr(cache *cc, const meta_entry *entry) +{ + uint64 extent_number = + entry->packed >> (META_ENTRY_BATCH_BITS + META_ENTRY_TYPE_BITS); + return extent_number * cache_extent_size(cc); +} + +static inline void +meta_entry_pack(cache *cc, + meta_entry *entry, + uint64 extent_addr, + page_type type, + uint64 batch) +{ + platform_assert(cc != NULL); + platform_assert(entry != NULL); + + uint64 extent_size = cache_extent_size(cc); + uint64 extent_number = extent_addr / extent_size; + platform_assert(extent_addr != 0); + platform_assert((extent_addr % extent_size) == 0); + platform_assert(extent_number < (1ULL << META_ENTRY_EXTENT_BITS)); + platform_assert(PAGE_TYPE_FIRST <= type && type < NUM_PAGE_TYPES); + platform_assert(batch < MINI_MAX_BATCHES); + + entry->packed = + (extent_number << (META_ENTRY_BATCH_BITS + META_ENTRY_TYPE_BITS)) + | ((uint64)type << META_ENTRY_BATCH_BITS) | batch; +} + static meta_entry * first_entry(page_handle *meta_page) { @@ -83,6 +149,7 @@ mini_init_meta_page(mini_allocator *mini, page_handle *meta_page) { mini_meta_hdr *hdr = (mini_meta_hdr *)meta_page->data; hdr->next_meta_addr = 0; + hdr->prev_meta_addr = 0; hdr->pos = offsetof(typeof(*hdr), entry_buffer); hdr->num_entries = 0; } @@ -284,11 +351,18 @@ static bool32 mini_append_entry_to_page(mini_allocator *mini, page_handle *meta_page, uint64 extent_addr, - page_type type) + page_type type, + uint64 batch) { + platform_assert(mini != NULL); + platform_assert(mini->cc != NULL); + platform_assert(meta_page != NULL); + platform_assert(PAGE_TYPE_FIRST <= type && type < NUM_PAGE_TYPES); + platform_assert(batch < mini->num_batches); + uint64 page_size = cache_page_size(mini->cc); - debug_assert(extent_addr != 0); - debug_assert((extent_addr % page_size) == 0); + platform_assert(extent_addr != 0); + platform_assert((extent_addr % page_size) == 0); mini_meta_hdr *hdr = (mini_meta_hdr *)meta_page->data; @@ -296,9 +370,8 @@ mini_append_entry_to_page(mini_allocator *mini, return FALSE; } - meta_entry *new_entry = pointer_byte_offset(hdr, hdr->pos); - new_entry->extent_addr = extent_addr; - new_entry->type = type; + meta_entry *new_entry = pointer_byte_offset(hdr, hdr->pos); + meta_entry_pack(mini->cc, new_entry, extent_addr, type, batch); hdr->pos += sizeof(meta_entry); hdr->num_entries++; @@ -381,13 +454,22 @@ mini_set_next_meta_addr(mini_allocator *mini, hdr->next_meta_addr = next_meta_addr; } +static void +mini_set_prev_meta_addr(mini_allocator *mini, + page_handle *meta_page, + uint64 prev_meta_addr) +{ + mini_meta_hdr *hdr = (mini_meta_hdr *)meta_page->data; + hdr->prev_meta_addr = prev_meta_addr; +} + static bool32 mini_append_entry(mini_allocator *mini, uint64 batch, uint64 next_addr) { page_handle *meta_page = mini_full_lock_meta_tail(mini); bool32 success; - success = - mini_append_entry_to_page(mini, meta_page, next_addr, mini->types[batch]); + success = mini_append_entry_to_page( + mini, meta_page, next_addr, mini->types[batch], batch); if (!success) { // need to allocate a new meta page uint64 new_meta_tail = mini->meta_tail + cache_page_size(mini->cc); @@ -401,19 +483,27 @@ mini_append_entry(mini_allocator *mini, uint64 batch, uint64 next_addr) mini_set_next_meta_addr(mini, meta_page, new_meta_tail); page_handle *last_meta_page = meta_page; + uint64 last_meta_addr = mini->meta_tail; meta_page = cache_alloc(mini->cc, new_meta_tail, mini->meta_type); mini->meta_tail = new_meta_tail; mini_full_unlock_meta_page(mini, last_meta_page); mini_init_meta_page(mini, meta_page); + // Doubly-link the meta list so a prefetch cursor can scan it backward. + mini_set_prev_meta_addr(mini, meta_page, last_meta_addr); success = mini_append_entry_to_page( - mini, meta_page, next_addr, mini->types[batch]); + mini, meta_page, next_addr, mini->types[batch], batch); if (mini->pinned) { cache_pin(mini->cc, meta_page); } debug_assert(success); } + // Record the meta page that now holds this extent's entry, so btree nodes + // allocated from this extent can point straight at it (see + // mini_current_extent_meta_page). Safe: the caller holds the batch lock, so + // mini->meta_tail cannot advance under us until we unlock the page below. + mini->cur_extent_meta_page[batch] = mini->meta_tail; mini_full_unlock_meta_page(mini, meta_page); return TRUE; } @@ -726,7 +816,10 @@ mini_for_each_meta_page_func(cache *cc, uint64 num_meta_entries = mini_num_entries(meta_page); meta_entry *entry = first_entry(meta_page); for (uint64 i = 0; i < num_meta_entries; i++) { - fef->func(cc, entry->type, entry->extent_addr, fef->arg); + fef->func(cc, + meta_entry_type(entry), + meta_entry_extent_addr(cc, entry), + fef->arg); entry = next_entry(entry); } } @@ -824,6 +917,162 @@ mini_prefetch(cache *cc, page_type type, uint64 meta_head) mini_for_each(cc, meta_head, type, mini_prefetch_extent, NULL); } +/* + *----------------------------------------------------------------------------- + * mini_meta_cursor -- cursor over a mini_allocator's extent entries. + *----------------------------------------------------------------------------- + */ +mini_meta_cursor_status +mini_meta_cursor_init(mini_meta_cursor *cursor, + cache *cc, + page_type meta_type, + uint64 meta_addr, + uint64 target_extent_addr) +{ + platform_assert(cursor != NULL); + platform_assert(cc != NULL); + platform_assert(PAGE_TYPE_FIRST <= meta_type && meta_type < NUM_PAGE_TYPES); + platform_assert(meta_addr != 0); + platform_assert(target_extent_addr != 0); + platform_assert(target_extent_addr % cache_extent_size(cc) == 0); + + cursor->cc = cc; + cursor->meta_type = meta_type; + cursor->meta_page = NULL; + cursor->meta_addr = meta_addr; + cursor->entry_idx = 0; + cursor->num_entries = 0; + + cursor->meta_page = + cache_get(cursor->cc, cursor->meta_addr, FALSE, cursor->meta_type); + if (cursor->meta_page == NULL) { + cache_prefetch_page(cursor->cc, cursor->meta_addr, cursor->meta_type); + return MINI_META_CURSOR_WOULD_BLOCK; + } + + cursor->num_entries = mini_num_entries(cursor->meta_page); + cursor->entry_idx = 0; + while (cursor->entry_idx < cursor->num_entries) { + meta_entry *entry = first_entry(cursor->meta_page) + cursor->entry_idx; + if (meta_entry_extent_addr(cursor->cc, entry) == target_extent_addr) { + return MINI_META_CURSOR_ENTRY; + } + cursor->entry_idx++; + } + + platform_assert(FALSE, + "target extent %lu not found on meta page %lu", + target_extent_addr, + meta_addr); + return MINI_META_CURSOR_END; +} + +void +mini_meta_cursor_deinit(mini_meta_cursor *cursor) +{ + platform_assert(cursor != NULL); + + if (cursor->meta_page != NULL) { + cache_unget(cursor->cc, cursor->meta_page); + cursor->meta_page = NULL; + } + cursor->meta_addr = 0; + cursor->entry_idx = 0; + cursor->num_entries = 0; +} + +void +mini_meta_cursor_curr(mini_meta_cursor *cursor, + uint64 *extent_addr, + uint64 *batch) +{ + platform_assert(cursor != NULL); + platform_assert(cursor->cc != NULL); + platform_assert(extent_addr != NULL); + platform_assert(batch != NULL); + platform_assert(PAGE_TYPE_FIRST <= cursor->meta_type + && cursor->meta_type < NUM_PAGE_TYPES); + platform_assert(cursor->meta_page != NULL); + platform_assert(cursor->entry_idx < cursor->num_entries); + + meta_entry *entry = first_entry(cursor->meta_page) + cursor->entry_idx; + *extent_addr = meta_entry_extent_addr(cursor->cc, entry); + *batch = meta_entry_batch(entry); +} + +mini_meta_cursor_status +mini_meta_cursor_next(mini_meta_cursor *cursor) +{ + platform_assert(cursor != NULL); + platform_assert(cursor->cc != NULL); + platform_assert(PAGE_TYPE_FIRST <= cursor->meta_type + && cursor->meta_type < NUM_PAGE_TYPES); + platform_assert(cursor->meta_page != NULL); + platform_assert(cursor->entry_idx < cursor->num_entries); + + if (cursor->entry_idx + 1 < cursor->num_entries) { + cursor->entry_idx++; + return MINI_META_CURSOR_ENTRY; + } + + uint64 next_meta_addr = mini_get_next_meta_addr(cursor->meta_page); + if (next_meta_addr == 0) { + return MINI_META_CURSOR_END; + } + + page_handle *next_page = + cache_get(cursor->cc, next_meta_addr, FALSE, cursor->meta_type); + if (next_page == NULL) { + cache_prefetch_page(cursor->cc, next_meta_addr, cursor->meta_type); + return MINI_META_CURSOR_WOULD_BLOCK; + } + + cache_unget(cursor->cc, cursor->meta_page); + cursor->meta_page = next_page; + cursor->meta_addr = next_meta_addr; + cursor->num_entries = mini_num_entries(cursor->meta_page); + platform_assert(cursor->num_entries > 0); + cursor->entry_idx = 0; + return MINI_META_CURSOR_ENTRY; +} + +mini_meta_cursor_status +mini_meta_cursor_prev(mini_meta_cursor *cursor) +{ + platform_assert(cursor != NULL); + platform_assert(cursor->cc != NULL); + platform_assert(PAGE_TYPE_FIRST <= cursor->meta_type + && cursor->meta_type < NUM_PAGE_TYPES); + platform_assert(cursor->meta_page != NULL); + platform_assert(cursor->entry_idx < cursor->num_entries); + + if (cursor->entry_idx > 0) { + cursor->entry_idx--; + return MINI_META_CURSOR_ENTRY; + } + + mini_meta_hdr *hdr = (mini_meta_hdr *)cursor->meta_page->data; + uint64 prev_addr = hdr->prev_meta_addr; + if (prev_addr == 0) { + return MINI_META_CURSOR_END; + } + + page_handle *prev_page = + cache_get(cursor->cc, prev_addr, FALSE, cursor->meta_type); + if (prev_page == NULL) { + cache_prefetch_page(cursor->cc, prev_addr, cursor->meta_type); + return MINI_META_CURSOR_WOULD_BLOCK; + } + + cache_unget(cursor->cc, cursor->meta_page); + cursor->meta_page = prev_page; + cursor->meta_addr = prev_addr; + cursor->num_entries = mini_num_entries(cursor->meta_page); + platform_assert(cursor->num_entries > 0); + cursor->entry_idx = cursor->num_entries - 1; + return MINI_META_CURSOR_ENTRY; +} + static void space_use_add_extent(cache *cc, page_type type, uint64 extent_addr, void *out) { @@ -888,8 +1137,8 @@ mini_print(cache *cc, uint64 meta_head, page_type type) for (uint64 i = 0; i < num_entries; i++) { platform_default_log("| %3lu | %35lu | %s\n", i, - entry->extent_addr, - page_type_str[entry->type]); + meta_entry_extent_addr(cc, entry), + page_type_str[meta_entry_type(entry)]); entry = next_entry(entry); } platform_default_log("|-------------------------------------------|\n"); diff --git a/src/mini_allocator.h b/src/mini_allocator.h index f130ca78..1a7995b7 100644 --- a/src/mini_allocator.h +++ b/src/mini_allocator.h @@ -45,6 +45,12 @@ typedef struct mini_allocator { volatile uint64 next_addr[MINI_MAX_BATCHES]; uint64 saved_next_addr[MINI_MAX_BATCHES]; uint64 next_extent[MINI_MAX_BATCHES]; + // For each batch, the meta page that holds the entry for the extent the + // batch is currently allocating from. Lets a caller (e.g. the btree) record, + // in each page it allocates, where that page's extent is listed in the meta + // stream, so a prefetch cursor can start there without scanning from + // meta_head. See mini_current_extent_meta_page(). + uint64 cur_extent_meta_page[MINI_MAX_BATCHES]; } mini_allocator; uint64 @@ -107,6 +113,68 @@ mini_unblock_dec_ref(cache *cc, uint64 meta_head); void mini_prefetch(cache *cc, page_type type, uint64 meta_head); +/* + * mini_meta_cursor: a non-blocking cursor over the extent entries of a + * finalized mini_allocator. Entries from all batches are interleaved in + * allocation order; the caller filters by batch as needed (each entry reports + * its batch). The btree iterator uses this to read extent addresses ahead of or + * behind itself for prefetching. + * + * The cursor holds a read reference on the meta page it is currently reading; + * call mini_meta_cursor_deinit() to release it. The cursor is non-blocking: it + * reads meta pages with a non-blocking cache_get() and, on a miss, issues a + * single-page prefetch and reports MINI_META_CURSOR_WOULD_BLOCK so the caller + * can do other work and retry later (the meta page lands shortly). + */ +typedef struct mini_meta_cursor { + cache *cc; + page_type meta_type; + page_handle *meta_page; // currently held meta page, or NULL + uint64 meta_addr; // addr of meta_page, or the next page to load + uint64 entry_idx; // index of the current entry on meta_page + uint64 num_entries; // number of entries on meta_page +} mini_meta_cursor; + +// Result of a non-blocking cursor operation. +typedef enum mini_meta_cursor_status { + MINI_META_CURSOR_ENTRY, // cursor is positioned on an entry + MINI_META_CURSOR_END, // stream exhausted + MINI_META_CURSOR_WOULD_BLOCK, // needed meta page not resident (prefetch + // issued) +} mini_meta_cursor_status; + +// Initialize cursor on target_extent_addr, which must be listed on meta_addr. +// Non-blocking: returns MINI_META_CURSOR_WOULD_BLOCK (and issues a prefetch for +// it) if meta_addr is not yet resident. On MINI_META_CURSOR_ENTRY, curr is +// valid. Asserts if target_extent_addr is not listed on meta_addr. +mini_meta_cursor_status +mini_meta_cursor_init(mini_meta_cursor *cursor, + cache *cc, + page_type meta_type, + uint64 meta_addr, + uint64 target_extent_addr); + +void +mini_meta_cursor_deinit(mini_meta_cursor *cursor); + +// Get the current extent entry. Requires a successful init, next, or prev. +void +mini_meta_cursor_curr(mini_meta_cursor *cursor, + uint64 *extent_addr, + uint64 *batch); + +// Move to the next extent entry in allocation order. Non-blocking: returns +// MINI_META_CURSOR_WOULD_BLOCK (and issues a prefetch for it) if the next meta +// page is not yet resident. END and WOULD_BLOCK leave curr unchanged. +mini_meta_cursor_status +mini_meta_cursor_next(mini_meta_cursor *cursor); + +// Move to the previous extent entry in allocation order. Non-blocking: returns +// MINI_META_CURSOR_WOULD_BLOCK (and issues a prefetch for it) if the previous +// meta page is not yet resident. END and WOULD_BLOCK leave curr unchanged. +mini_meta_cursor_status +mini_meta_cursor_prev(mini_meta_cursor *cursor); + /* Return total bytes allocated by the mini_allocator, including space used by * the mini_allocator itself.*/ uint64 @@ -121,6 +189,21 @@ mini_meta_tail(mini_allocator *mini) return mini->meta_tail; } +/* + * Address of the meta page holding the extent entry for the extent that batch + * is currently allocating from. Valid immediately after an allocation from + * batch (e.g. mini_alloc_page), for the thread that performed it. + */ +static inline uint64 +mini_current_extent_meta_page(mini_allocator *mini, uint64 batch) +{ + platform_assert(mini != NULL); + platform_assert(batch < mini->num_batches); + platform_assert(mini->cur_extent_meta_page[batch] != 0); + + return mini->cur_extent_meta_page[batch]; +} + static inline uint64 mini_num_extents(mini_allocator *mini) diff --git a/src/prefetch.h b/src/prefetch.h new file mode 100644 index 00000000..329aa9dd --- /dev/null +++ b/src/prefetch.h @@ -0,0 +1,43 @@ +// Copyright 2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +/* + * prefetch.h -- + * + * Shared helpers for read-ahead policy. + */ + +#pragma once + +#include "platform_assert.h" + +/* + * Minimum deep-prefetch depth for one eligible stream. + */ +#define PREFETCH_MIN_EXTENT_LOOKAHEAD (2) + +/* + * Convert a soft byte budget into a per-stream extent lookahead. The budget is + * divided across the streams, but each active stream gets at least + * PREFETCH_MIN_EXTENT_LOOKAHEAD extents. With many streams, that minimum can + * intentionally exceed the byte budget; the budget is a read-ahead target, not + * a hard cap. + */ +static inline uint32 +prefetch_budget_to_extent_lookahead(uint64 extent_size, + uint64 prefetch_budget, + uint64 num_streams) +{ + platform_assert(extent_size != 0); + + if (prefetch_budget == 0 || num_streams == 0) { + return 0; + } + + uint64 budget_extents = prefetch_budget / extent_size; + uint64 per_stream = budget_extents / num_streams; + if (per_stream < PREFETCH_MIN_EXTENT_LOOKAHEAD) { + per_stream = PREFETCH_MIN_EXTENT_LOOKAHEAD; + } + return (uint32)per_stream; +} diff --git a/src/splinterdb.c b/src/splinterdb.c index 10addc51..b8614668 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -160,6 +160,9 @@ splinterdb_config_set_defaults(splinterdb_config *cfg) if (!cfg->reclaim_threshold) { cfg->reclaim_threshold = UINT64_MAX; } + if (!cfg->prefetch_budget) { + cfg->prefetch_budget = CORE_DEFAULT_PREFETCH_BUDGET; + } } static platform_status @@ -281,6 +284,7 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN cfg.memtable_capacity, cfg.fanout, cfg.btree_rough_count_height, + cfg.prefetch_budget, cfg.use_stats); rc = core_config_init(&kvs->trunk_cfg, @@ -290,6 +294,7 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN (log_config *)&kvs->log_cfg, &kvs->trunk_node_cfg, cfg.queue_scale_percent, + cfg.prefetch_budget, cfg.use_log, cfg.use_stats, FALSE, @@ -876,8 +881,7 @@ splinterdb_iterator_init_with_bounds(splinterdb *kvs, // IN max_key_comparison, max_key, start_key_comparison, - start_key, - UINT64_MAX); + start_key); if (!SUCCESS(rc)) { merge_accumulator_deinit(&it->materialized_message); platform_free(kvs->spl.heap_id, it); diff --git a/src/task.c b/src/task.c index dc298c14..f4e741db 100644 --- a/src/task.c +++ b/src/task.c @@ -40,9 +40,8 @@ task_tracker_list_init(task_tracker_list *list) void task_tracker_add(task_tracker *tracker) { - if (tracker != NULL) { - __sync_fetch_and_add(&tracker->outstanding, 1); - } + platform_assert(tracker != NULL); + __sync_fetch_and_add(&tracker->outstanding, 1); } static uint64 @@ -67,10 +66,6 @@ task_tracker_done(task_tracker *tracker, platform_status status, task_tracker_list *completed) { - if (tracker == NULL) { - return; - } - uint64 old_outstanding = tracker_done_common(tracker, status); if (old_outstanding == 1 && tracker->callback != NULL) { @@ -85,10 +80,6 @@ task_tracker_done(task_tracker *tracker, void task_tracker_done_but_not_last(task_tracker *tracker, platform_status status) { - if (tracker == NULL) { - return; - } - uint64 old_outstanding = tracker_done_common(tracker, status); platform_assert(1 < old_outstanding); } diff --git a/src/trunk.c b/src/trunk.c index d92e5200..133139a5 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -19,6 +19,7 @@ #include "data_internal.h" #include "task.h" #include "notification.h" +#include "prefetch.h" #include "poison.h" typedef VECTOR(routing_filter) routing_filter_vector; @@ -71,6 +72,17 @@ typedef enum bundle_compaction_phase { typedef VECTOR(trunk_branch_info) trunk_branch_info_vector; +typedef struct trunk_branch_merger { + platform_heap_id hid; + const trunk_config *cfg; + key min_key; + key max_key; + uint64 height; + trunk_branch_info_vector branches; + merge_iterator *merge_itor; + iterator_vector itors; +} trunk_branch_merger; + typedef struct bundle_compaction { struct bundle_compaction *next; task tsk; // bundle_comaction_task @@ -2307,63 +2319,42 @@ serialize_nodes(trunk_context *context, static void trunk_branch_merger_init(trunk_branch_merger *merger, platform_heap_id hid, - const data_config *data_cfg, + const trunk_config *cfg, key min_key, key max_key, uint64 height) { + platform_assert(cfg != NULL); + platform_assert(cfg->data_cfg != NULL); + platform_assert(cfg->btree_cfg != NULL); + platform_assert(cfg->btree_cfg->cache_cfg != NULL); + merger->hid = hid; - merger->data_cfg = data_cfg; + merger->cfg = cfg; merger->min_key = min_key; merger->max_key = max_key; merger->height = height; merger->merge_itor = NULL; + vector_init(&merger->branches, hid); vector_init(&merger->itors, hid); } static platform_status trunk_branch_merger_add_branch(trunk_branch_merger *merger, - cache *cc, - const btree_config *btree_cfg, uint64 addr, page_type type) { - btree_iterator *iter = TYPED_MALLOC(merger->hid, iter); - if (iter == NULL) { - platform_error_log( - "%s():%d: platform_malloc() failed", __func__, __LINE__); - return STATUS_NO_MEMORY; - } - platform_status rc = btree_iterator_init(cc, - btree_cfg, - iter, - addr, - type, - greater_than_or_equal, - merger->min_key, - less_than, - merger->max_key, - greater_than_or_equal, - merger->min_key, - TRUE, - FALSE, - merger->height); - if (!SUCCESS(rc)) { - platform_error_log("%s():%d: btree_iterator_init() failed: %s", - __func__, - __LINE__, - platform_status_to_string(rc)); - platform_free(merger->hid, iter); - return rc; - } - rc = vector_append(&merger->itors, (iterator *)iter); + platform_assert(merger != NULL); + platform_assert(addr != 0); + platform_assert(PAGE_TYPE_FIRST <= type && type < NUM_PAGE_TYPES); + + trunk_branch_info branch = {addr, type}; + platform_status rc = vector_append(&merger->branches, branch); if (!SUCCESS(rc)) { platform_error_log("%s():%d: vector_append() failed: %s", __func__, __LINE__, platform_status_to_string(rc)); - btree_iterator_deinit(iter); - platform_free(merger->hid, iter); } return rc; } @@ -2371,13 +2362,14 @@ trunk_branch_merger_add_branch(trunk_branch_merger *merger, static platform_status trunk_branch_merger_add_branches(trunk_branch_merger *merger, - cache *cc, - const btree_config *btree_cfg, uint64 num_branches, const trunk_branch_info *branches) { + platform_assert(merger != NULL); + platform_assert(branches != NULL || num_branches == 0); + platform_status rc = vector_ensure_capacity( - &merger->itors, vector_length(&merger->itors) + num_branches); + &merger->branches, vector_length(&merger->branches) + num_branches); if (!SUCCESS(rc)) { platform_error_log("%s():%d: vector_ensure_capacity() failed: %s", __func__, @@ -2388,7 +2380,7 @@ trunk_branch_merger_add_branches(trunk_branch_merger *merger, for (uint64 i = 0; i < num_branches; i++) { rc = trunk_branch_merger_add_branch( - merger, cc, btree_cfg, branches[i].addr, branches[i].type); + merger, branches[i].addr, branches[i].type); if (!SUCCESS(rc)) { platform_error_log("%s():%d: btree_merger_add_branch() failed: %s", __func__, @@ -2402,13 +2394,14 @@ trunk_branch_merger_add_branches(trunk_branch_merger *merger, static platform_status trunk_branch_merger_add_bundle(trunk_branch_merger *merger, - cache *cc, - const btree_config *btree_cfg, const bundle *routed) { + platform_assert(merger != NULL); + platform_assert(routed != NULL); + platform_status rc = vector_ensure_capacity( - &merger->itors, - vector_length(&merger->itors) + bundle_num_branches(routed)); + &merger->branches, + vector_length(&merger->branches) + bundle_num_branches(routed)); if (!SUCCESS(rc)) { platform_error_log("%s():%d: vector_ensure_capacity() failed: %s", __func__, @@ -2419,11 +2412,8 @@ trunk_branch_merger_add_bundle(trunk_branch_merger *merger, for (uint64 i = 0; i < bundle_num_branches(routed); i++) { branch_ref bref = vector_get(&routed->branches, i); - rc = trunk_branch_merger_add_branch(merger, - cc, - btree_cfg, - branch_ref_addr(bref), - bundle_branch_type(routed)); + rc = trunk_branch_merger_add_branch( + merger, branch_ref_addr(bref), bundle_branch_type(routed)); if (!SUCCESS(rc)) { platform_error_log("%s():%d: btree_merger_add_branch() failed: %s", __func__, @@ -2437,13 +2427,79 @@ trunk_branch_merger_add_bundle(trunk_branch_merger *merger, static platform_status trunk_branch_merger_build_merge_itor(trunk_branch_merger *merger, + cache *cc, merge_behavior merge_mode) { + platform_assert(merger != NULL); + platform_assert(cc != NULL); platform_assert(merger->merge_itor == NULL); + // A compaction/leaf-split merge reads each input branch end to end, so give + // the branches a soft share of the read-ahead budget. + uint64 num_branches = vector_length(&merger->branches); + uint64 extent_size = + cache_config_extent_size(merger->cfg->btree_cfg->cache_cfg); + uint32 lookahead = prefetch_budget_to_extent_lookahead( + extent_size, merger->cfg->prefetch_budget, num_branches); + if (lookahead == 0) { + lookahead = 1; + } + + platform_status rc = vector_ensure_capacity(&merger->itors, num_branches); + if (!SUCCESS(rc)) { + platform_error_log("%s():%d: vector_ensure_capacity() failed: %s", + __func__, + __LINE__, + platform_status_to_string(rc)); + return rc; + } + + for (uint64 i = 0; i < num_branches; i++) { + trunk_branch_info branch = vector_get(&merger->branches, i); + btree_iterator *itor = TYPED_MALLOC(merger->hid, itor); + if (itor == NULL) { + platform_error_log( + "%s():%d: platform_malloc() failed", __func__, __LINE__); + return STATUS_NO_MEMORY; + } + rc = btree_iterator_init(cc, + merger->cfg->btree_cfg, + itor, + branch.addr, + branch.type, + greater_than_or_equal, + merger->min_key, + less_than, + merger->max_key, + greater_than_or_equal, + merger->min_key, + FALSE, + merger->height, + lookahead); + if (!SUCCESS(rc)) { + platform_error_log("%s():%d: btree_iterator_init() failed: %s", + __func__, + __LINE__, + platform_status_to_string(rc)); + platform_free(merger->hid, itor); + return rc; + } + + rc = vector_append(&merger->itors, (iterator *)itor); + if (!SUCCESS(rc)) { + platform_error_log("%s():%d: vector_append() failed: %s", + __func__, + __LINE__, + platform_status_to_string(rc)); + btree_iterator_deinit(itor); + platform_free(merger->hid, itor); + return rc; + } + } + return merge_iterator_create(merger->hid, - merger->data_cfg, - vector_length(&merger->itors), + merger->cfg->data_cfg, + num_branches, vector_data(&merger->itors), merge_mode, TRUE, @@ -2453,7 +2509,7 @@ trunk_branch_merger_build_merge_itor(trunk_branch_merger *merger, static platform_status trunk_branch_merger_deinit(trunk_branch_merger *merger) { - platform_status rc; + platform_status rc = STATUS_OK; if (merger->merge_itor != NULL) { rc = merge_iterator_destroy(merger->hid, &merger->merge_itor); } @@ -2464,6 +2520,7 @@ trunk_branch_merger_deinit(trunk_branch_merger *merger) platform_free(merger->hid, itor); } vector_deinit(&merger->itors); + vector_deinit(&merger->branches); return rc; } @@ -2776,10 +2833,12 @@ bundle_compaction_destroy(bundle_compaction *compaction, PAGE_TYPE_BRANCH); } - task_tracker_done( - compaction->tracker, - bundle_compaction_notify_status(compaction, maplet_compaction_rc), - completed); + if (compaction->tracker != NULL) { + task_tracker_done( + compaction->tracker, + bundle_compaction_notify_status(compaction, maplet_compaction_rc), + completed); + } platform_free(context->hid, compaction); } @@ -3943,13 +4002,11 @@ bundle_compaction_task(task *arg) trunk_branch_merger merger; trunk_branch_merger_init(&merger, PROCESS_PRIVATE_HEAP_ID, - context->cfg->data_cfg, + context->cfg, key_buffer_key(&state->key), key_buffer_key(&state->ubkey), 0); rc = trunk_branch_merger_add_branches(&merger, - context->cc, - context->cfg->btree_cfg, vector_length(&bc->input.branches), vector_data(&bc->input.branches)); if (!SUCCESS(rc)) { @@ -3976,7 +4033,8 @@ bundle_compaction_task(task *arg) goto cleanup_branch_merger; } - rc = trunk_branch_merger_build_merge_itor(&merger, bc->input.merge_mode); + rc = trunk_branch_merger_build_merge_itor( + &merger, context->cc, bc->input.merge_mode); if (!SUCCESS(rc)) { platform_error_log( "branch_merger_build_merge_itor failed for state: %p bc: %p: %s\n", @@ -4099,7 +4157,9 @@ enqueue_bundle_compaction(trunk_context *context, rc = STATUS_NO_MEMORY; goto next; } - task_tracker_add(tracker); + if (tracker != NULL) { + task_tracker_add(tracker); + } trunk_pivot_state_incref(state); @@ -4110,8 +4170,8 @@ enqueue_bundle_compaction(trunk_context *context, &bc->tsk, bundle_compaction_task, FALSE); - // Upon success, the trunk_pivot_state_incref and task_tracker_add are - // passed to the task + // Upon success, the trunk_pivot_state_incref and optional + // task_tracker_add are passed to the task. if (!SUCCESS(rc)) { trunk_pivot_state_decref(state); // undoes trunk_pivot_state_incref @@ -4548,14 +4608,12 @@ leaf_split_select_pivots(trunk_context *context, trunk_branch_merger merger; trunk_branch_merger_init(&merger, PROCESS_PRIVATE_HEAP_ID, - context->cfg->data_cfg, + context->cfg, min_key, max_key, context->cfg->branch_rough_count_height); rc = trunk_branch_merger_add_bundle(&merger, - context->cc, - context->cfg->btree_cfg, vector_get_ptr(&leaf->pivot_bundles, 0)); if (!SUCCESS(rc)) { platform_error_log("leaf_split_select_pivots: " @@ -4569,8 +4627,7 @@ leaf_split_select_pivots(trunk_context *context, bundle_num++) { bundle *bndl = vector_get_ptr(&leaf->inflight_bundles, bundle_num); - rc = trunk_branch_merger_add_bundle( - &merger, context->cc, context->cfg->btree_cfg, bndl); + rc = trunk_branch_merger_add_bundle(&merger, bndl); if (!SUCCESS(rc)) { platform_error_log("leaf_split_select_pivots: " "branch_merger_add_bundle failed: %d\n", @@ -4579,7 +4636,7 @@ leaf_split_select_pivots(trunk_context *context, } } - rc = trunk_branch_merger_build_merge_itor(&merger, MERGE_RAW); + rc = trunk_branch_merger_build_merge_itor(&merger, context->cc, MERGE_RAW); if (!SUCCESS(rc)) { platform_error_log("leaf_split_select_pivots: " "branch_merger_build_merge_itor failed: %d\n", @@ -5720,7 +5777,9 @@ trunk_flush_cleanup(trunk_context *context, task_tracker *tracker) &context->tasks, context, tracker, &completed); incorporation_tasks_deinit(&context->tasks, context); trunk_modification_end(context); - task_tracker_done(tracker, rc, &completed); + if (tracker != NULL) { + task_tracker_done(tracker, rc, &completed); + } task_tracker_notify_all(&completed); } @@ -6625,6 +6684,7 @@ trunk_config_init(trunk_config *config, uint64 incorporation_size_kv_bytes, uint64 target_fanout, uint64 branch_rough_count_height, + uint64 prefetch_budget, bool32 use_stats) { config->data_cfg = data_cfg; @@ -6633,6 +6693,7 @@ trunk_config_init(trunk_config *config, config->incorporation_size_kv_bytes = incorporation_size_kv_bytes; config->target_fanout = target_fanout; config->branch_rough_count_height = branch_rough_count_height; + config->prefetch_budget = prefetch_budget; config->use_stats = use_stats; } diff --git a/src/trunk.h b/src/trunk.h index c4c82956..7df20d09 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -26,6 +26,7 @@ typedef struct trunk_config { uint64 incorporation_size_kv_bytes; uint64 target_fanout; uint64 branch_rough_count_height; + uint64 prefetch_budget; // soft read-ahead bytes per merge bool32 use_stats; } trunk_config; @@ -180,16 +181,6 @@ typedef struct trunk_ondisk_node_handle { page_handle *inflight_bundle_page; } trunk_ondisk_node_handle; -typedef struct trunk_branch_merger { - platform_heap_id hid; - const data_config *data_cfg; - key min_key; - key max_key; - uint64 height; - merge_iterator *merge_itor; - iterator_vector itors; -} trunk_branch_merger; - /******************************** * Lifecycle ********************************/ @@ -202,6 +193,7 @@ trunk_config_init(trunk_config *config, uint64 incorporation_size_kv_bytes, uint64 target_fanout, uint64 branch_rough_count_height, + uint64 prefetch_budget, bool32 use_stats); platform_status diff --git a/tests/config.c b/tests/config.c index 2fe9a5f5..1f2f873d 100644 --- a/tests/config.c +++ b/tests/config.c @@ -90,6 +90,7 @@ config_set_defaults(master_config *cfg) .use_stats = FALSE, .reclaim_threshold = UINT64_MAX, .queue_scale_percent = TEST_CONFIG_DEFAULT_QUEUE_SCALE_PERCENT, + .prefetch_budget = MiB_TO_B(1), .verbose_logging_enabled = FALSE, .verbose_progress = FALSE, @@ -380,6 +381,9 @@ config_parse(master_config *cfg, const uint8 num_config, int argc, char *argv[]) config_set_gib("cache-capacity", cfg, cache_capacity) {} config_set_string("cache-debug-log", cfg, cache_logfile) {} config_set_uint64("queue-scale-percent", cfg, queue_scale_percent) {} + config_set_mib("prefetch-budget", cfg, prefetch_budget) {} + config_set_gib("prefetch-budget", cfg, prefetch_budget) {} + config_set_uint64("prefetch-budget-bytes", cfg, prefetch_budget) {} config_set_mib("memtable-capacity", cfg, memtable_capacity) {} config_set_gib("memtable-capacity", cfg, memtable_capacity) {} config_set_uint64("rough-count-height", cfg, btree_rough_count_height) diff --git a/tests/config.h b/tests/config.h index 2c474d83..089f06a1 100644 --- a/tests/config.h +++ b/tests/config.h @@ -96,6 +96,9 @@ typedef struct master_config { platform_log_handle *log_handle; + // prefetch + uint64 prefetch_budget; + // data uint64 key_size; uint64 message_size; diff --git a/tests/functional/btree_test.c b/tests/functional/btree_test.c index 7705a2d1..2511e615 100644 --- a/tests/functional/btree_test.c +++ b/tests/functional/btree_test.c @@ -35,6 +35,7 @@ typedef struct btree_scan_perf_options { bool32 random_bounds; bool32 memtable_no_copy_nodes; bool32 memtable_copy_nodes; + uint32 prefetch_lookahead; // 0=off, 1=next extent, >=2 deep } btree_scan_perf_options; static const char * @@ -365,6 +366,7 @@ test_btree_scan_once(cache *cc, key min_key, key max_key, bool32 copy_nodes, + uint32 prefetch_lookahead, uint64 expected_tuples, uint64 *init_elapsed_ns, uint64 *scan_elapsed_ns, @@ -384,9 +386,9 @@ test_btree_scan_once(cache *cc, max_key, greater_than_or_equal, min_key, - FALSE, copy_nodes, - 0); + 0, + prefetch_lookahead); *init_elapsed_ns += platform_timestamp_elapsed(start_time); if (!SUCCESS(rc)) { return rc; @@ -540,6 +542,7 @@ test_btree_scan_benchmark_tree(cache *cc, min_key, max_key, copy_nodes, + options->prefetch_lookahead, expected_tuples, &init_elapsed_ns, &scan_elapsed_ns, @@ -683,7 +686,7 @@ test_btree_scan_perf(cache *cc, greater_than_or_equal, NEGATIVE_INFINITY_KEY, FALSE, - FALSE, + 0, 0); if (!SUCCESS(rc)) { goto out; @@ -1080,7 +1083,7 @@ test_btree_basic(cache *cc, greater_than_or_equal, NEGATIVE_INFINITY_KEY, FALSE, - FALSE, + 0, 0); platform_assert_status_ok(rc); platform_default_log("btree iterator init time %luns\n", @@ -1265,7 +1268,7 @@ test_btree_create_packed_trees(cache *cc, greater_than_or_equal, NEGATIVE_INFINITY_KEY, FALSE, - FALSE, + 0, 0); platform_assert_status_ok(rc); @@ -1322,9 +1325,9 @@ test_count_tuples_in_range(cache *cc, high_key, greater_than_or_equal, low_key, - TRUE, FALSE, - 0); + 0, + 1); if (!SUCCESS(rc)) { return rc; } @@ -1421,9 +1424,9 @@ test_btree_print_all_keys(cache *cc, high_key, greater_than_or_equal, low_key, - TRUE, FALSE, - 0); + 0, + 1); platform_assert_status_ok(rc); while (iterator_can_curr(&itor.super)) { key curr_key; @@ -1500,9 +1503,9 @@ test_btree_merge_basic(cache *cc, hi, greater_than_or_equal, lo, - TRUE, FALSE, - 0); + 0, + 1); platform_assert_status_ok(rc); itor_arr[tree_no] = &btree_itor_arr[tree_no].super; } @@ -1724,7 +1727,7 @@ test_btree_rough_iterator(cache *cc, greater_than_or_equal, NEGATIVE_INFINITY_KEY, TRUE, - TRUE, + 1, 1); platform_assert_status_ok(rc); if (iterator_can_curr(&rough_btree_itor[tree_no].super)) { @@ -1889,9 +1892,9 @@ test_btree_merge_perf(cache *cc, max_key, greater_than_or_equal, min_key, - TRUE, FALSE, - 0); + 0, + 1); platform_assert_status_ok(rc); itor_arr[tree_no] = &btree_itor_arr[tree_no].super; } @@ -1961,6 +1964,7 @@ btree_scan_perf_options_default(btree_scan_perf_options *options) .random_bounds = FALSE, .memtable_no_copy_nodes = TRUE, .memtable_copy_nodes = TRUE, + .prefetch_lookahead = 0, }; } @@ -2018,6 +2022,15 @@ btree_scan_perf_parse_args(int argc, platform_free(platform_get_heap_id(), filtered); return STATUS_BAD_PARAM; } + } else if (STRING_EQUALS_LITERAL(argv[i], "--prefetch-lookahead")) { + uint64 lookahead; + if (i + 1 == argc || !try_string_to_uint64(argv[++i], &lookahead)) { + platform_error_log( + "btree_test: failed to parse --prefetch-lookahead\n"); + platform_free(platform_get_heap_id(), filtered); + return STATUS_BAD_PARAM; + } + options->prefetch_lookahead = (uint32)lookahead; } else if (STRING_EQUALS_LITERAL(argv[i], "--random-scan-bounds") || STRING_EQUALS_LITERAL(argv[i], "--random-scan-starts")) { @@ -2087,6 +2100,8 @@ usage(const char *argv0) "for each scan\n"); platform_error_log("\t--memtable-scan-mode choose which memtable " "iterator mode(s) to benchmark (default both)\n"); + platform_error_log("\t--prefetch-lookahead extents to prefetch ahead " + "(0=off, 1=next extent, >=2 deep; default 0)\n"); config_usage(); } diff --git a/tests/functional/scan_benchmark.c b/tests/functional/scan_benchmark.c index 444acdb8..dc8f2a9e 100644 --- a/tests/functional/scan_benchmark.c +++ b/tests/functional/scan_benchmark.c @@ -18,6 +18,7 @@ typedef enum scan_benchmark_mode { SCAN_BENCHMARK_LOAD_AND_SCAN, SCAN_BENCHMARK_INIT_ONLY, SCAN_BENCHMARK_SCAN_ONLY, + SCAN_BENCHMARK_OPTIMIZE_ONLY, } scan_benchmark_mode; typedef struct scan_benchmark_options { @@ -117,6 +118,10 @@ scan_benchmark_parse_args(int argc, return STATUS_BAD_PARAM; } options->mode = SCAN_BENCHMARK_SCAN_ONLY; + } else if (STRING_EQUALS_LITERAL(argv[i], "--optimize-only")) { + // Open an existing DB (cold cache) and time a blocking full-leaf + // optimize -- a compaction-throughput benchmark. + options->mode = SCAN_BENCHMARK_OPTIMIZE_ONLY; } else if (STRING_EQUALS_LITERAL(argv[i], "--random-load-order")) { options->random_load_order = TRUE; } else if (STRING_EQUALS_LITERAL(argv[i], "--splinter-random-keys")) { @@ -408,6 +413,7 @@ scan_benchmark_make_config(const master_config *master_cfg, .use_stats = master_cfg->use_stats, .reclaim_threshold = master_cfg->reclaim_threshold, .queue_scale_percent = master_cfg->queue_scale_percent, + .prefetch_budget = master_cfg->prefetch_budget, }; if (open_existing) { @@ -487,6 +493,43 @@ scan_benchmark_load_database(const splinterdb_config *cfg, return rc; } +/* + * Compaction-throughput benchmark: open an existing DB (cold cache) and time a + * single blocking full-leaf optimize over the whole key range. With data > + * cache and O_DIRECT, the branch reads done by compaction's merge iterators are + * cold, so this exercises the btree-iterator prefetch path under compaction. + */ +static int +scan_benchmark_run_optimize(const splinterdb_config *cfg) +{ + splinterdb *kvs = NULL; + int rc = splinterdb_open(cfg, &kvs); + if (rc != 0) { + return rc; + } + + io_reset_stats((io_handle *)splinterdb_get_io_handle(kvs)); + + splinterdb_notification notification; + splinterdb_notification_init_blocking(¬ification); + + platform_default_log( + "scan_benchmark: running blocking full-leaf optimize\n"); + timestamp start_time = platform_get_timestamp(); + rc = splinterdb_optimize(kvs, NULL_SLICE, NULL_SLICE, TRUE, ¬ification); + uint64 elapsed_ns = platform_timestamp_elapsed(start_time); + splinterdb_notification_deinit(¬ification); + + platform_default_log("optimize complete: rc=%d, %.3fs elapsed\n", + rc, + (double)elapsed_ns / BILLION); + io_print_stats((io_handle *)splinterdb_get_io_handle(kvs), + Platform_default_log_handle); + + splinterdb_close(&kvs); + return rc; +} + static int scan_benchmark_run_scan(const splinterdb_config *cfg, bool print_lookup_stats, @@ -782,7 +825,9 @@ scan_benchmark(int argc, char *argv[]) goto out; } - if (options.mode != SCAN_BENCHMARK_SCAN_ONLY && master_cfg.num_inserts == 0) + if (options.mode != SCAN_BENCHMARK_SCAN_ONLY + && options.mode != SCAN_BENCHMARK_OPTIMIZE_ONLY + && master_cfg.num_inserts == 0) { platform_error_log( "scan_benchmark: --num-inserts must be set for load modes\n"); @@ -818,7 +863,9 @@ scan_benchmark(int argc, char *argv[]) options.backwards_scan, master_cfg.seed); - if (options.mode != SCAN_BENCHMARK_SCAN_ONLY) { + if (options.mode == SCAN_BENCHMARK_LOAD_AND_SCAN + || options.mode == SCAN_BENCHMARK_INIT_ONLY) + { scan_benchmark_make_config(&master_cfg, &default_data_cfg, &cfg, FALSE); rc = scan_benchmark_load_database(&cfg, master_cfg.num_inserts, @@ -832,8 +879,10 @@ scan_benchmark(int argc, char *argv[]) } scan_benchmark_make_config(&master_cfg, &default_data_cfg, &cfg, TRUE); - if (options.scan_count == 1 && options.scan_length == 0 - && !options.random_scan_starts) + if (options.mode == SCAN_BENCHMARK_OPTIMIZE_ONLY) { + rc = scan_benchmark_run_optimize(&cfg); + } else if (options.scan_count == 1 && options.scan_length == 0 + && !options.random_scan_starts) { rc = scan_benchmark_run_scan(&cfg, master_cfg.use_stats, diff --git a/tests/functional/test.h b/tests/functional/test.h index 15fd7eb8..82ab07ce 100644 --- a/tests/functional/test.h +++ b/tests/functional/test.h @@ -296,6 +296,7 @@ test_config_init(system_config *system_cfg, // OUT master_cfg->memtable_capacity, master_cfg->fanout, master_cfg->btree_rough_count_height, + master_cfg->prefetch_budget, master_cfg->use_stats); rc = core_config_init(&system_cfg->splinter_cfg, @@ -305,6 +306,7 @@ test_config_init(system_config *system_cfg, // OUT (log_config *)&system_cfg->log_cfg, &system_cfg->trunk_node_cfg, master_cfg->queue_scale_percent, + master_cfg->prefetch_budget, master_cfg->use_log, master_cfg->use_stats, master_cfg->verbose_logging_enabled, diff --git a/tests/functional/test_functionality.c b/tests/functional/test_functionality.c index ab32d4b7..7cac864b 100644 --- a/tests/functional/test_functionality.c +++ b/tests/functional/test_functionality.c @@ -40,8 +40,7 @@ search_for_key_via_iterator(core_handle *spl, key target) less_than, POSITIVE_INFINITY_KEY, greater_than_or_equal, - NEGATIVE_INFINITY_KEY, - UINT64_MAX); + NEGATIVE_INFINITY_KEY); uint64 count = 0; while (iterator_can_curr((iterator *)&iter)) { key curr_key; @@ -248,8 +247,7 @@ verify_range_against_shadow(core_handle *spl, less_than, end_key, greater_than_or_equal, - start_key, - end_index - start_index); + start_key); if (!SUCCESS(status)) { platform_error_log("failed to create range itor: %s\n", platform_status_to_string(status)); diff --git a/tests/unit/btree_stress_test.c b/tests/unit/btree_stress_test.c index 0182208c..96dcfbf5 100644 --- a/tests/unit/btree_stress_test.c +++ b/tests/unit/btree_stress_test.c @@ -708,7 +708,7 @@ iterator_tests(cache *cc, greater_than_or_equal, start_key, FALSE, - FALSE, + 0, 0); ASSERT_TRUE(SUCCESS(rc)); @@ -761,7 +761,7 @@ iterator_seek_tests(cache *cc, greater_than_or_equal, start_key, FALSE, - FALSE, + 0, 0); ASSERT_TRUE(SUCCESS(rc)); iterator *iter = (iterator *)&dbiter; @@ -809,7 +809,7 @@ pack_tests(cache *cc, greater_than_or_equal, NEGATIVE_INFINITY_KEY, FALSE, - FALSE, + 0, 0); ASSERT_TRUE(SUCCESS(rc)); diff --git a/tests/unit/btree_test.c b/tests/unit/btree_test.c index 35f59259..e811728c 100644 --- a/tests/unit/btree_test.c +++ b/tests/unit/btree_test.c @@ -188,6 +188,7 @@ leaf_hdr_tests(btree_config *cfg, btree_scratch *scratch, platform_heap_id hid) * about this number. If you change the size of a btree leaf header * or the size of a btree leafy entry, then this number will need * to be changed, and that's fine. + * (The header currently has room for 208 such entries.) */ int nkvs = 208;