diff --git a/backend/app/crud/model_config.py b/backend/app/crud/model_config.py index a7ee71ee1..6e9b999ed 100644 --- a/backend/app/crud/model_config.py +++ b/backend/app/crud/model_config.py @@ -120,7 +120,7 @@ def validate_blob_model_or_raise(session: Session, blob: ConfigBlob) -> None: if raw_provider is None: return - if raw_provider.endswith("-native"): + if raw_provider.endswith(NATIVE_PROVIDER_SUFFIX): return provider = _normalize_provider(raw_provider) diff --git a/backend/app/crud/rag/open_ai.py b/backend/app/crud/rag/open_ai.py index 2ae36f4f1..87dfc3aaa 100644 --- a/backend/app/crud/rag/open_ai.py +++ b/backend/app/crud/rag/open_ai.py @@ -4,6 +4,7 @@ from io import BytesIO from typing import Iterable +import openai from openai import OpenAI, OpenAIError from pydantic import BaseModel @@ -130,19 +131,112 @@ def update( files.append(f_obj) logger.info( - f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}" - ) - req = self.client.vector_stores.file_batches.upload_and_poll( - vector_store_id=vector_store_id, - files=files, + f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | " + f"{{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}" ) + + try: + req = self.client.vector_stores.file_batches.upload_and_poll( + vector_store_id=vector_store_id, + files=files, + ) + except openai.RateLimitError as e: + raise InterruptedError( + f"OpenAI rate limit exceeded (code: {e.status_code}): " + f"{e.message}. Try again in 1 minute. If issue persists, " + f"contact Kaapi." + ) + except openai.AuthenticationError as e: + raise InterruptedError( + f"OpenAI authentication failed (code: {e.status_code}): " + f"{e.message}. Check your OpenAI API key is valid and " + f"has not expired." + ) + except openai.NotFoundError as e: + raise InterruptedError( + f"OpenAI resource not found (code: {e.status_code}): " + f"{e.message}. Verify the vector store ID exists and " + f"hasn't been deleted." + ) + except openai.BadRequestError as e: + raise InterruptedError( + f"OpenAI bad request (code: {e.status_code}): {e.message}. " + f"Review the file payload and metadata; the request may " + f"be malformed." + ) + except openai.UnprocessableEntityError as e: + raise InterruptedError( + f"OpenAI unprocessable entity (code: {e.status_code}): " + f"{e.message}. The uploaded files may be in an " + f"unsupported format or exceed size limits." + ) + except openai.InternalServerError as e: + raise InterruptedError( + f"OpenAI server error (code: {e.status_code}): {e.message}. " + f"This is usually transient — retry in a few seconds. If " + f"issue persists, contact Kaapi." + ) + except openai.APITimeoutError as e: + raise InterruptedError( + f"OpenAI request timed out: {e}. Retry the upload, or " + f"split the batch into smaller chunks." + ) + except openai.OpenAIError as e: + raise InterruptedError( + f"OpenAI error: {e}. If this persists, contact Kaapi." + ) + logger.info( - f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" + f"[OpenAIVectorStoreCrud.update] File upload completed | " + f"{{'vector_store_id': '{vector_store_id}', " + f"'completed_files': {req.file_counts.completed}, " + f"'total_files': {req.file_counts.total}}}" ) + if req.file_counts.completed != req.file_counts.total: - error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed" + # Enrich the error string by listing each failed file's + # `last_error.message` from OpenAI. Fall back to the + # count-only message if the follow-up list_files call + # itself fails. + failed_summary = "" + try: + page = self.client.vector_stores.file_batches.list_files( + batch_id=req.id, + vector_store_id=vector_store_id, + filter="failed", + limit=10, + ) + parts = [] + for f in page: + f_err = getattr(f, "last_error", None) + f_msg = ( + getattr(f_err, "message", None) if f_err else None + ) or "Unknown error" + parts.append(f"{f.id} ({f_msg})") + failed_summary = ", ".join(parts) + if getattr(page, "has_more", False): + failed_summary = f"{failed_summary}, ..." + if len(failed_summary) > 600: + failed_summary = failed_summary[:597] + "..." + except OpenAIError as list_err: + logger.warning( + f"[OpenAIVectorStoreCrud.update] Could not list failed " + f"files | {{'vector_store_id': '{vector_store_id}', " + f"'batch_id': '{req.id}', 'error': '{list_err}'}}" + ) + + error_msg = ( + f"OpenAI document processing error: " + f"{req.file_counts.completed}/{req.file_counts.total} " + f"files completed" + ) + if failed_summary: + error_msg = f"{error_msg}. Failed files: {failed_summary}" logger.error( - f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" + f"[OpenAIVectorStoreCrud.update] Document processing error | " + f"{{'vector_store_id': '{vector_store_id}', " + f"'completed_files': {req.file_counts.completed}, " + f"'total_files': {req.file_counts.total}}}" ) raise InterruptedError(error_msg) diff --git a/backend/app/services/llm/providers/oai.py b/backend/app/services/llm/providers/oai.py index cbbb47172..474a05eb6 100644 --- a/backend/app/services/llm/providers/oai.py +++ b/backend/app/services/llm/providers/oai.py @@ -138,21 +138,110 @@ def execute( error_message = f"Invalid or unexpected parameter in Config: {str(e)}" return None, error_message - except openai.OpenAIError as e: - # imported here to avoid circular imports - from app.utils import handle_openai_error + except openai.RateLimitError as e: + error_message = ( + f"OpenAI rate limit exceeded (code: {e.status_code}): " + f"{e.message}. Try again in 1 minute. If issue persists, " + f"contact Kaapi." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.AuthenticationError as e: + error_message = ( + f"OpenAI authentication failed (code: {e.status_code}): " + f"{e.message}. Check your OpenAI API key is valid and has " + f"not expired." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message - error_message = handle_openai_error(e) + except openai.NotFoundError as e: + error_message = ( + f"OpenAI resource not found (code: {e.status_code}): " + f"{e.message}. Verify the model name and any referenced IDs " + f"in your config are correct." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.BadRequestError as e: + error_message = ( + f"OpenAI bad request (code: {e.status_code}): {e.message}. " + f"Review your config parameters; the request shape may be " + f"invalid." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.UnprocessableEntityError as e: + error_message = ( + f"OpenAI unprocessable entity (code: {e.status_code}): " + f"{e.message}. The model rejected the request payload; " + f"check input format and limits." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.InternalServerError as e: + error_message = ( + f"OpenAI server error (code: {e.status_code}): {e.message}. " + f"This is usually transient — retry in a few seconds. If " + f"issue persists, contact Kaapi." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.APITimeoutError as e: + error_message = ( + f"OpenAI request timed out: {e}. Retry the request, or try " + f"with a smaller payload." + ) + logger.warning( + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", + exc_info=True, + ) + return None, error_message + + except openai.OpenAIError as e: + error_message = f"OpenAI error: {e}. If this persists, contact Kaapi." logger.warning( - f"[OpenAIProvider.execute] OpenAI API error: {error_message} | provider={completion_config.provider}", + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", exc_info=True, ) return None, error_message except Exception as e: - error_message = "Unexpected error occurred" + error_message = f"Unexpected error: {e}" logger.error( - f"[OpenAIProvider.execute] {error_message}: {str(e)} | provider={completion_config.provider}", + f"[OpenAIProvider.execute] {error_message} | " + f"provider={completion_config.provider}", exc_info=True, ) return None, error_message diff --git a/backend/app/tests/crud/rag/__init__.py b/backend/app/tests/crud/rag/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/tests/crud/rag/test_open_ai.py b/backend/app/tests/crud/rag/test_open_ai.py new file mode 100644 index 000000000..b71b1835b --- /dev/null +++ b/backend/app/tests/crud/rag/test_open_ai.py @@ -0,0 +1,336 @@ +""" +Tests for OpenAIVectorStoreCrud.update — focused on the failure-path error +enrichment added in this PR (per-file reasons + category-prefixed messages +for each specific OpenAI exception type). +""" + +from unittest.mock import MagicMock + +import openai +import pytest + +from app.crud.rag.open_ai import OpenAIVectorStoreCrud + + +@pytest.fixture +def mock_client(): + return MagicMock() + + +@pytest.fixture +def crud(mock_client): + return OpenAIVectorStoreCrud(client=mock_client) + + +@pytest.fixture +def mock_storage(): + storage = MagicMock() + storage.get.return_value = b"file content" + return storage + + +@pytest.fixture +def docs_batch(): + """One batch with two documents. update() loops `for docs in documents`.""" + doc1 = MagicMock(object_store_url="s3://bucket/file1.pdf", fname="file1.pdf") + doc2 = MagicMock(object_store_url="s3://bucket/file2.pdf", fname="file2.pdf") + return [[doc1, doc2]] + + +def _batch_result(*, completed: int, total: int, batch_id: str = "batch_abc"): + """Mock the return of vector_stores.file_batches.upload_and_poll.""" + counts = MagicMock(completed=completed, total=total) + return MagicMock(id=batch_id, file_counts=counts) + + +def _failed_file_page(files, has_more: bool = False): + """Mock the iterable returned by list_files(filter='failed').""" + page = MagicMock() + page.__iter__ = MagicMock(return_value=iter(files)) + page.has_more = has_more + return page + + +def _failed_file(file_id: str, error_message: str | None): + """Build one failed-file row. last_error=None means 'no reason recorded'.""" + f = MagicMock() + f.id = file_id + f.last_error = MagicMock(message=error_message) if error_message else None + return f + + +class TestOpenAIVectorStoreCrudUpdateSuccess: + def test_yields_docs_when_all_files_complete( + self, crud, mock_client, mock_storage, docs_batch + ): + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=2, total=2) + ) + + yielded = list(crud.update("vs_1", mock_storage, docs_batch)) + + # update yields from the inner docs list on success + assert len(yielded) == 2 + # list_files should not have been called on the happy path + mock_client.vector_stores.file_batches.list_files.assert_not_called() + + +class TestOpenAIVectorStoreCrudUpdatePartialFailure: + """Partial completion -> InterruptedError with enriched per-file reasons.""" + + def test_includes_failed_file_ids_and_messages( + self, crud, mock_client, mock_storage, docs_batch + ): + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=1, total=3) + ) + mock_client.vector_stores.file_batches.list_files.return_value = ( + _failed_file_page( + [ + _failed_file("file-abc", "Unsupported file type"), + _failed_file("file-xyz", "File too large"), + ] + ) + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + + msg = str(exc_info.value) + assert "OpenAI document processing error" in msg + assert "1/3 files completed" in msg + assert "Failed files:" in msg + assert "file-abc (Unsupported file type)" in msg + assert "file-xyz (File too large)" in msg + + def test_reports_unknown_error_when_last_error_missing( + self, crud, mock_client, mock_storage, docs_batch + ): + """A failed file with no `last_error` shouldn't drop out of the + summary — it gets 'Unknown error' so the user sees that something + was wrong with that file even if OpenAI didn't tell us what.""" + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=0, total=1) + ) + mock_client.vector_stores.file_batches.list_files.return_value = ( + _failed_file_page([_failed_file("file-noerr", None)]) + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + + assert "file-noerr (Unknown error)" in str(exc_info.value) + + def test_appends_ellipsis_when_has_more_results( + self, crud, mock_client, mock_storage, docs_batch + ): + """When OpenAI returns has_more=True we cap at the first 10 entries + and signal truncation with a trailing ', ...' so callers know more + failures exist beyond what's shown.""" + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=0, total=100) + ) + mock_client.vector_stores.file_batches.list_files.return_value = ( + _failed_file_page( + [_failed_file(f"file-{i}", "err") for i in range(10)], + has_more=True, + ) + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + + assert str(exc_info.value).endswith(", ...") + + def test_truncates_summary_when_over_600_chars( + self, crud, mock_client, mock_storage, docs_batch + ): + """Long error blobs are truncated at 597 chars + '...' so callback + payloads stay bounded regardless of what OpenAI returns.""" + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=0, total=10) + ) + # Inflate per-file strings to push the joined summary past 600 chars. + mock_client.vector_stores.file_batches.list_files.return_value = ( + _failed_file_page( + [ + _failed_file(f"file-{'x' * 80}-{i}", "long error " * 10) + for i in range(10) + ] + ) + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + + msg = str(exc_info.value) + marker = "Failed files: " + summary = msg[msg.index(marker) + len(marker) :] + assert len(summary) == 600 + assert summary.endswith("...") + + def test_falls_back_to_count_only_when_list_files_errors( + self, crud, mock_client, mock_storage, docs_batch + ): + """If the follow-up list_files lookup itself raises, the update + still raises InterruptedError but with the original count-only + message — no 'Failed files:' suffix, no transient list_files crash + masking the real upload problem.""" + mock_client.vector_stores.file_batches.upload_and_poll.return_value = ( + _batch_result(completed=0, total=3) + ) + mock_client.vector_stores.file_batches.list_files.side_effect = ( + openai.OpenAIError("list failed") + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + + msg = str(exc_info.value) + assert "0/3 files completed" in msg + assert "Failed files:" not in msg + + +class TestOpenAIVectorStoreCrudUpdateOpenAIExceptions: + """`upload_and_poll` raising each specific OpenAI exception type maps to + `InterruptedError` with a category-prefixed message that includes the + upstream status code and a remediation hint. + + Assertions are deliberately structural (prefix + code + original message) + rather than exact-string equality so future tweaks to the remediation + wording don't break the suite. + """ + + @pytest.mark.parametrize( + "exception_factory, expected_prefix, expected_status, original_message", + [ + ( + lambda: openai.RateLimitError( + message="quota exceeded", + response=MagicMock( + status_code=429, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI rate limit exceeded", + 429, + "quota exceeded", + ), + ( + lambda: openai.AuthenticationError( + message="bad api key", + response=MagicMock( + status_code=401, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI authentication failed", + 401, + "bad api key", + ), + ( + lambda: openai.NotFoundError( + message="missing resource", + response=MagicMock( + status_code=404, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI resource not found", + 404, + "missing resource", + ), + ( + lambda: openai.BadRequestError( + message="invalid file", + response=MagicMock( + status_code=400, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI bad request", + 400, + "invalid file", + ), + ( + lambda: openai.UnprocessableEntityError( + message="cannot process", + response=MagicMock( + status_code=422, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI unprocessable entity", + 422, + "cannot process", + ), + ( + lambda: openai.InternalServerError( + message="upstream boom", + response=MagicMock( + status_code=500, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI server error", + 500, + "upstream boom", + ), + ], + ) + def test_specific_openai_exception_maps_to_category_prefix( + self, + crud, + mock_client, + mock_storage, + docs_batch, + exception_factory, + expected_prefix, + expected_status, + original_message, + ): + mock_client.vector_stores.file_batches.upload_and_poll.side_effect = ( + exception_factory() + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + msg = str(exc_info.value) + assert msg.startswith(expected_prefix), msg + assert f"code: {expected_status}" in msg, msg + assert original_message in msg, msg + + def test_api_timeout_error(self, crud, mock_client, mock_storage, docs_batch): + """APITimeoutError doesn't expose .message — handler interpolates str(e).""" + mock_client.vector_stores.file_batches.upload_and_poll.side_effect = ( + openai.APITimeoutError(request=MagicMock()) + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + assert str(exc_info.value).startswith("OpenAI request timed out:") + + def test_generic_openai_error_falls_through( + self, crud, mock_client, mock_storage, docs_batch + ): + """Any OpenAIError subclass without a dedicated handler lands in the + bottom-most `except openai.OpenAIError` block — prefixed with the + generic "OpenAI error" tag but still carrying the original message. + """ + mock_client.vector_stores.file_batches.upload_and_poll.side_effect = ( + openai.OpenAIError("something else") + ) + + with pytest.raises(InterruptedError) as exc_info: + list(crud.update("vs_1", mock_storage, docs_batch)) + msg = str(exc_info.value) + assert msg.startswith("OpenAI error:"), msg + assert "something else" in msg, msg + + +class TestOpenAIVectorStoreCrudInit: + """The base OpenAICrud init rejects a None client; subclasses inherit it.""" + + def test_none_client_raises(self): + with pytest.raises(ValueError): + OpenAIVectorStoreCrud(client=None) diff --git a/backend/app/tests/services/llm/providers/test_openai.py b/backend/app/tests/services/llm/providers/test_openai.py index f86395f58..f80bdfc1b 100644 --- a/backend/app/tests/services/llm/providers/test_openai.py +++ b/backend/app/tests/services/llm/providers/test_openai.py @@ -3,7 +3,7 @@ """ import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import openai @@ -170,36 +170,152 @@ def test_execute_with_type_error( def test_execute_with_openai_api_error( self, provider, mock_client, completion_config, query_params ): - """Test handling of OpenAI API errors.""" + """Generic `openai.APIError` (no specific subclass) falls through to the + `except openai.OpenAIError` block, which prefixes with "OpenAI error:" + and uses the raw exception string. No wrapper helper is invoked.""" mock_client.responses.create.side_effect = openai.APIError( message="API request failed", request=MagicMock(), body=None, ) - with patch("app.utils.handle_openai_error") as mock_handler: - mock_handler.return_value = "API request failed: rate limit exceeded" - - result, error = provider.execute( - completion_config, query_params, "Test query" - ) + result, error = provider.execute(completion_config, query_params, "Test query") - assert result is None - assert error is not None - assert "API request failed" in error - mock_handler.assert_called_once() + assert result is None + assert error is not None + assert error.startswith("OpenAI error:") + assert "API request failed" in error def test_execute_with_generic_exception( self, provider, mock_client, completion_config, query_params ): - """Test handling of unexpected exceptions.""" + """Non-OpenAI exceptions land in the `except Exception` catch-all, + prefixed with "Unexpected error:" and carrying the exception text.""" mock_client.responses.create.side_effect = Exception("Timeout occurred") result, error = provider.execute(completion_config, query_params, "Test query") assert result is None assert error is not None - assert "Unexpected error occurred" in error + assert error.startswith("Unexpected error:") + assert "Timeout occurred" in error + + @pytest.mark.parametrize( + "exception_factory, expected_prefix, expected_status, original_message", + [ + ( + lambda: openai.RateLimitError( + message="quota exceeded", + response=MagicMock( + status_code=429, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI rate limit exceeded", + 429, + "quota exceeded", + ), + ( + lambda: openai.AuthenticationError( + message="bad api key", + response=MagicMock( + status_code=401, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI authentication failed", + 401, + "bad api key", + ), + ( + lambda: openai.NotFoundError( + message="model not found", + response=MagicMock( + status_code=404, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI resource not found", + 404, + "model not found", + ), + ( + lambda: openai.BadRequestError( + message="invalid model param", + response=MagicMock( + status_code=400, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI bad request", + 400, + "invalid model param", + ), + ( + lambda: openai.UnprocessableEntityError( + message="cannot process", + response=MagicMock( + status_code=422, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI unprocessable entity", + 422, + "cannot process", + ), + ( + lambda: openai.InternalServerError( + message="upstream boom", + response=MagicMock( + status_code=500, request=MagicMock(), headers={} + ), + body=None, + ), + "OpenAI server error", + 500, + "upstream boom", + ), + ], + ) + def test_execute_specific_openai_exceptions_use_category_prefix( + self, + provider, + mock_client, + completion_config, + query_params, + exception_factory, + expected_prefix, + expected_status, + original_message, + ): + """Each specific OpenAI exception with a dedicated handler emits a + category-prefixed message including the upstream status code and the + original error text. Structural assertions only — the remediation + suffix can evolve without breaking the suite. + """ + mock_client.responses.create.side_effect = exception_factory() + + result, error = provider.execute(completion_config, query_params, "Test query") + + assert result is None + assert error is not None + assert error.startswith(expected_prefix), error + assert f"code: {expected_status}" in error, error + assert original_message in error, error + + def test_execute_with_api_timeout_error( + self, provider, mock_client, completion_config, query_params + ): + """APITimeoutError doesn't expose .message — handler interpolates str(e).""" + mock_client.responses.create.side_effect = openai.APITimeoutError( + request=MagicMock() + ) + + result, error = provider.execute(completion_config, query_params, "Test query") + + assert result is None + assert error is not None + assert error.startswith("OpenAI request timed out:") def test_execute_with_conversation_config_without_id_or_auto_create( self, provider, mock_client, completion_config, query_params