feat(stream): support XDELEX command #3502

Open
kirito632 wants to merge 6 commits into apache:unstable from kirito632:feat/stream-xdelex

Conversation

@kirito632

Contributor

What

This PR introduces the XDELEX stream command with configurable
PEL cleanup behaviors.

It also fixes an existing issue in XINFO CONSUMERS where consumer metadata
from different groups could leak into the same scan result.

XINFO CONSUMERS Fix

Root cause:

GetConsumerInfo() used iterator bounds based only on the stream version range
([version, version+1)).

Since the InternalKey encoding places the version field before the sub-key,
consumer records from other groups could still fall within the same iteration
range.

Fix:

Extract and validate the group name from each internal key during iteration,
filtering out unrelated consumers.
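
The effect of the fix can be illustrated with a small, self-contained sketch. It does not use the real Kvrocks InternalKey encoding: `ConsumerKey`, `ConsumersOfGroup`, and the `std::map` standing in for the RocksDB column family are hypothetical names chosen for illustration. The only property carried over from the description above is that the version sorts before the sub-key, so a range bounded by version alone spans every group.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical simplified key: (stream version, group name, consumer name).
// The real InternalKey layout is more involved; only the ordering matters here.
using ConsumerKey = std::tuple<uint64_t, std::string, std::string>;

// Returns the consumers of `group`, scanning the range [version, version + 1).
// Assumes version < UINT64_MAX so that version + 1 does not wrap.
std::vector<std::string> ConsumersOfGroup(const std::map<ConsumerKey, int> &records, uint64_t version,
                                          const std::string &group) {
  std::vector<std::string> consumers;
  auto it = records.lower_bound(ConsumerKey{version, "", ""});
  auto end = records.lower_bound(ConsumerKey{version + 1, "", ""});
  for (; it != end; ++it) {
    // The bounds guarantee the version, but say nothing about the group.
    assert(std::get<0>(it->first) == version);
    // Validate the group name per key and skip records of other groups.
    if (std::get<1>(it->first) != group) continue;
    consumers.push_back(std::get<2>(it->first));
  }
  return consumers;
}
```

Without the group check inside the loop, a scan for one group would also return the consumers of every other group that shares the same stream version.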

XDELEX

Supported deletion strategies:

KEEPREF (default): Delete the stream entry only.
DELREF: Delete the stream entry and remove it from all groups' PELs.
ACKED: Delete the stream entry only if it has been acknowledged by all consumer groups.

Per-ID return codes:

1: entry deleted
2: entry retained (e.g. ACKED condition not satisfied)
-1: entry not found or duplicate ID within the same request
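
A minimal in-memory sketch of how the three strategies and the per-ID return codes fit together is given below. `ToyStream`, `DeleteOption`, and `DeleteWithOption` are hypothetical names and not the API added by this PR. The sketch also makes one loud simplification: an entry counts as acknowledged when no group's PEL references it, which ignores whether the entry was ever delivered to the group.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical names for illustration only.
enum class DeleteOption { kKeepRef, kDelRef, kAcked };

struct ToyStream {
  std::set<uint64_t> entries;                              // entry IDs, simplified to integers
  std::map<std::string, std::set<uint64_t>> pel_by_group;  // group name -> pending entry IDs
};

// Returns one code per requested ID: 1 deleted, 2 retained, -1 not found or duplicate.
std::vector<int> DeleteWithOption(ToyStream &stream, const std::vector<uint64_t> &ids, DeleteOption option) {
  std::vector<int> results;
  std::set<uint64_t> seen;
  for (uint64_t id : ids) {
    bool duplicate = !seen.insert(id).second;
    if (duplicate || stream.entries.count(id) == 0) {
      results.push_back(-1);
      continue;
    }
    if (option == DeleteOption::kAcked) {
      // Simplification: "acknowledged" means no PEL still references the entry.
      bool pending = false;
      for (const auto &kv : stream.pel_by_group) {
        if (kv.second.count(id) > 0) pending = true;
      }
      if (pending) {
        results.push_back(2);
        continue;
      }
    }
    stream.entries.erase(id);
    if (option == DeleteOption::kDelRef) {
      // DELREF also drops the references held by every group's PEL.
      for (auto &kv : stream.pel_by_group) kv.second.erase(id);
    }
    results.push_back(1);
  }
  assert(results.size() == ids.size());
  return results;
}
```

With KEEPREF the PEL references are left dangling on purpose, which is why the later DELREF and dangling-PEL tests exist as separate cases.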

Implementation Notes

  • Pending-number decrements are aggregated in memory and flushed through a single WriteBatch to avoid repeated metadata updates during DELREF operations.
  • Duplicate IDs are deduplicated to preserve idempotency and prevent stream metadata corruption.
  • The originating command name is propagated into WriteBatchExtractor so replicas replay the exact same deletion semantics.
  • Internal stream metadata and PEL delete events are filtered out in WriteBatchExtractor to avoid generating invalid replicated XDEL commands.
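
The first two notes can be sketched in memory as follows. This is an illustration under assumptions, not the code in this PR: `ToyGroup` and `FlushPelRemovals` are hypothetical, and a returned write count stands in for the `Put` calls that the real implementation would stage in a RocksDB WriteBatch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for one consumer group's state.
struct ToyGroup {
  uint64_t pending_number = 0;
  std::map<uint64_t, std::string> pel;               // entry ID -> owning consumer
  std::map<std::string, uint64_t> consumer_pending;  // consumer -> pending count
};

// Removes `ids` from every group's PEL and returns how many counter writes were
// needed: one per touched group plus one per touched consumer.
size_t FlushPelRemovals(std::map<std::string, ToyGroup> &groups, const std::vector<uint64_t> &ids) {
  // Deduplicate first so a repeated ID cannot decrement a counter twice.
  std::set<uint64_t> unique_ids(ids.begin(), ids.end());

  // Phase 1: accumulate decrements in memory instead of rewriting metadata per ID.
  std::map<std::string, uint64_t> group_dec;
  std::map<std::pair<std::string, std::string>, uint64_t> consumer_dec;
  for (uint64_t id : unique_ids) {
    for (auto &kv : groups) {
      auto it = kv.second.pel.find(id);
      if (it == kv.second.pel.end()) continue;
      ++group_dec[kv.first];
      ++consumer_dec[std::make_pair(kv.first, it->second)];
      kv.second.pel.erase(it);
    }
  }

  // Phase 2: apply each aggregated decrement exactly once.
  size_t writes = 0;
  for (const auto &kv : group_dec) {
    ToyGroup &group = groups[kv.first];
    assert(group.pending_number >= kv.second);
    group.pending_number -= kv.second;
    ++writes;
  }
  for (const auto &kv : consumer_dec) {
    uint64_t &pending = groups[kv.first.first].consumer_pending[kv.first.second];
    assert(pending >= kv.second);
    pending -= kv.second;
    ++writes;
  }
  return writes;
}
```

The number of counter writes depends on how many groups and consumers are touched rather than on how many IDs the request carries.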

Testing

Added Go integration tests covering:

  • multi-group DELREF cleanup
  • ACKED blocking semantics
  • duplicate-ID idempotency
  • invalid syntax handling
  • XINFO CONSUMERS group isolation

AI-Assisted Contribution Disclosure

This contribution complies with the ASF AI-assisted contribution guidelines.

AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.

Copilot AI left a comment

Contributor


Pull request overview

This PR adds a new stream deletion command XDELEX with configurable deletion/PEL-cleanup semantics, and fixes XINFO CONSUMERS so consumer metadata from other groups doesn’t leak into the scan results.

Changes:

  • Implement XDELEX with KEEPREF (default), DELREF, and ACKED strategies, including replica replay support via WriteBatchLogData.
  • Fix XINFO CONSUMERS iteration to filter consumers by group name, preventing cross-group leakage.
  • Add Go integration tests and a C++ unit test for new deletion behaviors and parsing edge cases.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| tests/gocase/unit/type/stream/stream_test.go | Adds Go integration coverage for XDELEX behaviors and error handling. |
| tests/cppunit/types/stream_test.cc | Adds a C++ unit test ensuring empty ID list handling for the new deletion API. |
| src/types/redis_stream.h | Exposes DeleteEntriesWithOption and helper declarations for XDELEX implementation. |
| src/types/redis_stream.cc | Implements DeleteEntriesWithOption plus PEL cleanup and ACKED semantics; fixes GetConsumerInfo group isolation. |
| src/types/redis_stream_base.h | Introduces enums for delete options and per-ID deletion result codes. |
| src/storage/batch_extractor.cc | Ensures replication replays XDELEX semantics and filters internal stream/PEL/meta events. |
| src/commands/cmd_stream.cc | Adds the xdelex command parser/executor and registers the command. |

Comment thread src/types/redis_stream.cc
Comment on lines +684 to +707
if (first_deleted) {
  iter->SeekToFirst();
  while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
    iter->Next();
  }
  if (iter->Valid()) {
    metadata.first_entry_id = entryIDFromInternalKey(iter->key());
    metadata.recorded_first_entry_id = metadata.first_entry_id;
  } else {
    metadata.first_entry_id.Clear();
    metadata.recorded_first_entry_id.Clear();
  }
}
if (last_deleted) {
  iter->SeekToLast();
  while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
    iter->Prev();
  }
  if (iter->Valid()) {
    metadata.last_entry_id = entryIDFromInternalKey(iter->key());
  } else {
    metadata.last_entry_id.Clear();
  }
}
Comment thread src/types/redis_stream.h
Comment on lines +25 to 32
#include <map>
#include <optional>
#include <string>
#include <vector>

#include "common/db_util.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
Comment thread src/commands/cmd_stream.cc Outdated
Comment on lines +304 to +312
for (int64_t i = 0; i < numids; i++) {
  auto id_str = GET_OR_RET(parser.TakeStr());
  redis::StreamEntryID id;
  auto s = ParseStreamEntryID(id_str, &id);
  if (!s.IsOK()) return s;
  entry_ids_.emplace_back(id);
}

return Status::OK();
@jihuayu

jihuayu commented May 27, 2026

Member

Hi @kirito632. Thanks for your PR.

I reviewed your PR once using my AI tools (Codex / GPT-5.5 xhigh), and it found quite a few issues (I’m not going to reveal these things. I want you to try and see whether you can find them on your own.).
This is not meant as criticism — I just want to share my workflow with you. This can help the PR get merged faster.

I hope you can also try using an AI tool locally to review your PR once before submitting it. Below is the prompt I use. It is written in Chinese, but you can translate it into the language you use and run it there:

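请你审核一下这个PR是否符合redis和kvrocks规范,代码是否清晰,命名是否清晰
(In English: "Please review whether this PR conforms to Redis and Kvrocks conventions, whether the code is clear, and whether the naming is clear.")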

If you have any other questions, feel free to @ me directly. Hope this helps you.

@kirito632

Contributor Author

Thanks a lot for the detailed feedback and for sharing your workflow. I’m going through the current PR again to identify and fix the remaining issues on my own first. I really appreciate the guidance and the time you spent reviewing this PR.

@kirito632 kirito632 force-pushed the feat/stream-xdelex branch 2 times, most recently from d2839d9 to 649043e Compare May 29, 2026 13:03
@kirito632

Contributor Author

Sorry for another force-push. I've added two missing checks:

  1. Get key_slot_id from user_key; if slot_range_.IsValid() and key_slot_id is not in range, return rocksdb::Status::OK() immediately.
  2. Extract the namespace ns from ikey.

@kirito632 kirito632 force-pushed the feat/stream-xdelex branch from 649043e to 0ddf4de Compare May 30, 2026 12:30
@kirito632

Contributor Author

Hi @jihuayu, I took a look at the CI failures and they don't appear to be directly related to the changes in this PR. These might be flaky failures. Could you please help rerun those failed tests? Thanks.

@jihuayu

jihuayu commented Jun 1, 2026

Member

Could you please help rerun those failed tests? Thanks.

Done. When you need a review, please @ me.

@kirito632

Contributor Author

@jihuayu, I investigated the coverage report after adding the new XDELEX integration tests.

The Go tests now cover:

  • KEEPREF / DELREF / ACKED
  • multi-ID requests
  • mixed deleted/skipped results
  • multiple consumer groups
  • dangling PEL cleanup
  • ACKED semantics with groups created using $
  • non-existent stream handling
  • POLLUPDATES propagation

However, cmd_stream.cc and batch_extractor.cc still appear as 0% covered
in the Sonar report, even though the integration tests must pass through
those layers in order to reach redis_stream.cc.

This makes me wonder whether the issue is related to coverage collection
from the integration-test process rather than missing functional test
coverage.

Do you think additional C++ unit tests are still expected here, or should
we investigate the coverage reporting path first?

@jihuayu

jihuayu commented Jun 3, 2026

Member

Thanks for waiting! This is a bit weird. Since you only added the instructions, go test should theoretically hit most of the branches, so the coverage shouldn't be this low. That said, Sonar is sometimes inaccurate with coverage stats because it pulls in useless files. Let me check it out.

@jihuayu

jihuayu commented Jun 6, 2026

Member

I’ve confirmed that the code coverage issue is related to CI. I’ve been busy working on the CI these days. Sorry, the review may still need to wait a few more days.

PS: I asked Copilot to review the code. It is not 100% correct, so feel free to ignore its comments if you disagree.

Copilot AI left a comment

Contributor


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Comment on lines 33 to 48
void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
  // Currently, we only have two kinds of log data
  if (ServerLogData::IsServerLogData(blob.data())) {
    ServerLogData server_log;
    if (server_log.Decode(blob).IsOK()) {
      // We don't handle server log currently
    }
  } else {
    // Redis type log data
    if (auto s = log_data_.Decode(blob); !s.IsOK()) {
      WARN("Failed to decode Redis type log: {}", s.Msg());
    } else {
      seen_xdelex_entry_keys_.clear();
    }
  }
}
Comment thread src/types/redis_stream.cc
Comment on lines +478 to +480
auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value);
group_meta.pending_number -= decrement;
s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta));
Comment thread src/types/redis_stream.cc
Comment on lines +494 to +496
auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value);
consumer_meta.pending_number -= decrement;
s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta));
@kirito632

Contributor Author

@jihuayu, while reviewing the stream code for XDELEX, I found two unrelated stream issues:

  1. XACK with duplicate IDs may acknowledge the same PEL entry multiple times before the batch is committed, leading to incorrect return values and pending counter updates.

  2. XLEN with entry-id filtering on an empty MKSTREAM-created stream can underflow and return UINT64_MAX.

These appear independent of XDELEX, so I plan to handle them separately to keep the current PR focused. Do you think this approach is acceptable, or should I address them within the current PR?
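
For context, the UINT64_MAX result in item 2 is the usual wrap-around of unsigned subtraction: subtracting 1 from a `uint64_t` holding 0 yields UINT64_MAX. One common guard is a saturating subtraction, sketched below. `SaturatingSub` is a hypothetical helper and not part of Kvrocks; whether clamping or an early return for empty streams is the right fix for XLEN is left to the separate PR.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: subtracts `decrement` from `value`, clamping at zero
// instead of wrapping around to a huge unsigned number.
uint64_t SaturatingSub(uint64_t value, uint64_t decrement) {
  uint64_t result = decrement > value ? 0 : value - decrement;
  assert(result <= value);  // a wrapped result would be larger than the input
  return result;
}
```

The same pattern applies to any unsigned pending counter that is decremented by an externally derived amount.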

@jihuayu

jihuayu commented Jun 9, 2026

Member

These appear independent of XDELEX, so I plan to handle them separately to keep the current PR focused.

Yes, splitting it into a separate PR would be better.

You can simply mark the Copilot review comments as resolved.

kirito632 added 5 commits June 9, 2026 11:44
Add XDELEX to delete stream entries with KEEPREF/DELREF options.
Includes refactoring for unified entry/PEL deletion.
Also fix a consumer pending count bug when destroying a consumer.
@kirito632 kirito632 force-pushed the feat/stream-xdelex branch from ecc4365 to b952fd6 Compare June 9, 2026 09:29
@kirito632

Contributor Author

Hi @jihuayu,

I’ve been trying to improve my review process while working on this PR. After your previous feedback, I spent some time comparing behavior against Redis 8.2 and found several compatibility issues myself before updating the implementation.

I’m still learning, so I’m curious about one thing: when you review a PR like this, what are the main categories you usually check beyond basic functionality? For example, do you focus more on Redis compatibility, invariants, replication behavior, edge cases, maintainability, or something else?

I’m mainly asking because I’d like to improve my own review workflow and understand what experienced maintainers tend to look for.

Thanks!

@jihuayu

jihuayu commented Jun 9, 2026

Member

@kirito632 Wow, I’m glad you’re interested in this.

First, what I care about most is correctness. This includes consistency with Redis semantics, although we may intentionally use different semantics when there is a specific reason to do so. It also includes whether the implementation follows Kvrocks conventions, whether reads and writes follow the intended design, whether locking is handled correctly, and whether there are any performance issues.

Beyond that, there are some style-related concerns, such as whether the code is clear and whether the naming is appropriate. This part can be somewhat subjective and depends on each reviewer’s preferences. In general, as long as it is not too far off, it should be fine.

As for test cases, the ideal case would be to cover all edge cases. In practice, that is quite difficult, so covering the main flow and important error paths is usually enough.

Among these, the first point—correctness—is the main focus of the review. For especially complex or very long code, code style and readability will also be treated as being just as important as correctness.

I hope to see in the review that contributors understand their own code and have put their own thought into it.

@kirito632

Contributor Author

Thanks, this is very helpful.

For this PR, I’ll keep correctness as the main focus and continue checking it from those angles. One thing I noticed while testing against Redis 8.2 is that some XDELEX behavior is a bit more permissive than what I first expected from the command description, so I’ve been using Redis 8.2 behavior as the source of truth where possible and adding focused tests for those cases.

@kirito632

Contributor Author

@jihuayu, could you please help rerun the failed tests? Thanks.

@jihuayu

jihuayu commented Jun 12, 2026

Member

OK, done!

Don't worry: if you confirm the test failures are not caused by your changes, I will make sure they all pass before merging.

@sonarqubecloud


Quality Gate failed

Failed conditions
40.5% Coverage on New Code (required ≥ 50%)

See analysis details on SonarQube Cloud

@kirito632

Contributor Author

@jihuayu,

The CI is green now, but SonarCloud is still failing because Coverage on New Code is 40.5% (required >= 50%).

After checking the report, most uncovered lines are in:

  • src/storage/batch_extractor.cc
  • src/commands/cmd_stream.cc

The stream command behavior itself is already covered by the newly added Go tests, but Sonar seems to be counting several WriteBatchExtractor paths that are difficult to exercise through command-level tests.

I'm considering adding a few focused C++ tests around WriteBatchExtractor to increase coverage.

Before doing that, I'd like to confirm whether this is the direction you'd prefer, or if there are other areas you think would benefit more from additional coverage.

Thanks!

@jihuayu

jihuayu commented Jun 12, 2026

Member

There are still some issues with the code coverage, but we can ignore them for now.

@kirito632

Contributor Author

Understood, thanks!


3 participants