LCORE-2309: Added Pydantic AI Bridge#1817
Conversation
WalkthroughThis PR introduces a complete Pydantic AI integration for Llama Stack, adding a provider abstraction that wraps OpenAI-compatible clients, an httpx transport for in-process request routing, and utility functions for constructing AI agents. The feature supports both HTTP and in-process library modes. ChangesPydantic AI Llama Stack Integration
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/pydantic_ai_lightspeed/llamastack/_transport.py`:
- Around line 89-94: The header presence check treats names case-sensitively and
only checks two spellings, so normalize header names before deciding to inject
provider data: in the block that references self._client.provider_data and the
headers dict, compute a lowercase-key view (e.g., {k.lower(): v for k, v in
headers.items()} or use any(k.lower() == "x-llamastack-provider-data" for k in
headers)) and check for "x-llamastack-provider-data" in that normalized view;
only if absent, add the provider data using the canonical header
"X-LlamaStack-Provider-Data" to headers. Ensure you reference headers and
self._client.provider_data when applying the change.
- Around line 22-40: The _AsyncByteStream currently yields from the wrapped
async generator but doesn't forward closure; implement an async aclose(self) on
class _AsyncByteStream that calls and awaits self._gen.aclose() if the wrapped
generator has an aclose attribute to ensure httpx.Response.aclose() triggers
generator cleanup. In LlamaStackLibraryTransport.handle_async_request change the
provider-data header existence check to be case-insensitive (e.g., inspect
request.headers keys lowercased or use any(k.lower() ==
"x-llamastack-provider-data" for k in request.headers)) before injecting
"X-LlamaStack-Provider-Data" so you don't add a duplicate header when callers
use different casing.
In `@src/utils/pydantic_ai.py`:
- Around line 41-45: The helper _llama_stack_provider_from_client currently
reaches into AsyncLlamaStackClient._client (protected) when building a
LlamaStackProvider; change it to rely on the public HTTP-injection path instead:
update _llama_stack_provider_from_client to accept a public http_client
parameter (or obtain the client's configured http_client via its public API),
and pass that http_client into LlamaStackProvider(base_url=..., api_key=...,
http_client=...) rather than using client._client; ensure callers of
_llama_stack_provider_from_client are updated to provide the public http_client
argument.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2d1bdbfc-90ec-412a-b14c-474e9f95a843
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (11)
pyproject.tomlsrc/pydantic_ai_lightspeed/__init__.pysrc/pydantic_ai_lightspeed/llamastack/__init__.pysrc/pydantic_ai_lightspeed/llamastack/_provider.pysrc/pydantic_ai_lightspeed/llamastack/_transport.pysrc/utils/pydantic_ai.pytests/unit/pydantic_ai_lightspeed/__init__.pytests/unit/pydantic_ai_lightspeed/llamastack/__init__.pytests/unit/pydantic_ai_lightspeed/llamastack/test_provider.pytests/unit/pydantic_ai_lightspeed/llamastack/test_transport.pytests/unit/utils/test_pydantic_ai.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
- GitHub Check: unit_tests (3.13)
- GitHub Check: E2E Tests for Lightspeed Evaluation job
- GitHub Check: build-pr
- GitHub Check: list_outdated_dependencies
- GitHub Check: spectral
- GitHub Check: E2E: server mode / ci / group 2
- GitHub Check: E2E: library mode / ci / group 1
- GitHub Check: E2E: server mode / ci / group 3
- GitHub Check: E2E: server mode / ci / group 1
- GitHub Check: E2E: library mode / ci / group 2
- GitHub Check: E2E: library mode / ci / group 3
🧰 Additional context used
📓 Path-based instructions (3)
src/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.py: Use absolute imports for internal modules:from authentication import get_auth_dependency
Llama Stack imports: Usefrom llama_stack_client import AsyncLlamaStackClient
Checkconstants.pyfor shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Uselogger = get_logger(__name__)fromlog.pyfor module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Useasync deffor I/O operations and external API calls
Use standard log levels with clear purposes:debug()for diagnostic info,info()for program execution,warning()for unexpected events,error()for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes:Configuration,Error/Exception,Resolver,Interface
Abstract classes must use ABC with@abstractmethoddecorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes
Files:
src/pydantic_ai_lightspeed/llamastack/__init__.pysrc/pydantic_ai_lightspeed/__init__.pysrc/utils/pydantic_ai.pysrc/pydantic_ai_lightspeed/llamastack/_provider.pysrc/pydantic_ai_lightspeed/llamastack/_transport.py
src/**/__init__.py
📄 CodeRabbit inference engine (AGENTS.md)
Package
__init__.pyfiles must contain brief package descriptions
Files:
src/pydantic_ai_lightspeed/llamastack/__init__.pysrc/pydantic_ai_lightspeed/__init__.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Usepytest.mark.asynciomarker for async tests
Files:
tests/unit/pydantic_ai_lightspeed/llamastack/__init__.pytests/unit/pydantic_ai_lightspeed/__init__.pytests/unit/pydantic_ai_lightspeed/llamastack/test_provider.pytests/unit/pydantic_ai_lightspeed/llamastack/test_transport.pytests/unit/utils/test_pydantic_ai.py
🔇 Additional comments (5)
pyproject.toml (1)
82-83: LGTM!src/pydantic_ai_lightspeed/__init__.py (1)
1-1: LGTM!src/pydantic_ai_lightspeed/llamastack/__init__.py (1)
1-6: LGTM!tests/unit/pydantic_ai_lightspeed/__init__.py (1)
1-1: LGTM!tests/unit/pydantic_ai_lightspeed/llamastack/__init__.py (1)
1-1: LGTM!
| class _AsyncByteStream(httpx.AsyncByteStream): | ||
| """Wraps an async byte generator as an httpx AsyncByteStream.""" | ||
|
|
||
| def __init__(self, gen: AsyncGenerator[bytes, None]) -> None: | ||
| """Store an async generator that yields raw bytes for streaming. | ||
|
|
||
| Args: | ||
| gen: An async generator producing byte chunks to stream. | ||
| """ | ||
| self._gen = gen | ||
|
|
||
| async def __aiter__(self) -> AsyncIterator[bytes]: | ||
| """Yield bytes chunks from the wrapped generator. | ||
|
|
||
| Returns: | ||
| An async iterator of bytes fulfilling the httpx.AsyncByteStream contract. | ||
| """ | ||
| async for chunk in self._gen: | ||
| yield chunk |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In httpx 0.27.0, what is the expected lifecycle for custom AsyncByteStream implementations? Specifically, should closing an httpx.Response propagate an aclose() call to the stream, and is implementing aclose() recommended when wrapping an async generator?
💡 Result:
In httpx 0.27.0, the expected lifecycle for custom AsyncByteStream implementations is that consuming the response body to completion automatically closes the stream, and explicit Response.aclose is what closes the response and releases the connection. When a custom stream needs cleanup (e.g., releasing resources), it should implement aclose (and ensure it’s safe to call). 1) Should closing an httpx.Response propagate an aclose call to the stream? Yes. - Response.aclose is implemented to “Close the response and release the connection” and it explicitly calls await self.stream.aclose for AsyncByteStream instances [1]. - The AsyncByteStream base class’s aread method ensures that, when the stream is read, it calls await self.aclose in a finally block [2]. - The Response streaming methods are documented as being closed automatically when used inside the client’s.stream context manager; alternatively, in manual mode you must call Response.aclose eventually, otherwise connections may be left open [3]. So: Response close/aclose drives stream cleanup via AsyncByteStream.aclose. 2) Is implementing aclose recommended when wrapping an async generator? Yes, if your wrapper is a custom AsyncByteStream that owns resources or needs generator cleanup. The AsyncByteStream interface includes an aclose method specifically for cleanup, and the framework relies on it to be awaited at the end of reading [2]. Additionally, httpx’s own AsyncIteratorByteStream wrapper records whether the provided iterable is an async generator and enforces “StreamConsumed” semantics for async generators after they’ve been iterated once [4]. That wrapper’s code also treats aread/iteration completion as a point at which aclose is expected to happen (via the base-class aread finally calling aclose) [2][4]. Practical guidance for custom wrappers around async generators: - Implement AsyncByteStream.aclose and ensure it properly closes the underlying generator (e.g., by calling await agen.aclose in your wrapper’s aclose, or using a pattern equivalent to that), so that early Response.aclose or early loop exit triggers generator cleanup via stream.aclose [1]. - If your wrapper doesn’t implement aclose, you won’t get cleanup when Response.aclose calls into the stream, beyond whatever implicit behavior happens to occur during iteration [2][1]. References (from httpx code/docs): AsyncByteStream.aclose contract and aread finally behavior [2]; Response.aclose calling await self.stream.aclose [1]; Response streaming context/manual-mode cleanup expectations [3]; AsyncIteratorByteStream’s handling of async generators [4].
Citations:
- 1: https://github.com/encode/httpx/blob/master/httpx/_models.py
- 2: https://github.com/encode/httpx/blob/397aad98fdc8b7580a5fc3e88f1578b4302c6382/httpx/_transports/base.py
- 3: https://www.python-httpx.org/async/
- 4: https://github.com/encode/httpx/blob/def4778d/httpx/_content.py
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the target file and print the relevant sections around _AsyncByteStream and header logic.
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== File exists =="
ls -l "$FILE"
echo
echo "== _AsyncByteStream definition =="
rg -n "_AsyncByteStream" "$FILE"
rg -n "class _AsyncByteStream|def __aiter__" "$FILE"
echo
# Print a reasonable window around the class definition
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-20))
END=$((LINE+80))
sed -n "${START},${END}p" "$FILE" | nl -ba
echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any header casing / duplicate handling nearby =="
# Print around the first match if found
FIRST=$(rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" | head -n1 | cut -d: -f1 || true)
if [[ -n "${FIRST:-}" ]]; then
START=$((FIRST-30))
END=$((FIRST+80))
sed -n "${START},${END}p" "$FILE" | nl -ba
fi
echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack
Length of output: 505
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== _AsyncByteStream definition =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n -w3 -s ':'
echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack
Length of output: 194
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
echo "== _AsyncByteStream definition (context) =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n
echo
echo "== provider-data header injection hits =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true
echo
echo "== Any existing aclose implementation / calls =="
rg -n "def aclose|aclose\(" "$FILE" || trueRepository: lightspeed-core/lightspeed-stack
Length of output: 4951
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
sed -n '1,40p' "$FILE" | cat -nRepository: lightspeed-core/lightspeed-stack
Length of output: 1652
Add aclose() propagation for _AsyncByteStream and fix Provider-Data header casing normalization
-
_AsyncByteStreamonly implements__aiter__; implementaclose()to forward cleanup to the wrapped async generator sohttpx.Response.aclose()(especially on early-abandoned streaming) triggers generatoraclose().Proposed fix
class _AsyncByteStream(httpx.AsyncByteStream): """Wraps an async byte generator as an httpx AsyncByteStream.""" @@ async def __aiter__(self) -> AsyncIterator[bytes]: """Yield bytes chunks from the wrapped generator. @@ async for chunk in self._gen: yield chunk + + async def aclose(self) -> None: + """Close the wrapped generator when the response is closed early.""" + aclose = getattr(self._gen, "aclose", None) + if aclose is not None: + await aclose() -
LlamaStackLibraryTransport.handle_async_requestchecks for an existingX-LlamaStack-Provider-Datausing only two literal casings (X-LlamaStack-Provider-Dataandx-llamastack-provider-data); treat header names case-insensitively to avoid injecting a duplicate header when the caller uses different casing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/pydantic_ai_lightspeed/llamastack/_transport.py` around lines 22 - 40,
The _AsyncByteStream currently yields from the wrapped async generator but
doesn't forward closure; implement an async aclose(self) on class
_AsyncByteStream that calls and awaits self._gen.aclose() if the wrapped
generator has an aclose attribute to ensure httpx.Response.aclose() triggers
generator cleanup. In LlamaStackLibraryTransport.handle_async_request change the
provider-data header existence check to be case-insensitive (e.g., inspect
request.headers keys lowercased or use any(k.lower() ==
"x-llamastack-provider-data" for k in request.headers)) before injecting
"X-LlamaStack-Provider-Data" so you don't add a duplicate header when callers
use different casing.
| if self._client.provider_data: | ||
| keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"] | ||
| if all(key not in headers for key in keys): | ||
| headers["X-LlamaStack-Provider-Data"] = json.dumps( | ||
| self._client.provider_data | ||
| ) |
There was a problem hiding this comment.
Normalize header names before deciding whether to inject provider data.
HTTP header names are case-insensitive. This check only recognizes two spellings, so a request that already carries X-LLAMASTACK-PROVIDER-DATA or another casing variant will get a second header injected and may use the wrong provider data.
Proposed fix
if self._client.provider_data:
- keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"]
- if all(key not in headers for key in keys):
+ normalized_headers = {key.lower() for key in headers}
+ if "x-llamastack-provider-data" not in normalized_headers:
headers["X-LlamaStack-Provider-Data"] = json.dumps(
self._client.provider_data
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/pydantic_ai_lightspeed/llamastack/_transport.py` around lines 89 - 94,
The header presence check treats names case-sensitively and only checks two
spellings, so normalize header names before deciding to inject provider data: in
the block that references self._client.provider_data and the headers dict,
compute a lowercase-key view (e.g., {k.lower(): v for k, v in headers.items()}
or use any(k.lower() == "x-llamastack-provider-data" for k in headers)) and
check for "x-llamastack-provider-data" in that normalized view; only if absent,
add the provider data using the canonical header "X-LlamaStack-Provider-Data" to
headers. Ensure you reference headers and self._client.provider_data when
applying the change.
| return LlamaStackProvider( | ||
| base_url=base_url, | ||
| api_key=api_key, | ||
| http_client=client._client, # pylint: disable=protected-access | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In llama-stack-client 0.6.0, does AsyncLlamaStackClient expose a public way to access or reuse its underlying httpx.AsyncClient, or is _client intentionally private/internal?
💡 Result:
In llama-stack-client 0.6.0, AsyncLlamaStackClient does not publicly expose (or promise) direct access to/reuse of its underlying httpx.AsyncClient; the underlying HTTP client is intended to be managed via the public http_client configuration/injection API, not by reaching into a private/internal attribute like _client. Evidence: - The official client Python README documents that for async usage you may configure the HTTP backend by passing an http_client when constructing AsyncLlamaStackClient (including using DefaultAioHttpClient) and shows using the client as an async context manager [1]. This is the supported “reuse/customize” mechanism for the underlying transport/client. - The same README also provides a documented mechanism to “directly override” the underlying HTTP client (via DefaultHttpxClient) and use with_options(http_client=...) for customizing on a per-request basis, indicating the library expects callers to inject/override the HTTP client rather than extracting it from internals [1]. So, _client is intentionally private/internal: callers should reuse the HTTP resources by passing the desired http_client (e.g., a DefaultHttpxClient with a custom httpx.AsyncClient/transport) into AsyncLlamaStackClient, or by using the async context manager lifecycle, rather than accessing a hidden _client attribute directly [1].
Citations:
Don’t depend on AsyncLlamaStackClient._client; use the public http_client injection path.
The helper passes a protected/internal attribute into LlamaStackProvider:
return LlamaStackProvider(
base_url=base_url,
api_key=api_key,
http_client=client._client, # pylint: disable=protected-access
)llama-stack-client’s docs indicate the underlying HTTP client should be managed via the public http_client configuration/override API, so reaching into _client is an unstable dependency that can break at runtime even if mocks keep CI green. Refactor _llama_stack_provider_from_client to accept/pass through a publicly injected http_client (or construct the needed client using that same public injection) rather than extracting it from AsyncLlamaStackClient.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/utils/pydantic_ai.py` around lines 41 - 45, The helper
_llama_stack_provider_from_client currently reaches into
AsyncLlamaStackClient._client (protected) when building a LlamaStackProvider;
change it to rely on the public HTTP-injection path instead: update
_llama_stack_provider_from_client to accept a public http_client parameter (or
obtain the client's configured http_client via its public API), and pass that
http_client into LlamaStackProvider(base_url=..., api_key=..., http_client=...)
rather than using client._client; ensure callers of
_llama_stack_provider_from_client are updated to provide the public http_client
argument.
| lib_http_client = httpx.AsyncClient( | ||
| transport=transport, |
Description
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
New Features
Chores
Tests