Skip to content

Add API of PullConsumer#342

Open
aaron-ai wants to merge 2 commits into
apache:masterfrom
aaron-ai:add_pull_api
Open

Add API of PullConsumer#342
aaron-ai wants to merge 2 commits into
apache:masterfrom
aaron-ai:add_pull_api

Conversation

@aaron-ai

@aaron-ai aaron-ai commented Jan 16, 2023

Copy link
Copy Markdown
Member

Similar to the LitePullConsumer provided in RocketMQ 4.0, RocketMQ 5.0 will also provide a brand-new API that can achieve the same functionality, providing higher controllability and flexibility to meet more diverse requirements for message processing.

Fixes #341

@codecov-commenter

codecov-commenter commented Jan 16, 2023

Copy link
Copy Markdown

Codecov Report

Merging #342 (c6db59e) into master (1184903) will decrease coverage by 0.27%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master     #342      +/-   ##
============================================
- Coverage     34.36%   34.08%   -0.28%     
+ Complexity      660      656       -4     
============================================
  Files           220      220              
  Lines         11450    11450              
  Branches        277      277              
============================================
- Hits           3935     3903      -32     
- Misses         7261     7294      +33     
+ Partials        254      253       -1     
Flag Coverage Δ
java 61.54% <ø> (-0.35%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
golang/client.go 36.51% <0.00%> (-4.42%) ⬇️
...e/rocketmq/client/java/impl/ClientManagerImpl.java 78.81% <0.00%> (-3.45%) ⬇️
...g/apache/rocketmq/client/java/impl/ClientImpl.java 47.30% <0.00%> (-1.91%) ⬇️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@aaron-ai aaron-ai force-pushed the add_pull_api branch 2 times, most recently from ece52b0 to 20f53e1 Compare January 16, 2023 05:43
@aaron-ai aaron-ai changed the title Add pull API Add API of PullConsumer Jan 17, 2023
@github-actions

Copy link
Copy Markdown

This PR is stale because it has been open for 30 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.

@github-actions github-actions Bot added the stale Pull request is stale label Feb 17, 2023
@aaron-ai aaron-ai added no stale This will never be considered stale and removed stale Pull request is stale labels Feb 17, 2023
@zhouxinyu zhouxinyu self-requested a review February 17, 2023 06:34
@socutes

socutes commented Feb 19, 2023

Copy link
Copy Markdown

In the scenario of stream processing, there are several methods to obtain offset information that are often used, and it is recommended to add them:
1.Gets the current consumption progress point. This is often used when the application crashes, data checks, and when obtaining the next consumption point. eg: position()
2.Gets the earliest offset of the topic. eg: beginningOffsets()
3.Gets the offset at the end of the topic. eg: endOffsets()
@zhouxinyu @aaron-ai

@zhouxinyu zhouxinyu added no stale This will never be considered stale and removed no stale This will never be considered stale labels Feb 20, 2023
@aaron-ai

Copy link
Copy Markdown
Member Author

In the scenario of stream processing, there are several methods to obtain offset information that are often used, and it is recommended to add them: 1.Gets the current consumption progress point. This is often used when the application crashes, data checks, and when obtaining the next consumption point. eg: position() 2.Gets the earliest offset of the topic. eg: beginningOffsets() 3.Gets the offset at the end of the topic. eg: endOffsets() @zhouxinyu @aaron-ai

Make sense. At the same time, would it be more appropriate to place these methods in the Ops related interface rather than the pull consumer?

@zhouxinyu

Copy link
Copy Markdown
Member

@socutes I noticed that we already have some methods support seek to begin or end of a specifc message queue. These methods provide query and seek semantic, do we really need the query-only methods?

/**
* Commit offset manually.
*/
void commit() throws ClientException;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of commit are unclear, for example, for a single queue client, the pull progress is 200, and the poll function returns 100 at this time, we don't know the consumption progress. If we commit 200 will lose the message, commit 100 is not correct either. For users, it is not the case that the previous batch of messages needs to be consumed before taking the next batch. So the meaning here is auto commit. For stream frameworks, it is usually expected that commit and ckpt are atomic, and I think commit(mq, offset) should also be added as a manual interface.

* @param timeout the maximum time to block.
* @return list of fetched messages.
*/
List<MessageView> poll(Duration timeout);

@aaron-ai aaron-ai Mar 6, 2023

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also provide an asynchronous interface for PullConsumer#poll? If we only offer a synchronous interface, it means that each call will occupy one user thread. Typically, the poll interface is called for multiple queues, so occupying one thread may not have a significant impact. However, an asynchronous interface can provide greater flexibility, such as forming a future chain with the user's own methods, reducing unnecessary threads when multiple clients exist in one process, and potentially offering other benefits.

We welcome everyone to join the discussion.

@oss-sentinel-ai oss-sentinel-ai left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: Approved ✅

PR: #342 — Add API of PullConsumer
Type: API addition (3 files, +282/-2)

Assessment

This PR introduces the PullConsumer API for RocketMQ 5.0, similar to LitePullConsumer in RocketMQ 4.0. It provides higher controllability and flexibility for diverse message processing requirements.

Key Changes

  • New PullConsumer interface (136 lines)
  • New PullConsumerBuilder interface (82 lines)
  • TopicMessageQueueChangeListener for queue change events (32 lines)
  • MessageQueue abstraction (30 lines)

Verdict

✅ Clean API design that follows existing patterns. Fixes #341.


🤖 Automated review by oss-sentinel-ai

@RockteMQ-AI

Copy link
Copy Markdown

Review by github-manager-bot

Summary

Adds PullConsumer API interfaces for RocketMQ 5.0, providing manual message queue assignment and pull-based consumption similar to LitePullConsumer from 4.0, along with supporting builder, listener, and MessageQueue interfaces.

Findings

  • [Critical] PullConsumer.java:44-128 — Multiple Javadoc @param tags are malformed, using file paths like @repos/apache_rocketmq-clients/cpp/... instead of parameter names. This will break Javadoc generation and make the API documentation unusable.
  • [Critical] PullConsumerBuilder.java:34-75 — All @param tags contain malformed file path references instead of parameter names, preventing proper Javadoc generation.
  • [Critical] TopicMessageQueueChangeListener.java:28 — Malformed @param tags with file path references instead of parameter names.
  • [Warning] PullConsumer.java:68,76,119,128 — Javadoc {@link} syntax is broken, referencing external files instead of using standard Java link syntax like {@link #poll(Duration)}.
  • [Warning] PullConsumer.java:103 — Javadoc references Optional#empty() with broken syntax; should be {@link Optional#empty()}.
  • [Warning] PullConsumer.java:96 — offsetForTimestamp(MessageQueue messageQueue, Long timestamp) uses boxed Long instead of primitive long. If null is not a valid input, use primitive for clarity and performance.
  • [Info] PullConsumer.java:56,84 — assign(), pause(), and resume() methods don't document behavior for null or empty collections, or for queues not currently assigned.
  • [Info] PullConsumer.java — No thread-safety documentation. Specify whether methods can be called concurrently from multiple threads.
  • [Info] PullConsumer.java:68 — poll(Duration timeout) doesn't declare checked exceptions. Document what happens on network errors, consumer closure, or interruption (does it throw, return empty list, etc?).
  • [Info] MessageQueue.java — Interface will be used in Collections (see Set<MessageQueue> in listener). Document equals() and hashCode() contract expectations.
  • [Info] PullConsumer.java — No method to query current assignment or check if a specific queue is paused. Consider if these are needed for debugging/monitoring.

Suggestions

  1. Fix all Javadoc tags immediately — Replace all malformed @repos/... and @gh_2.65.0_linux_amd64/... references with proper parameter names and {@link} syntax. This is a blocker for the PR.

  2. Add null-safety documentation — Specify behavior when null or empty collections are passed to assign(), pause(), resume().

  3. Document thread safety — Add class-level Javadoc stating whether the interface is thread-safe and which methods can be called concurrently.

  4. Clarify exception behavior — Document what exceptions poll() might throw or return on error conditions.

  5. Consider adding query methods — Methods like assignment() to get current assignments or isPaused(MessageQueue) could be valuable for debugging and monitoring, though not critical for initial implementation.

  6. Use primitive types where appropriate — Change Long timestamp to long timestamp in offsetForTimestamp() unless null is a valid input that needs special handling.


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

no stale This will never be considered stale

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add API of PullConsumer

7 participants