feat(stream): support XDELEX command #3502
Force-pushed from b464d40 to ccc15fd.
Pull request overview
This PR adds a new stream deletion command XDELEX with configurable deletion/PEL-cleanup semantics, and fixes XINFO CONSUMERS so consumer metadata from other groups doesn’t leak into the scan results.
Changes:
- Implement XDELEX with KEEPREF (default), DELREF, and ACKED strategies, including replica replay support via WriteBatchLogData.
- Fix XINFO CONSUMERS iteration to filter consumers by group name, preventing cross-group leakage.
- Add Go integration tests and a C++ unit test for new deletion behaviors and parsing edge cases.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/gocase/unit/type/stream/stream_test.go | Adds Go integration coverage for XDELEX behaviors and error handling. |
| tests/cppunit/types/stream_test.cc | Adds a C++ unit test ensuring empty ID list handling for the new deletion API. |
| src/types/redis_stream.h | Exposes DeleteEntriesWithOption and helper declarations for XDELEX implementation. |
| src/types/redis_stream.cc | Implements DeleteEntriesWithOption plus PEL cleanup and ACKED semantics; fixes GetConsumerInfo group isolation. |
| src/types/redis_stream_base.h | Introduces enums for delete options and per-ID deletion result codes. |
| src/storage/batch_extractor.cc | Ensures replication replays XDELEX semantics and filters internal stream/PEL/meta events. |
| src/commands/cmd_stream.cc | Adds the xdelex command parser/executor and registers the command. |
    if (first_deleted) {
      iter->SeekToFirst();
      while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
        iter->Next();
      }
      if (iter->Valid()) {
        metadata.first_entry_id = entryIDFromInternalKey(iter->key());
        metadata.recorded_first_entry_id = metadata.first_entry_id;
      } else {
        metadata.first_entry_id.Clear();
        metadata.recorded_first_entry_id.Clear();
      }
    }
    if (last_deleted) {
      iter->SeekToLast();
      while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) {
        iter->Prev();
      }
      if (iter->Valid()) {
        metadata.last_entry_id = entryIDFromInternalKey(iter->key());
      } else {
        metadata.last_entry_id.Clear();
      }
    }

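The hunk above walks past keys listed in deleted_entry_keys when recomputing the first and last entry IDs, presumably because those deletions are not yet visible to the iterator. A minimal standalone sketch of the same scan follows. It assumes plain integer IDs and std::set in place of the RocksDB iterator; FirstSurvivor and LastSurvivor are hypothetical names, not functions from this PR.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical helper: finds the smallest ID in `entries` that is not in
// `deleted`. Returns false when every entry is being deleted.
bool FirstSurvivor(const std::set<uint64_t> &entries, const std::set<uint64_t> &deleted, uint64_t *out) {
  assert(out != nullptr);
  for (uint64_t id : entries) {
    if (deleted.count(id) == 0) {
      *out = id;
      return true;
    }
  }
  return false;
}

// Hypothetical helper: same scan from the other end, for the last entry ID.
bool LastSurvivor(const std::set<uint64_t> &entries, const std::set<uint64_t> &deleted, uint64_t *out) {
  assert(out != nullptr);
  for (auto it = entries.rbegin(); it != entries.rend(); ++it) {
    if (deleted.count(*it) == 0) {
      *out = *it;
      return true;
    }
  }
  return false;
}
```

A false return corresponds to the Clear() branches in the hunk: nothing survives, so the stored ID is reset.
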
    #include <map>
    #include <optional>
    #include <string>
    #include <vector>

    #include "common/db_util.h"
    #include "storage/redis_db.h"
    #include "storage/redis_metadata.h"

    for (int64_t i = 0; i < numids; i++) {
      auto id_str = GET_OR_RET(parser.TakeStr());
      redis::StreamEntryID id;
      auto s = ParseStreamEntryID(id_str, &id);
      if (!s.IsOK()) return s;
      entry_ids_.emplace_back(id);
    }

    return Status::OK();

Hi @kirito632. Thanks for your PR. I reviewed your PR once using my AI tools (Codex / GPT-5.5 xhigh), and it found quite a few issues. I'm not going to reveal them; I want you to try to find them on your own. I hope you can also try using an AI tool locally to review your PR once before submitting it. Below is the prompt I use. It is written in Chinese, but you can translate it into the language you use and run it there: If you have any other questions, feel free to @ me directly. Hope this helps you.

Thanks a lot for the detailed feedback and for sharing your workflow. I’m going through the current PR again to identify and fix the remaining issues on my own first. Really appreciate the guidance and the time you spent reviewing this PR.
Force-pushed from d2839d9 to 649043e.
Sorry for another force-push. I've added two missing checks:
Force-pushed from 649043e to 0ddf4de.
Hi @jihuayu, I took a look at the CI failures and they don't appear to be directly related to the changes in this PR. These might be flaky failures. Could you please help rerun those failed tests? Thanks.

Done. When you need a review, please @ me.

@jihuayu, I investigated the coverage report after adding the new XDELEX integration tests. The Go tests now cover:
However, cmd_stream.cc and batch_extractor.cc still appear as 0% covered. This makes me wonder whether the issue is related to coverage collection. Do you think additional C++ unit tests are still expected here, or should

Thanks for waiting! This is a bit weird. Since you only added the instructions, go test should theoretically hit most of the branches, so the coverage shouldn't be this low. That said, Sonar is sometimes inaccurate with coverage stats because it pulls in useless files. Let me check it out.

I’ve confirmed that the code coverage issue is related to CI. I’ve been busy working on the CI these days. Sorry, the review may still need to wait a few more days. PS: I asked Copilot to review the code. It is not 100% correct, so feel free to ignore its comments if you disagree.

    void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
      // Currently, we only have two kinds of log data
      if (ServerLogData::IsServerLogData(blob.data())) {
        ServerLogData server_log;
        if (server_log.Decode(blob).IsOK()) {
          // We don't handle server log currently
        }
      } else {
        // Redis type log data
        if (auto s = log_data_.Decode(blob); !s.IsOK()) {
          WARN("Failed to decode Redis type log: {}", s.Msg());
        } else {
          seen_xdelex_entry_keys_.clear();
        }
      }
    }

    auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value);
    group_meta.pending_number -= decrement;
    s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta));

    auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value);
    consumer_meta.pending_number -= decrement;
    s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta));

@jihuayu, while reviewing the stream code for XDELEX, I found two unrelated stream issues:
These appear independent of XDELEX, so I plan to handle them separately to keep the current PR focused. Do you think this approach is acceptable, or should I address them within the current PR?

Yes, splitting it into a separate PR would be better. You can simply mark the Copilot review comments as resolved.
…vent cross-group leakage
Add XDELEX to delete stream entries with KEEPREF/DELREF options. Includes refactoring for unified entry/PEL deletion. Also fix a consumer pending count bug when destroying a consumer.
Force-pushed from ecc4365 to b952fd6.

Hi @jihuayu, I’ve been trying to improve my review process while working on this PR. After your previous feedback, I spent some time comparing behavior against Redis 8.2 and found several compatibility issues myself before updating the implementation. I’m still learning, so I’m curious about one thing: when you review a PR like this, what are the main categories you usually check beyond basic functionality? For example, do you focus more on Redis compatibility, invariants, replication behavior, edge cases, maintainability, or something else? I’m mainly asking because I’d like to improve my own review workflow and understand what experienced maintainers tend to look for. Thanks!

@kirito632 Wow, I’m glad you’re interested in this. First, what I care about most is correctness. This includes consistency with Redis semantics, although we may intentionally use different semantics when there is a specific reason to do so. It also includes whether the implementation follows Kvrocks conventions, whether reads and writes follow the intended design, whether locking is handled correctly, and whether there are any performance issues. Beyond that, there are some style-related concerns, such as whether the code is clear and whether the naming is appropriate. This part can be somewhat subjective and depends on each reviewer’s preferences. In general, as long as it is not too far off, it should be fine. As for test cases, the ideal case would be to cover all edge cases. In practice, that is quite difficult, so covering the main flow and important error paths is usually enough. Among these, the first point, correctness, is the main focus of the review. For especially complex or very long code, code style and readability will also be treated as being just as important as correctness. I hope to see in the review that contributors understand their own code and have put their own thought into it.

Thanks, this is very helpful. For this PR, I’ll keep correctness as the main focus and continue checking it from those angles. One thing I noticed while testing against Redis 8.2 is that some XDELEX behavior is a bit more permissive than what I first expected from the command description, so I’ve been using Redis 8.2 behavior as the source of truth where possible and adding focused tests for those cases.

@jihuayu, could you please help rerun the failed tests? Thanks.

OK, done! Don't worry, if you confirm the test is not your issue, I will make sure they all pass before merging.

@jihuayu, the CI is green now, but SonarCloud is still failing because Coverage on New Code is 40.5% (required >= 50%). After checking the report, most uncovered lines are in:
The stream command behavior itself is already covered by the newly added Go tests, but Sonar seems to be counting several WriteBatchExtractor paths that are difficult to exercise through command-level tests. I'm considering adding a few focused C++ tests around WriteBatchExtractor to increase coverage. Before doing that, I'd like to confirm whether this is the direction you'd prefer, or if there are other areas you think would benefit more from additional coverage. Thanks!

There are still some issues with the code coverage, but we can ignore them for now.

Understood, thanks!



What
This PR introduces the XDELEX stream command with configurable
PEL cleanup behaviors.
It also fixes an existing issue in XINFO CONSUMERS where consumer metadata
from different groups could leak into the same scan result.
XINFO CONSUMERS Fix
Root cause:
GetConsumerInfo() used iterator bounds based only on the stream version range
([version, version+1)).
Since the InternalKey encoding places the version field before the sub-key,
consumer records from other groups could still fall within the same iteration
range.
Fix:
Extract and validate the group name from each internal key during iteration,
filtering out unrelated consumers.
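To illustrate the fix, here is a minimal sketch of a version-bounded scan that also checks the group name. It is not the Kvrocks code: ConsumerKey is a hypothetical, simplified stand-in for the internal key (version first, then the sub-key parts), and std::map stands in for the RocksDB iterator.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical simplified key: the version sorts first, then the sub-key
// parts (group name, consumer name).
struct ConsumerKey {
  uint64_t version;
  std::string group;
  std::string consumer;
  bool operator<(const ConsumerKey &other) const {
    return std::tie(version, group, consumer) < std::tie(other.version, other.group, other.consumer);
  }
};

// Scans the range [version, version + 1) and keeps only the consumers whose
// key carries the requested group name.
std::vector<std::string> ConsumersOfGroup(const std::map<ConsumerKey, int> &store, uint64_t version,
                                          const std::string &group) {
  assert(!group.empty());
  std::vector<std::string> result;
  auto it = store.lower_bound(ConsumerKey{version, "", ""});
  for (; it != store.end() && it->first.version == version; ++it) {
    if (it->first.group != group) continue;  // consumer of another group: skip
    result.push_back(it->first.consumer);
  }
  return result;
}
```

Without the group check inside the loop, every consumer stored under the same version would be returned, which is the leak described above.
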
XDELEX
Supported deletion strategies:
- KEEPREF (default): Delete the stream entry only.
- DELREF: Delete the stream entry and remove it from all groups' PELs.
- ACKED: Delete the stream entry only if it has been acknowledged by all consumer groups.
Per-ID return codes:
- 1: entry deleted
- 2: entry retained (e.g. ACKED condition not satisfied)
- -1: entry not found or duplicate ID within the same request
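The strategies and return codes above can be modeled with a small in-memory sketch. This is an illustration under simplifying assumptions, not the implementation in this PR: ToyStream and DeleteEntries are hypothetical names, IDs are plain integers, and an entry counts as acknowledged once no group's PEL references it (entries a group has not yet read are ignored here).

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

enum class DeleteStrategy { kKeepRef, kDelRef, kAcked };

// Hypothetical in-memory stream: entry IDs plus one pending-entries list
// (PEL) per consumer group.
struct ToyStream {
  std::set<uint64_t> entries;
  std::map<std::string, std::set<uint64_t>> pel_by_group;
};

// Returns one code per requested ID: 1 deleted, 2 retained, -1 not found.
std::vector<int> DeleteEntries(ToyStream &stream, DeleteStrategy strategy, const std::vector<uint64_t> &ids) {
  std::vector<int> results;
  for (uint64_t id : ids) {
    if (stream.entries.count(id) == 0) {
      results.push_back(-1);  // missing, or already removed by an earlier duplicate
      continue;
    }
    if (strategy == DeleteStrategy::kAcked) {
      bool pending = false;
      for (const auto &kv : stream.pel_by_group) {
        if (kv.second.count(id) > 0) pending = true;
      }
      if (pending) {
        results.push_back(2);  // still referenced by some PEL: keep the entry
        continue;
      }
    }
    if (strategy == DeleteStrategy::kDelRef) {
      for (auto &kv : stream.pel_by_group) kv.second.erase(id);
    }
    stream.entries.erase(id);
    results.push_back(1);
  }
  return results;
}
```

In this model a repeated ID yields -1 only because the first occurrence already removed the entry; KEEPREF leaves PEL references in place, which is what distinguishes it from DELREF.
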
Implementation Notes
- Pending-number decrements are aggregated in memory and flushed through a single WriteBatch to avoid repeated metadata updates during DELREF operations.
- Duplicate IDs are deduplicated to preserve idempotency and prevent stream metadata corruption.
- The originating command name is propagated into WriteBatchExtractor so replicas replay the exact same deletion semantics.
- Internal stream metadata and PEL delete events are filtered out in WriteBatchExtractor to avoid generating invalid replicated XDEL commands.
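The aggregation idea in the first note can be sketched as follows. This is a simplified illustration, not the PR's code: ToyBatch is a hypothetical stand-in for a write batch, PendingDecrements is a hypothetical helper, and clamping the counter at zero is an assumption made here for safety rather than behavior confirmed by the PR.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a write batch: it only records the queued puts.
struct ToyBatch {
  std::vector<std::pair<std::string, uint64_t>> puts;
  void Put(const std::string &key, uint64_t value) { puts.emplace_back(key, value); }
};

// Collects per-key decrements in memory so each metadata key is rewritten once.
class PendingDecrements {
 public:
  void Add(const std::string &meta_key, uint64_t n = 1) { decrements_[meta_key] += n; }

  // Queues exactly one Put per touched key that still exists in `current`.
  void Flush(const std::map<std::string, uint64_t> &current, ToyBatch *batch) const {
    assert(batch != nullptr);
    for (const auto &kv : decrements_) {
      auto it = current.find(kv.first);
      if (it == current.end()) continue;  // metadata key is gone: nothing to update
      // Assumption: clamp at zero instead of letting the unsigned counter wrap.
      uint64_t next = it->second >= kv.second ? it->second - kv.second : 0;
      batch->Put(kv.first, next);
    }
  }

 private:
  std::map<std::string, uint64_t> decrements_;
};
```

Deleting N pending entries of the same group then costs one metadata write for that group instead of N.
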
Testing
Added Go integration tests covering:
- multi-group DELREF cleanup
- ACKED blocking semantics
- duplicate-ID idempotency
- invalid syntax handling
- XINFO CONSUMERS group isolation
AI-Assisted Contribution Disclosure
This contribution complies with the ASF AI-assisted contribution guidelines.
AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.