feat: implement real-time message steering for agents during execution#1802
feat: implement real-time message steering for agents during execution#1802praisonai-triage-agent[bot] wants to merge 2 commits into
Conversation
- Add MessageSteeringProtocol and AgentSteeringProtocol following protocol-driven architecture - Implement MessageSteering class with priority-based message queuing - Add SteeringMixin to Agent class with message_steering parameter - Integrate steering checks into execution flow and tool execution - Support mid-execution guidance with priority levels (normal, high, urgent, interrupt) - Maintains backward compatibility with existing Agent API - Zero overhead when message_steering=False (default) Fixes #1799 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: praisonai-triage-agent[bot] <praisonai-triage-agent[bot]@users.noreply.github.com>
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
✅ Action performedReview finished.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughAdds real-time message steering capability to agents, enabling runtime guidance during execution. Defines steering protocols with priority levels and message structures, implements a queue-based MessageSteering system with a SteeringMixin integration layer, wires steering into Agent initialization and execution flows, and provides comprehensive test coverage for enabling/disabling and message processing. ChangesMessage Steering System
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
Greptile SummaryThis PR adds real-time message steering to PraisonAI agents, allowing callers to queue guidance messages that are injected into the agent's LLM prompt between execution turns, with high-priority messages able to interrupt in-progress tool calls.
Confidence Score: 4/5Safe to merge with the understanding that LOW/NORMAL priority steering messages arriving during tool execution will be silently dropped rather than delivered. The steering hook in tool_execution.py calls _check_steering_messages(), which dequeues the message unconditionally. For non-urgent messages the formatted string is returned but never injected into any prompt — the guidance is consumed and lost. execution_mixin.py already ran its pre-chat check before the tool fired, so no subsequent check will pick it up. tool_execution.py — the steering check before each tool call dequeues non-urgent messages without using them. Important Files Changed
Sequence DiagramsequenceDiagram
participant User
participant Agent
participant SteeringQueue as MessageSteering Queue
participant ExecutionMixin
participant LLM
participant ToolExecution
User->>Agent: "steer("guidance", priority=5)"
Agent->>SteeringQueue: queue_message
Note over ExecutionMixin: Next agent turn begins
ExecutionMixin->>Agent: _check_steering_messages()
Agent->>SteeringQueue: process_steering() dequeue()
SteeringQueue-->>Agent: SteeringMessage (NORMAL)
Agent-->>ExecutionMixin: "[USER GUIDANCE]: guidance..."
ExecutionMixin->>LLM: chat(prompt + steering_msg)
LLM->>ToolExecution: call tool(fn, args)
ToolExecution->>Agent: _check_steering_messages()
alt URGENT/HIGH priority
Agent-->>ToolExecution: "[URGENT USER GUIDANCE]: ..."
ToolExecution-->>LLM: Tool interrupted by guidance
else LOW/NORMAL priority
Agent-->>ToolExecution: "[USER GUIDANCE]: ..."
Note over ToolExecution: Message dequeued but dropped
ToolExecution->>ToolExecution: executes tool normally
end
Reviews (2): Last reviewed commit: "fix: address critical bugs in message st..." | Re-trigger Greptile |
| # Process one message per check to avoid blocking | ||
| msg_content = self._message_queue.dequeue(timeout=0) |
There was a problem hiding this comment.
process_steering only dequeues a single message per invocation, but _check_steering_messages is called once before each LLM call, so any messages queued after the first one accumulate and are returned to the agent one-per-chat-turn. If users queue several steering messages rapidly during a slow task, all but the oldest remain pending for potentially many turns. Consider draining all queued messages (or at least a bounded batch) in a single call so guidance is not arbitrarily delayed.
| # Process one message per check to avoid blocking | |
| msg_content = self._message_queue.dequeue(timeout=0) | |
| # Process one message per check to avoid blocking | |
| # Note: only one message is dequeued per call; callers must invoke | |
| # repeatedly (or drain in a loop) to process all pending messages. | |
| msg_content = self._message_queue.dequeue(timeout=0) |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| from .message_steering_protocols import ( | ||
| MessageSteeringProtocol, | ||
| SteeringMessage, | ||
| SteeringPriority, | ||
| AgentSteeringProtocol | ||
| ) |
There was a problem hiding this comment.
MessageSteeringProtocol and AgentSteeringProtocol are imported but never used in this module. MessageSteering does not declare conformance to MessageSteeringProtocol, and the unused import adds noise. Consider removing the unused names from the import.
| from .message_steering_protocols import ( | |
| MessageSteeringProtocol, | |
| SteeringMessage, | |
| SteeringPriority, | |
| AgentSteeringProtocol | |
| ) | |
| from .message_steering_protocols import ( | |
| SteeringMessage, | |
| SteeringPriority, | |
| ) |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (1)
src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py (1)
30-31: 🏗️ Heavy liftMove these interfaces into a
protocols.pymodule.This repo’s core-SDK convention requires extension-point interfaces to live in a
protocols.pyfile within their module. Keeping them inmessage_steering_protocols.pymakes this feature a one-off pattern. As per coding guidelines, "All protocols must be placed in a protocols.py file within their module and be suffixed with 'Protocol'".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py` around lines 30 - 31, Move the MessageSteeringProtocol (the `@runtime_checkable` Protocol class) out of message_steering_protocols.py into a new protocols.py within the same module; keep the class name and decorator unchanged, export it from the new protocols.py, update any local imports in this module (and other modules that import MessageSteeringProtocol) to reference the new protocols.py, and run tests/type-checking to ensure there are no unresolved imports or circular references after the move.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai-agents/praisonaiagents/agent/execution_mixin.py`:
- Around line 809-816: Wrap the steering check and chat invocation in a
try/except around the call to _check_steering_messages() and the creation of
prompt_with_steering so exceptions are caught and surfaced with context; on
exception set current_status[0] to a clear failure message and set
result_holder[0] to the exception (or raise a RuntimeError from the original)
including remediation hints and any agent/context identifiers, then re-raise to
fail fast. Also fix prompt formatting when creating prompt_with_steering by
trimming input and joining prompt and steering_msg with a clear separator (e.g.,
double newline and a short “Steering Guidance” delimiter) rather than simple
concatenation, and ensure you call self.chat(prompt_with_steering, **kwargs)
only after successful formatting. Include references to the symbols
_check_steering_messages, prompt_with_steering, current_status, result_holder,
and self.chat so the change is localized.
In `@src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py`:
- Around line 13-19: SteeringPriority.INTERRUPT currently advertises stronger
semantics but is clamped to the same numeric level as URGENT when enqueued (see
MessageSteering.queue_message and MessagePriority in message_queue.py), so
INTERRUPT doesn't outrank URGENT end-to-end; fix by either
removing/degeneralizing INTERRUPT from SteeringPriority or by updating the
queuing logic to special-case INTERRUPT (e.g., treat SteeringPriority.INTERRUPT
as an immediate interrupt path or map it to a distinct MessagePriority value not
clamped by MessagePriority.URGENT) so that messages created with
SteeringPriority.INTERRUPT truly preempt URGENT messages in
MessageSteering.queue_message and the underlying message_queue handling.
In `@src/praisonai-agents/praisonaiagents/agent/message_steering.py`:
- Around line 165-257: The mixin currently assumes more than
MessageSteeringProtocol guarantees: update either the protocol or the mixin;
preferred fix is to extend MessageSteeringProtocol to include the status/message
surface used here (add enabled: bool, get_pending_messages() -> List[Dict],
has_pending_messages() -> bool, and make process_steering(context: Dict) -> bool
populate context["steering_messages"] with dicts containing "content" and
"priority"), then ensure _init_message_steering accepts MessageSteeringProtocol
implementations that implement those symbols; alternatively (if you cannot
change the protocol) make get_steering_status(), message_steering_enabled and
_check_steering_messages() defensive by using getattr/hasattr and calling only
protocol methods (e.g., a process_steering() return value that directly returns
messages) so the mixin no longer assumes .enabled, .get_pending_messages(),
.has_pending_messages() or context mutation semantics used in
_check_steering_messages.
In `@src/praisonai-agents/praisonaiagents/agent/tool_execution.py`:
- Around line 198-205: Replace the fragile string-match and
missing-error-handling around _check_steering_messages(): call
_check_steering_messages() inside a try/except, log and return a clear failure
message if it raises, then inspect the returned steering message's priority
field using the SteeringPriority enum (e.g., compare steering_msg.priority or
steering_msg.get("priority") to SteeringPriority.INTERRUPT) instead of searching
for the substring "INTERRUPT"; if the priority equals INTERRUPT (or equivalent
enum value), log the interruption and return the user-guidance interruption
message.
In `@test_message_steering.py`:
- Around line 1-13: Move the test file from the repository root into the
integration tests directory (e.g.,
src/praisonai-agents/tests/integration/test_message_steering.py), remove or
adjust the manual sys.path modification (sys.path.insert(...)) so test discovery
works with pytest, and update any relative imports or paths in the file to use
package imports or pytest fixtures instead of modifying sys.path; ensure the
filename and test functions follow pytest conventions so test discovery finds
them.
- Around line 18-88: The test suite is missing a mandatory live agentic test
that actually runs the LLM and verifies steering injection; add a new test
(e.g., test_message_steering_live_execution) that creates an Agent with
message_steering=True and a real LLM (llm="gpt-4o-mini"), starts execution via
Agent.start("Summarize the benefits of async programming") in a background
thread, calls Agent.steer("Keep it under 50 words", priority=HIGH) while the
agent is running, waits for the thread to finish, then assert the returned
response is non-empty and that the steering guidance appears in the response or
in the agent’s prompt/tracing/hook output (use Agent._check_steering_messages or
any tracing hook to validate injection); ensure timing (sleep) and thread join
timeout are used so the steer call occurs during execution and the test cleans
up reliably.
---
Nitpick comments:
In `@src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py`:
- Around line 30-31: Move the MessageSteeringProtocol (the `@runtime_checkable`
Protocol class) out of message_steering_protocols.py into a new protocols.py
within the same module; keep the class name and decorator unchanged, export it
from the new protocols.py, update any local imports in this module (and other
modules that import MessageSteeringProtocol) to reference the new protocols.py,
and run tests/type-checking to ensure there are no unresolved imports or
circular references after the move.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: db20aa4b-f642-4ebc-9652-887cabb02113
📒 Files selected for processing (7)
src/praisonai-agents/praisonaiagents/agent/__init__.pysrc/praisonai-agents/praisonaiagents/agent/agent.pysrc/praisonai-agents/praisonaiagents/agent/execution_mixin.pysrc/praisonai-agents/praisonaiagents/agent/message_steering.pysrc/praisonai-agents/praisonaiagents/agent/message_steering_protocols.pysrc/praisonai-agents/praisonaiagents/agent/tool_execution.pytest_message_steering.py
| # Check for steering messages before starting chat | ||
| if hasattr(self, '_check_steering_messages'): | ||
| steering_msg = self._check_steering_messages() | ||
| if steering_msg: | ||
| # Inject steering message into prompt | ||
| prompt_with_steering = f"{prompt}{steering_msg}" | ||
| current_status[0] = "Processing steering guidance..." | ||
| result_holder[0] = self.chat(prompt_with_steering, **kwargs) |
There was a problem hiding this comment.
Add error handling and improve prompt formatting for steering integration.
The steering check lacks defensive error handling and could produce malformed prompts:
- If
_check_steering_messages()raises an exception, the entire chat thread fails with no recovery - The f-string concatenation
f"{prompt}{steering_msg}"directly appends steering text without a separator, potentially causing parsing issues for the LLM
🛡️ Proposed fix: Add error handling and formatting
# Check for steering messages before starting chat
if hasattr(self, '_check_steering_messages'):
- steering_msg = self._check_steering_messages()
- if steering_msg:
- # Inject steering message into prompt
- prompt_with_steering = f"{prompt}{steering_msg}"
- current_status[0] = "Processing steering guidance..."
- result_holder[0] = self.chat(prompt_with_steering, **kwargs)
- else:
- result_holder[0] = self.chat(prompt, **kwargs)
+ try:
+ steering_msg = self._check_steering_messages()
+ if steering_msg:
+ # Inject steering message into prompt with clear separator
+ prompt_with_steering = f"{prompt}\n\n{steering_msg}"
+ current_status[0] = "Processing steering guidance..."
+ result_holder[0] = self.chat(prompt_with_steering, **kwargs)
+ else:
+ result_holder[0] = self.chat(prompt, **kwargs)
+ except Exception as e:
+ logger.warning(f"Steering check failed, continuing without steering: {e}")
+ result_holder[0] = self.chat(prompt, **kwargs)
else:
result_holder[0] = self.chat(prompt, **kwargs)Based on coding guidelines: error handling should fail fast with clear error messages and include remediation hints; propagate context for debugging.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/praisonaiagents/agent/execution_mixin.py` around lines
809 - 816, Wrap the steering check and chat invocation in a try/except around
the call to _check_steering_messages() and the creation of prompt_with_steering
so exceptions are caught and surfaced with context; on exception set
current_status[0] to a clear failure message and set result_holder[0] to the
exception (or raise a RuntimeError from the original) including remediation
hints and any agent/context identifiers, then re-raise to fail fast. Also fix
prompt formatting when creating prompt_with_steering by trimming input and
joining prompt and steering_msg with a clear separator (e.g., double newline and
a short “Steering Guidance” delimiter) rather than simple concatenation, and
ensure you call self.chat(prompt_with_steering, **kwargs) only after successful
formatting. Include references to the symbols _check_steering_messages,
prompt_with_steering, current_status, result_holder, and self.chat so the change
is localized.
| class SteeringPriority(Enum): | ||
| """Priority levels for steering messages.""" | ||
| LOW = 1 | ||
| NORMAL = 5 | ||
| HIGH = 10 | ||
| URGENT = 20 | ||
| INTERRUPT = 30 # Immediate interruption |
There was a problem hiding this comment.
INTERRUPT does not actually outrank URGENT end-to-end.
src/praisonai-agents/praisonaiagents/agent/message_queue.py only supports priorities up to URGENT=20, and MessageSteering.queue_message() later clamps the enqueued value with min(priority, MessagePriority.URGENT.value). So an INTERRUPT message is queued with the same priority as URGENT, which means an earlier urgent message can still be processed first. That breaks the stronger interruption semantics exposed by this public enum.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py`
around lines 13 - 19, SteeringPriority.INTERRUPT currently advertises stronger
semantics but is clamped to the same numeric level as URGENT when enqueued (see
MessageSteering.queue_message and MessagePriority in message_queue.py), so
INTERRUPT doesn't outrank URGENT end-to-end; fix by either
removing/degeneralizing INTERRUPT from SteeringPriority or by updating the
queuing logic to special-case INTERRUPT (e.g., treat SteeringPriority.INTERRUPT
as an immediate interrupt path or map it to a distinct MessagePriority value not
clamped by MessagePriority.URGENT) so that messages created with
SteeringPriority.INTERRUPT truly preempt URGENT messages in
MessageSteering.queue_message and the underlying message_queue handling.
| def _init_message_steering(self, message_steering=False): | ||
| """Initialize message steering if enabled.""" | ||
| if message_steering is True: | ||
| # Use default implementation | ||
| self._message_steering = MessageSteering() | ||
| elif message_steering is False or message_steering is None: | ||
| # Disabled | ||
| self._message_steering = None | ||
| else: | ||
| # Custom implementation provided | ||
| self._message_steering = message_steering | ||
|
|
||
| def steer(self, message: str, priority: int = 5) -> str: | ||
| """ | ||
| Send a steering message to the agent during execution. | ||
|
|
||
| This can be called while the agent is running to provide | ||
| real-time guidance or course corrections. | ||
|
|
||
| Args: | ||
| message: The steering message | ||
| priority: Message priority (1=low, 5=normal, 10=high, 20=urgent, 30=interrupt) | ||
|
|
||
| Returns: | ||
| Message ID for tracking, empty string if steering disabled | ||
|
|
||
| Example: | ||
| ```python | ||
| agent = Agent(name="research", message_steering=True) | ||
|
|
||
| # Start long-running task | ||
| import threading | ||
| def run_task(): | ||
| return agent.start("Research AI trends comprehensively") | ||
| thread = threading.Thread(target=run_task) | ||
| thread.start() | ||
|
|
||
| # Send steering messages while running | ||
| agent.steer("Focus on the business impact, not technical details") | ||
| agent.steer("Also include information about market size", priority=10) | ||
| ``` | ||
| """ | ||
| if self._message_steering is None: | ||
| logger.warning("Message steering not enabled - call ignored") | ||
| return "" | ||
|
|
||
| return self._message_steering.queue_message(message, priority) | ||
|
|
||
| def get_steering_status(self) -> Dict[str, Any]: | ||
| """Get current steering status.""" | ||
| if self._message_steering is None: | ||
| return {"enabled": False, "pending_count": 0} | ||
|
|
||
| return { | ||
| "enabled": self._message_steering.enabled, | ||
| "pending_count": len(self._message_steering.get_pending_messages()), | ||
| "has_pending": self._message_steering.has_pending_messages() | ||
| } | ||
|
|
||
| @property | ||
| def message_steering_enabled(self) -> bool: | ||
| """Whether message steering is enabled for this agent.""" | ||
| return self._message_steering is not None and self._message_steering.enabled | ||
|
|
||
| def _check_steering_messages(self) -> Optional[str]: | ||
| """ | ||
| Check for and process steering messages. | ||
|
|
||
| This is called during agent execution loops to process | ||
| any pending steering messages. | ||
|
|
||
| Returns: | ||
| Steering message to inject into conversation, or None | ||
| """ | ||
| if self._message_steering is None: | ||
| return None | ||
|
|
||
| # Process steering messages | ||
| context = {} | ||
| if self._message_steering.process_steering(context): | ||
| steering_messages = context.get("steering_messages", []) | ||
| if steering_messages: | ||
| # Get the most recent message | ||
| latest = steering_messages[-1] | ||
| content = latest["content"] | ||
| priority = latest["priority"] | ||
|
|
||
| # Format as system message | ||
| if priority in ("HIGH", "URGENT", "INTERRUPT"): | ||
| return f"\n[URGENT USER GUIDANCE]: {content}\nPlease acknowledge and adjust your approach accordingly." | ||
| else: | ||
| return f"\n[USER GUIDANCE]: {content}\nPlease consider this feedback as you continue." | ||
|
|
There was a problem hiding this comment.
The custom steering backend contract is stricter than MessageSteeringProtocol.
_init_message_steering() accepts any MessageSteeringProtocol, but get_steering_status() / message_steering_enabled read .enabled, and _check_steering_messages() assumes process_steering() mutates context["steering_messages"] with content and priority keys. Neither requirement exists in the published protocol, so a backend that satisfies the public type can still raise AttributeError or silently drop steering during execution. Please either extend the protocol with the status/message surface the mixin needs, or make the mixin depend only on protocol methods.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/praisonaiagents/agent/message_steering.py` around lines
165 - 257, The mixin currently assumes more than MessageSteeringProtocol
guarantees: update either the protocol or the mixin; preferred fix is to extend
MessageSteeringProtocol to include the status/message surface used here (add
enabled: bool, get_pending_messages() -> List[Dict], has_pending_messages() ->
bool, and make process_steering(context: Dict) -> bool populate
context["steering_messages"] with dicts containing "content" and "priority"),
then ensure _init_message_steering accepts MessageSteeringProtocol
implementations that implement those symbols; alternatively (if you cannot
change the protocol) make get_steering_status(), message_steering_enabled and
_check_steering_messages() defensive by using getattr/hasattr and calling only
protocol methods (e.g., a process_steering() return value that directly returns
messages) so the mixin no longer assumes .enabled, .get_pending_messages(),
.has_pending_messages() or context mutation semantics used in
_check_steering_messages.
| # Check for steering messages before tool execution | ||
| if hasattr(self, '_check_steering_messages'): | ||
| steering_msg = self._check_steering_messages() | ||
| if steering_msg and "INTERRUPT" in steering_msg: | ||
| # High priority steering - interrupt tool execution | ||
| logger.info(f"Tool {function_name} execution interrupted by steering message") | ||
| return f"Tool execution interrupted by user guidance: {steering_msg}" | ||
|
|
There was a problem hiding this comment.
Use priority-level check instead of string matching for steering interruption.
The current implementation searches for the string "INTERRUPT" in the steering message content, which is fragile and doesn't align with the protocol-driven design. According to the PR objectives and review stack context, SteeringPriority enum exists with priority levels (LOW, NORMAL, HIGH, URGENT, INTERRUPT), and the steering message should contain priority metadata.
Additionally, the check lacks error handling—if _check_steering_messages() raises, tool execution fails entirely.
🔧 Proposed fix: Check priority level from steering protocol
# Check for steering messages before tool execution
if hasattr(self, '_check_steering_messages'):
- steering_msg = self._check_steering_messages()
- if steering_msg and "INTERRUPT" in steering_msg:
- # High priority steering - interrupt tool execution
- logger.info(f"Tool {function_name} execution interrupted by steering message")
- return f"Tool execution interrupted by user guidance: {steering_msg}"
+ try:
+ steering_msg = self._check_steering_messages()
+ if steering_msg:
+ # Check if steering message indicates interruption priority
+ # (SteeringMixin formats HIGH/URGENT/INTERRUPT with prefix)
+ if any(marker in steering_msg for marker in ["[INTERRUPT]", "[URGENT]", "[HIGH]"]):
+ logger.info(f"Tool {function_name} execution interrupted by high-priority steering")
+ return f"Tool execution interrupted by user guidance: {steering_msg}"
+ except Exception as e:
+ logger.warning(f"Steering check failed, continuing with tool execution: {e}")Based on coding guidelines: use protocol-driven design for extension points; error handling should include context propagation and fail gracefully with clear messages.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/praisonaiagents/agent/tool_execution.py` around lines
198 - 205, Replace the fragile string-match and missing-error-handling around
_check_steering_messages(): call _check_steering_messages() inside a try/except,
log and return a clear failure message if it raises, then inspect the returned
steering message's priority field using the SteeringPriority enum (e.g., compare
steering_msg.priority or steering_msg.get("priority") to
SteeringPriority.INTERRUPT) instead of searching for the substring "INTERRUPT";
if the priority equals INTERRUPT (or equivalent enum value), log the
interruption and return the user-guidance interruption message.
| #!/usr/bin/env python3 | ||
| """ | ||
| Test script for message steering capability. | ||
|
|
||
| This tests the real-time message steering implementation. | ||
| """ | ||
| import sys | ||
| import os | ||
| import time | ||
| import threading | ||
|
|
||
| # Add the package to the path | ||
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src', 'praisonai-agents')) |
There was a problem hiding this comment.
Move test file to proper test directory.
The test file is located at the repository root instead of within the tests/ subdirectory. According to coding guidelines, tests should be organized into unit/, integration/, e2e/ subdirectories.
Suggested location:
src/praisonai-agents/tests/integration/test_message_steering.py
This change will:
- Follow project conventions for test organization
- Allow test discovery tools to find it automatically
- Separate test code from production code
Based on coding guidelines: organize tests into unit/, integration/, e2e/ subdirectories with fixtures/ for shared test data.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@test_message_steering.py` around lines 1 - 13, Move the test file from the
repository root into the integration tests directory (e.g.,
src/praisonai-agents/tests/integration/test_message_steering.py), remove or
adjust the manual sys.path modification (sys.path.insert(...)) so test discovery
works with pytest, and update any relative imports or paths in the file to use
package imports or pytest fixtures instead of modifying sys.path; ensure the
filename and test functions follow pytest conventions so test discovery finds
them.
| def test_message_steering_basic(): | ||
| """Test basic message steering functionality.""" | ||
| print("Testing basic message steering...") | ||
|
|
||
| # Create agent with message steering enabled | ||
| agent = Agent( | ||
| name="test_agent", | ||
| instructions="You are a helpful assistant. Acknowledge any user guidance.", | ||
| message_steering=True, | ||
| llm="gpt-4o-mini" | ||
| ) | ||
|
|
||
| # Verify steering is enabled | ||
| assert agent.message_steering_enabled, "Message steering should be enabled" | ||
|
|
||
| # Test queueing messages | ||
| msg_id = agent.steer("Focus on being concise") | ||
| assert msg_id, "Should return message ID" | ||
|
|
||
| status = agent.get_steering_status() | ||
| assert status["enabled"], "Steering should be enabled" | ||
| assert status["pending_count"] > 0, "Should have pending messages" | ||
|
|
||
| print("✅ Basic message steering test passed") | ||
| return True | ||
|
|
||
| def test_message_steering_disabled(): | ||
| """Test that steering is disabled by default.""" | ||
| print("Testing disabled message steering...") | ||
|
|
||
| agent = Agent(name="test_agent", instructions="You are helpful") | ||
|
|
||
| # Verify steering is disabled | ||
| assert not agent.message_steering_enabled, "Message steering should be disabled by default" | ||
|
|
||
| # Test steering call returns empty ID | ||
| msg_id = agent.steer("This should be ignored") | ||
| assert msg_id == "", "Should return empty string when disabled" | ||
|
|
||
| status = agent.get_steering_status() | ||
| assert not status["enabled"], "Steering should be disabled" | ||
| assert status["pending_count"] == 0, "Should have no pending messages" | ||
|
|
||
| print("✅ Disabled message steering test passed") | ||
| return True | ||
|
|
||
| def test_message_steering_integration(): | ||
| """Test integration with execution (smoke test only - no actual LLM call).""" | ||
| print("Testing message steering integration...") | ||
|
|
||
| agent = Agent( | ||
| name="integration_test", | ||
| instructions="You are helpful", | ||
| message_steering=True | ||
| ) | ||
|
|
||
| # Add a steering message | ||
| msg_id = agent.steer("Please be very brief", priority=10) | ||
| assert msg_id, "Should queue message" | ||
|
|
||
| # Check that steering check method exists | ||
| assert hasattr(agent, '_check_steering_messages'), "Should have steering check method" | ||
|
|
||
| # Test the steering check method | ||
| steering_msg = agent._check_steering_messages() | ||
| assert steering_msg is not None, "Should return steering message" | ||
| assert "USER GUIDANCE" in steering_msg, "Should format as guidance" | ||
| assert "brief" in steering_msg.lower(), "Should contain original message" | ||
|
|
||
| print("✅ Integration test passed") | ||
| return True |
There was a problem hiding this comment.
Add mandatory real agentic test with actual LLM execution.
The current tests are smoke tests that only validate object construction and state checks. According to coding guidelines, real agentic tests are MANDATORY for every feature: the agent must call agent.start() with a real prompt, call the LLM, and produce actual text response.
Missing test case:
- Create agent with
message_steering=True - Start execution with
agent.start("Summarize the benefits of async programming") - During execution, call
agent.steer("Keep it under 50 words", priority=HIGH)from a background thread - Verify the agent's response respects the steering guidance
- Validate that steering was actually injected into the prompt via hooks or tracing
Example structure:
def test_message_steering_live_execution():
"""Test steering injection during live LLM execution."""
agent = Agent(
name="steering_test",
instructions="You are a helpful assistant",
message_steering=True,
llm="gpt-4o-mini"
)
# Start execution in background thread
import threading
result_holder = [None]
def run_agent():
result_holder[0] = agent.start("Explain quantum computing")
thread = threading.Thread(target=run_agent)
thread.start()
# Send steering while running
time.sleep(0.5) # Let execution start
msg_id = agent.steer("Be very concise", priority=10)
thread.join(timeout=30)
result = result_holder[0]
# Validate result contains response
assert result, "Should return LLM response"
# Additional assertions based on actual behaviorBased on coding guidelines: Real agentic tests are MANDATORY for every feature—Agent must call agent.start() with a real prompt, call the LLM, and produce actual text response, not just smoke tests of object construction.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@test_message_steering.py` around lines 18 - 88, The test suite is missing a
mandatory live agentic test that actually runs the LLM and verifies steering
injection; add a new test (e.g., test_message_steering_live_execution) that
creates an Agent with message_steering=True and a real LLM (llm="gpt-4o-mini"),
starts execution via Agent.start("Summarize the benefits of async programming")
in a background thread, calls Agent.steer("Keep it under 50 words",
priority=HIGH) while the agent is running, waits for the thread to finish, then
assert the returned response is non-empty and that the steering guidance appears
in the response or in the agent’s prompt/tracing/hook output (use
Agent._check_steering_messages or any tracing hook to validate injection);
ensure timing (sleep) and thread join timeout are used so the steer call occurs
during execution and the test cleans up reliably.
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task —— View job ✅ Final Architecture Review CompleteAPPROVED ✅ - All critical issues have been addressed and the PR is ready for merge. 📋 Review Results
🏗️ Architecture Compliance ✅Protocol-Driven Design:
Backward Compatibility:
Agent-Centric Design:
✅ Critical Bugs Fixed🔧 All 7 bugs identified by reviewers have been resolved:
🧪 Validation ResultsImport Test: ✅ Package imports successfully 📝 Files Modified & SummaryCore Implementation:
Testing:
🎯 Final Verdict✅ APPROVE FOR MERGE This PR successfully implements real-time message steering with:
The implementation follows AGENTS.md guidelines perfectly and provides a robust foundation for real-time agent steering capabilities. Commit: |
- Fix INTERRUPT detection in tool_execution.py (was using broken substring check) - Fix rate limiter blocking urgent messages during tool execution - Add INTERRUPT priority bypass in message queue (URGENT.value + 1) - Add proper error handling in execution_mixin.py with graceful degradation - Extend MessageSteeringProtocol with enabled property for contract compliance - Move test file to proper tests/integration/ directory structure - Add mandatory live agentic test with real LLM execution - Improve prompt formatting with clear separators Fixes critical bugs identified by Greptile, CodeRabbit reviewers. All changes maintain backward compatibility. Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
| try: | ||
| # Check for steering messages before tool execution | ||
| if hasattr(self, '_check_steering_messages'): | ||
| try: | ||
| steering_msg = self._check_steering_messages() | ||
| if steering_msg: | ||
| # Check if steering message indicates interruption priority | ||
| # (SteeringMixin formats HIGH/URGENT/INTERRUPT with specific prefixes) | ||
| if any(marker in steering_msg for marker in ["[URGENT USER GUIDANCE]", "[INTERRUPT]"]): | ||
| logger.info(f"Tool {function_name} execution interrupted by high-priority steering") | ||
| return f"Tool execution interrupted by user guidance: {steering_msg}" | ||
| except Exception as e: | ||
| logger.warning(f"Steering check failed, continuing with tool execution: {e}") |
There was a problem hiding this comment.
Non-urgent steering messages silently discarded during tool execution
_check_steering_messages() calls process_steering(), which dequeues the message before returning. When the returned string does not match [URGENT USER GUIDANCE], execution falls through — the message is consumed and permanently lost. A LOW or NORMAL priority message that arrives while a tool is running will never reach the LLM prompt: it was dequeued here, and execution_mixin.py already ran its own check (before chat() started), so it will not pick it up again. The fix is to skip the steering dequeue entirely for non-interrupt cases in tool_execution.py — let the message remain queued so execution_mixin.py can inject it on the next turn, and only interrupt the tool for genuinely high-priority messages.
Fixes #1799
Summary
Implements real-time message steering capability for PraisonAI agents, allowing users to send guidance messages during long-running executions without interrupting the agent.
Implementation Details
Protocol-Driven Architecture
Core Implementation
Agent Integration
Usage
Test Plan
Architecture Compliance
✅ Follows AGENTS.md protocol-driven design
✅ Core SDK contains only protocols and base implementations
✅ Uses existing message queue infrastructure
✅ Lazy imports for performance
✅ Zero overhead when disabled
✅ Backward compatible API
🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Tests