From 181bf599cb3d2b70ee418a9ddc8400e967869f28 Mon Sep 17 00:00:00 2001 From: Ankit raj <113342181+ankit-v2-3@users.noreply.github.com> Date: Tue, 23 Jun 2026 12:53:16 +0530 Subject: [PATCH 01/11] feat: add indexing v2 --- videodb/__init__.py | 4 + videodb/_constants.py | 20 ++++ videodb/index.py | 209 ++++++++++++++++++++++++++++++++++++++++++ videodb/video.py | 169 ++++++++++++++++++++++++++++++++++ 4 files changed, 402 insertions(+) create mode 100644 videodb/index.py diff --git a/videodb/__init__.py b/videodb/__init__.py index 59d2a1b..924c179 100644 --- a/videodb/__init__.py +++ b/videodb/__init__.py @@ -8,6 +8,8 @@ from videodb._constants import ( VIDEO_DB_API, IndexType, + IndexCapability, + FieldGroup, SceneExtractionType, MediaType, SearchType, @@ -53,6 +55,8 @@ "AuthenticationError", "InvalidRequestError", "IndexType", + "IndexCapability", + "FieldGroup", "SearchError", "play_stream", "build_iframe_embed_code", diff --git a/videodb/_constants.py b/videodb/_constants.py index f7de578..69abefd 100644 --- a/videodb/_constants.py +++ b/videodb/_constants.py @@ -36,6 +36,24 @@ class IndexType: scene = "scene" +class IndexCapability: + """Retrieval capabilities an index can be built for (``use_for``).""" + + semantic = "semantic" + query = "query" + aggregate = "aggregate" + + +class FieldGroup: + """Field groups that map artifact fields to retrieval capabilities.""" + + semantic = "semantic" + text = "text" + filter = "filter" + aggregate = "aggregate" + sort = "sort" + + class SceneExtractionType: shot_based = "shot" time_based = "time" @@ -80,6 +98,8 @@ class ApiPath: upload_url = "upload_url" transcription = "transcription" index = "index" + indexes = "indexes" + records = "records" search = "search" compile = "compile" workflow = "workflow" diff --git a/videodb/index.py b/videodb/index.py new file mode 100644 index 0000000..6c03d9c --- /dev/null +++ b/videodb/index.py @@ -0,0 +1,209 @@ +from typing import List, Optional + +from videodb._constants import ApiPath + + +class FieldSchema: + """Schema details for a single indexed field. + + :ivar str type: Data type of the field (e.g. ``"string"``, ``"string[]"``, ``"number"``) + :ivar list groups: Field groups this field belongs to (e.g. ``["semantic", "filter"]``) + :ivar list operators: Filter operators supported by the field (for filterable fields) + """ + + def __init__( + self, + type: Optional[str] = None, + groups: Optional[List[str]] = None, + operators: Optional[List[str]] = None, + ) -> None: + self.type = type + self.groups = groups or [] + self.operators = operators or [] + + def __repr__(self) -> str: + return ( + f"FieldSchema(" + f"type={self.type}, " + f"groups={self.groups}, " + f"operators={self.operators})" + ) + + def __getitem__(self, key): + return self.__dict__[key] + + +class IndexRecord: + """A single indexed record (one temporal segment of an index). + + :ivar str video_id: ID of the video the record belongs to + :ivar str understanding_id: ID of the understanding run that produced the record + :ivar str segment_id: ID of the segment within the artifact + :ivar float start_sec: Start time of the segment in seconds + :ivar float end_sec: End time of the segment in seconds + :ivar dict data: Indexed field values for the segment + """ + + def __init__( + self, + video_id: Optional[str] = None, + understanding_id: Optional[str] = None, + segment_id: Optional[str] = None, + start_sec: Optional[float] = None, + end_sec: Optional[float] = None, + data: Optional[dict] = None, + ) -> None: + self.video_id = video_id + self.understanding_id = understanding_id + self.segment_id = segment_id + self.start_sec = start_sec + self.end_sec = end_sec + self.data = data or {} + + def __repr__(self) -> str: + return ( + f"IndexRecord(" + f"video_id={self.video_id}, " + f"segment_id={self.segment_id}, " + f"start_sec={self.start_sec}, " + f"end_sec={self.end_sec}, " + f"data={self.data})" + ) + + def __getitem__(self, key): + return self.__dict__[key] + + +class RecordPage: + """A page of indexed records returned by :meth:`Index.records`. + + :ivar list[IndexRecord] records: Records in this page + :ivar str next_cursor: Cursor for the next page, ``None`` if there are no more records + """ + + def __init__( + self, + records: Optional[List[IndexRecord]] = None, + next_cursor: Optional[str] = None, + ) -> None: + self.records = records or [] + self.next_cursor = next_cursor + + def __repr__(self) -> str: + return ( + f"RecordPage(records={len(self.records)}, next_cursor={self.next_cursor})" + ) + + def __iter__(self): + return iter(self.records) + + def __getitem__(self, key): + return self.records[key] + + +class Index: + """Index manifest for a retrieval-ready index built from an understanding artifact. + + Note: Users should not initialize this class directly. + Instead use :meth:`Video.index() `, + :meth:`Video.get_index() `, or + :meth:`Video.list_indexes() `. + + :ivar str index_id: Unique identifier for the index + :ivar str video_id: ID of the video this index belongs to + :ivar str collection_id: ID of the collection this index belongs to + :ivar str name: User-facing name of the index + :ivar str status: Build status of the index (e.g. ``"building"``, ``"ready"``, ``"failed"``) + :ivar list use_for: Retrieval capabilities the index supports + (subset of ``"semantic"``, ``"query"``, ``"aggregate"``) + :ivar source: Source artifact reference or records the index was built from + :ivar int record_count: Number of records in the index + :ivar dict fields: Field groups mapping (``semantic``, ``text``, ``filter``, + ``aggregate``, ``sort``) to lists of field names + :ivar dict field_schema: Mapping of field name to :class:`FieldSchema ` + """ + + def __init__( + self, _connection, video_id: str, collection_id: str = None, **kwargs + ) -> None: + self._connection = _connection + self.video_id = video_id + self.collection_id = collection_id + self.index_id = kwargs.get("index_id") + self.name = kwargs.get("name") + self.status = kwargs.get("status") + self.use_for = kwargs.get("use_for", []) + self.source = kwargs.get("source") + self.record_count = kwargs.get("record_count") + self.fields = kwargs.get("fields", {}) + self.field_schema = { + field: FieldSchema( + type=schema.get("type"), + groups=schema.get("groups"), + operators=schema.get("operators"), + ) + for field, schema in (kwargs.get("field_schema") or {}).items() + } + + def __repr__(self) -> str: + return ( + f"Index(" + f"index_id={self.index_id}, " + f"video_id={self.video_id}, " + f"name={self.name}, " + f"status={self.status}, " + f"use_for={self.use_for}, " + f"record_count={self.record_count})" + ) + + def __getitem__(self, key): + return self.__dict__[key] + + def records(self, limit: int = 20, cursor: Optional[str] = None) -> RecordPage: + """Preview the records stored in the index. + + Intended for inspection and debugging. Records are paginated via a cursor. + + :param int limit: (optional) Maximum number of records to return (default: 20) + :param str cursor: (optional) Cursor returned by a previous page to fetch the next page + :return: A page of indexed records, :class:`RecordPage ` object + :rtype: :class:`videodb.index.RecordPage` + """ + params = {"limit": limit, "collection_id": self.collection_id} + if cursor is not None: + params["cursor"] = cursor + records_data = self._connection.get( + path=f"{ApiPath.video}/{self.video_id}/{ApiPath.indexes}/{self.index_id}/{ApiPath.records}", + params={key: value for key, value in params.items() if value is not None}, + ) + if not records_data: + return RecordPage() + records = [ + IndexRecord( + video_id=record.get("video_id"), + understanding_id=record.get("understanding_id"), + segment_id=record.get("segment_id"), + start_sec=record.get("start_sec"), + end_sec=record.get("end_sec"), + data=record.get("data"), + ) + for record in records_data.get("records", []) + ] + return RecordPage(records=records, next_cursor=records_data.get("next_cursor")) + + def delete(self) -> None: + """Delete the index. + + Removes the index's retrieval structures. It does not delete the original + video or stored understanding artifacts. + + :raises InvalidRequestError: If the delete fails + :return: None if the delete is successful + :rtype: None + """ + self._connection.delete( + path=f"{ApiPath.video}/{self.video_id}/{ApiPath.indexes}/{self.index_id}", + params={"collection_id": self.collection_id} + if self.collection_id + else None, + ) diff --git a/videodb/video.py b/videodb/video.py index 37d0526..8db9477 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -12,6 +12,7 @@ Workflows, ) from videodb.image import Image, Frame +from videodb.index import Index from videodb.scene import Scene, SceneCollection from videodb.search import SearchFactory, SearchResult from videodb.shot import Shot @@ -733,6 +734,174 @@ def delete_scene_index(self, scene_index_id: str) -> None: path=f"{ApiPath.video}/{self.id}/{ApiPath.index}/{ApiPath.scene}/{scene_index_id}" ) + @staticmethod + def _format_index_source(source: Union[object, Dict, List]) -> Dict: + """Format an index *source* into the request payload. + + The ``source`` identifies where the index data comes from. A single + artifact reference is ambiguous on its own, since the same ``extract_type`` + (e.g. ``"scene"``) can be produced by many understanding runs, so an + understanding source must carry the ``understanding_id`` of the run that + produced it. + + Accepts: + - an understanding artifact object exposing ``to_index_source()`` + (forward-compatible with the future ``understand()`` outputs) + - a dict that already carries a ``type`` (``"understanding"``, ``"s3"``, + or ``"inline"``), passed through as-is after light normalization + - a dict with ``understanding_id`` (normalized to an understanding source) + - a list of user-provided temporal records (wrapped as an inline source) + + :param source: The understanding artifact, source dict, or temporal records + :raises ValueError: If the source type is unsupported or empty + :return: The serialized ``source`` payload + :rtype: dict + """ + if source is None: + raise ValueError("source is required") + + # Understanding artifact object (forward-compatible duck typing). + if hasattr(source, "to_index_source"): + return source.to_index_source() + + # User-provided temporal records -> inline source. + if isinstance(source, list): + return {"type": "inline", "data": source} + + if isinstance(source, dict): + # Already a fully-formed source dict (e.g. understanding/s3/inline). + if source.get("type"): + return source + # Understanding artifact reference. + if source.get("understanding_id"): + return { + "type": "understanding", + "understanding_id": source["understanding_id"], + "extract_type": source.get("extract_type"), + } + # Otherwise treat the dict as a single inline temporal record. + return {"type": "inline", "data": [source]} + + raise ValueError( + "source must be an understanding artifact, a source dict, or a list of records" + ) + + def _format_index(self, index_data: dict) -> Index: + return Index( + self._connection, + video_id=self.id, + collection_id=self.collection_id, + **index_data, + ) + + def index( + self, + source: Union[object, Dict, List], + name: Optional[str] = None, + use_for: Optional[List[str]] = None, + fields: Optional[Dict[str, List[str]]] = None, + callback_url: Optional[str] = None, + ) -> Optional[Index]: + """Create a retrieval-ready index from an understanding artifact. + + Turns an understanding artifact (or user-provided temporal records) into an + index that declares retrieval capabilities (``use_for``) and field-level + indexing configuration (``fields``). + + :param source: The understanding artifact, an artifact-reference dict, or a + list of user-provided temporal record dicts to index + :param str name: (optional) User-facing index name. Defaults to the + artifact/source name on the server. + :param list use_for: (optional) Retrieval capabilities to enable, any of + :attr:`IndexCapability.semantic `, + :attr:`IndexCapability.query `, + :attr:`IndexCapability.aggregate `. + Defaults to the artifact's defaults on the server. + :param dict fields: (optional) Field-level indexing configuration mapping + field groups (``semantic``, ``text``, ``filter``, ``aggregate``, + ``sort``) to lists of field names + :param str callback_url: (optional) URL called when indexing completes + :raises ValueError: If ``source`` is missing or of an unsupported type + :raises InvalidRequestError: If the index creation fails + :return: The created index, :class:`Index ` object + :rtype: :class:`videodb.index.Index` + """ + index_data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.indexes}", + data={ + "source": self._format_index_source(source), + "name": name, + "use_for": use_for, + "fields": fields, + "callback_url": callback_url, + }, + ) + if not index_data: + return None + return self._format_index(index_data) + + def get_index( + self, index_id: Optional[str] = None, name: Optional[str] = None + ) -> Optional[Index]: + """Get an index manifest by its ID or name. + + :param str index_id: (optional) The id of the index + :param str name: (optional) The name of the index + :raises ValueError: If neither ``index_id`` nor ``name`` is provided + :return: The index, :class:`Index ` object + :rtype: :class:`videodb.index.Index` + """ + if not index_id and not name: + raise ValueError("Either index_id or name is required") + params = {"collection_id": self.collection_id} + if index_id: + path = f"{ApiPath.video}/{self.id}/{ApiPath.indexes}/{index_id}" + else: + path = f"{ApiPath.video}/{self.id}/{ApiPath.indexes}" + params["name"] = name + index_data = self._connection.get(path=path, params=params) + if not index_data: + return None + return self._format_index(index_data) + + def list_indexes(self, use_for: Optional[str] = None) -> List[Index]: + """List all the indexes of the video. + + :param str use_for: (optional) Filter by retrieval capability, any of + :attr:`IndexCapability.semantic `, + :attr:`IndexCapability.query `, + :attr:`IndexCapability.aggregate ` + :return: List of :class:`Index ` objects + :rtype: list[:class:`videodb.index.Index`] + """ + params = {"collection_id": self.collection_id} + if use_for is not None: + params["use_for"] = use_for + index_data = self._connection.get( + path=f"{ApiPath.video}/{self.id}/{ApiPath.indexes}", + params=params, + ) + return [self._format_index(index) for index in index_data.get("indexes", [])] + + def delete_index(self, index_id: str) -> None: + """Delete an index. + + Removes the index's retrieval structures. It does not delete the original + video or stored understanding artifacts. + + :param str index_id: The id of the index to be deleted + :raises ValueError: If ``index_id`` is not provided + :raises InvalidRequestError: If the delete fails + :return: None if the delete is successful + :rtype: None + """ + if not index_id: + raise ValueError("index_id is required") + self._connection.delete( + path=f"{ApiPath.video}/{self.id}/{ApiPath.indexes}/{index_id}", + params={"collection_id": self.collection_id}, + ) + def add_subtitle(self, style: SubtitleStyle = SubtitleStyle()) -> str: """Add subtitles to the video. From d32073d8c60f6c60117d153490f0e451f348fd89 Mon Sep 17 00:00:00 2001 From: Ankit raj <113342181+ankit-v2-3@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:57:37 +0530 Subject: [PATCH 02/11] fix: formatting --- videodb/video.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/videodb/video.py b/videodb/video.py index 8db9477..1e71dfa 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -787,10 +787,13 @@ def _format_index_source(source: Union[object, Dict, List]) -> Dict: ) def _format_index(self, index_data: dict) -> Index: + index_data = dict(index_data) + video_id = index_data.pop("video_id", None) or self.id + collection_id = index_data.pop("collection_id", None) or self.collection_id return Index( self._connection, - video_id=self.id, - collection_id=self.collection_id, + video_id=video_id, + collection_id=collection_id, **index_data, ) From c902768ca8196779d8547915ca303fb19c4926e2 Mon Sep 17 00:00:00 2001 From: sankalp nagaonkar Date: Tue, 23 Jun 2026 18:15:04 +0530 Subject: [PATCH 03/11] feat: add search v2 SDK interface --- videodb/__init__.py | 3 + videodb/_constants.py | 3 + videodb/collection.py | 139 +++++++++++++++++++++++++++++++++++++++++- videodb/search.py | 74 +++++++++++++++++++++- videodb/video.py | 132 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 344 insertions(+), 7 deletions(-) diff --git a/videodb/__init__.py b/videodb/__init__.py index 924c179..489e8e8 100644 --- a/videodb/__init__.py +++ b/videodb/__init__.py @@ -27,6 +27,7 @@ RTStreamChannelType, ) from videodb.client import Connection +from videodb.search import SearchResponse, SearchResult from videodb.capture_session import CaptureSession from videodb.websocket_client import WebSocketConnection from videodb.capture import CaptureClient, Channel, AudioChannel, VideoChannel, Channels, ChannelList @@ -58,6 +59,8 @@ "IndexCapability", "FieldGroup", "SearchError", + "SearchResult", + "SearchResponse", "play_stream", "build_iframe_embed_code", "MediaType", diff --git a/videodb/_constants.py b/videodb/_constants.py index 69abefd..d47a22a 100644 --- a/videodb/_constants.py +++ b/videodb/_constants.py @@ -101,6 +101,9 @@ class ApiPath: indexes = "indexes" records = "records" search = "search" + semantic_search = "semantic-search" + query = "query" + aggregate = "aggregate" compile = "compile" workflow = "workflow" timeline = "timeline" diff --git a/videodb/collection.py b/videodb/collection.py index 7f57fc3..17e36c1 100644 --- a/videodb/collection.py +++ b/videodb/collection.py @@ -1,6 +1,6 @@ import logging -from typing import Optional, Union, List, Dict, Any, Literal +from typing import Optional, Union, List, Dict, Any, Literal, Tuple from videodb._upload import ( upload, ) @@ -17,7 +17,7 @@ from videodb.meeting import Meeting from videodb.capture_session import CaptureSession from videodb.rtstream import RTStream, RTStreamSearchResult, RTStreamShot -from videodb.search import SearchFactory, SearchResult +from videodb.search import SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once logger = logging.getLogger(__name__) @@ -462,7 +462,135 @@ def dub_video( if dub_data: return Video(self._connection, **dub_data) - def search( + def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchResult, RTStreamSearchResult]: + """Search the collection. + + New search is used by default. Calls that use legacy-shaped parameters are + routed to :meth:`legacy_search` with a warning. + """ + old_params = { + "search_type", + "index_type", + "result_threshold", + "dynamic_score_percentage", + "scene_index_id", + "index_id", + "algorithm", + "sort_docs_on", + "namespace", + } + new_params = { + "top_k", + "mode", + "index_names", + "index_name", + "return_fields", + "include_clip", + } + + if args: + legacy_arg_names = [ + "search_type", + "index_type", + "result_threshold", + "score_threshold", + "dynamic_score_percentage", + "filter", + "sort_docs_on", + "namespace", + "scene_index_id", + ] + for name, value in zip(legacy_arg_names, args): + kwargs.setdefault(name, value) + + has_old = bool(args) or any(k in kwargs and kwargs[k] is not None for k in old_params) + has_new = any(k in kwargs and kwargs[k] is not None for k in new_params) + + if has_old and has_new: + raise ValueError( + "Cannot mix legacy search params with new search params. " + "Use search(...) for new search or legacy_search(...) for legacy search." + ) + + if has_old: + warn_legacy_search_once() + return self.legacy_search(query=query, **kwargs) + + return self._new_search(query=query, **kwargs) + + def _new_search(self, query: str, **kwargs) -> SearchResponse: + payload = {"query": query, **{k: v for k, v in kwargs.items() if v is not None}} + search_data = self._connection.post( + path=f"{ApiPath.collection}/{self.id}/{ApiPath.search}/v2", + data=payload, + show_progress=True, + ) + return SearchResponse(self._connection, **search_data) + + def semantic_search( + self, + query: str, + index_names: List[str], + top_k: int = 10, + score_threshold: Optional[float] = None, + filter: Optional[Union[List, Dict]] = None, + return_fields: Optional[Union[List, Dict, str]] = None, + ) -> SearchResult: + search_data = self._connection.post( + path=f"{ApiPath.collection}/{self.id}/{ApiPath.semantic_search}", + data={ + "query": query, + "index_names": index_names, + "top_k": top_k, + "score_threshold": score_threshold, + "filter": filter, + "return_fields": return_fields, + }, + ) + return SearchResult(self._connection, **search_data) + + def query( + self, + index_name: str, + filter: Optional[Union[List, Dict]] = None, + limit: int = 100, + return_fields: Optional[Union[List, Dict, str]] = None, + sort: Optional[Union[str, List[Tuple[str, str]]]] = None, + ) -> SearchResult: + query_data = self._connection.post( + path=f"{ApiPath.collection}/{self.id}/{ApiPath.query}", + data={ + "index_name": index_name, + "filter": filter, + "limit": limit, + "return_fields": return_fields, + "sort": sort, + }, + ) + return SearchResult(self._connection, **query_data) + + def aggregate( + self, + index_name: str, + filter: Optional[Union[List, Dict]] = None, + group_by: Optional[str] = None, + metric: str = "count", + limit: int = 100, + sort: Optional[Union[str, List[Tuple[str, str]]]] = None, + ) -> Union[Dict, List[Dict]]: + return self._connection.post( + path=f"{ApiPath.collection}/{self.id}/{ApiPath.aggregate}", + data={ + "index_name": index_name, + "filter": filter, + "group_by": group_by, + "metric": metric, + "limit": limit, + "sort": sort, + }, + ) + + def legacy_search( self, query: str, search_type: Optional[str] = SearchType.semantic, @@ -474,6 +602,8 @@ def search( sort_docs_on: Optional[str] = None, namespace: Optional[str] = None, scene_index_id: Optional[str] = None, + index_id: Optional[str] = None, + algorithm: Optional[str] = None, ) -> Union[SearchResult, RTStreamSearchResult]: """Search for a query in the collection. @@ -493,6 +623,9 @@ def search( :rtype: Union[:class:`videodb.search.SearchResult`, :class:`videodb.rtstream.RTStreamSearchResult`] """ + if scene_index_id is None and index_id is not None: + scene_index_id = index_id + if namespace == "rtstream": data = {"query": query} if scene_index_id is not None: diff --git a/videodb/search.py b/videodb/search.py index 7cb87f9..bf5ed5e 100644 --- a/videodb/search.py +++ b/videodb/search.py @@ -1,4 +1,6 @@ from abc import ABC, abstractmethod +import warnings + from videodb._utils._video import play_stream, build_iframe_embed_code from videodb._constants import ( IndexType, @@ -14,6 +16,21 @@ from videodb.shot import Shot +_LEGACY_SEARCH_WARNING = ( + "Legacy search parameters detected. This call is routed to legacy search. " + "Use legacy_search(...) to keep legacy behavior, or update to the new search interface." +) +_LEGACY_SEARCH_WARNING_EMITTED = False + + +def warn_legacy_search_once(): + global _LEGACY_SEARCH_WARNING_EMITTED + if _LEGACY_SEARCH_WARNING_EMITTED: + return + _LEGACY_SEARCH_WARNING_EMITTED = True + warnings.warn(_LEGACY_SEARCH_WARNING, UserWarning, stacklevel=3) + + class SearchResult: """SearchResult class to interact with search results @@ -35,7 +52,7 @@ def __init__(self, _connection, **kwargs): def _format_results(self): for result in self._results: self.collection_id = result.get("collection_id") - for doc in result.get("docs"): + for doc in result.get("docs") or []: self.shots.append( Shot( self._connection, @@ -49,7 +66,7 @@ def _format_results(self): scene_index_id=doc.get("scene_index_id"), scene_index_name=doc.get("scene_index_name"), metadata=doc.get("metadata"), - stream_url=doc.get("stream_link"), + stream_url=doc.get("stream_link") or doc.get("stream_url"), player_url=doc.get("player_url"), ) ) @@ -63,6 +80,15 @@ def __repr__(self) -> str: f"shots={self.shots})" ) + def __iter__(self): + return iter(self.shots) + + def __len__(self): + return len(self.shots) + + def __getitem__(self, index): + return self.shots[index] + def get_shots(self) -> List[Shot]: return self.shots @@ -139,6 +165,50 @@ def get_embed_code( ) +class SearchResponse: + """Envelope returned by high-level Search v2. + + For ``response_type='shots'``, ``results`` is a :class:`SearchResult`. + For ``response_type='aggregate'``, ``results`` is the aggregate dict/list returned by the server. + """ + + def __init__(self, _connection, **kwargs): + self._connection = _connection + self.response_type = kwargs.get("response_type") + raw_results = kwargs.get("results", []) + if self.response_type == "shots": + self.results = SearchResult(_connection, results=raw_results) + self.shots = self.results.shots + else: + self.results = raw_results + self.shots = [] + + def __repr__(self) -> str: + return f"SearchResponse(response_type={self.response_type}, results={self.results})" + + def __iter__(self): + if self.response_type == "shots": + return iter(self.results) + if isinstance(self.results, list): + return iter(self.results) + return iter([self.results]) + + def __len__(self): + if self.response_type == "shots": + return len(self.results) + if isinstance(self.results, list): + return len(self.results) + return 1 if self.results is not None else 0 + + def __getitem__(self, index): + if self.response_type == "shots": + return self.results[index] + return self.results[index] + + def get_shots(self) -> List[Shot]: + return self.shots + + class Search(ABC): """Search interface inside video or collection""" diff --git a/videodb/video.py b/videodb/video.py index 1e71dfa..893e481 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -14,7 +14,7 @@ from videodb.image import Image, Frame from videodb.index import Index from videodb.scene import Scene, SceneCollection -from videodb.search import SearchFactory, SearchResult +from videodb.search import SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once from videodb.shot import Shot _VALID_SEGMENTERS = {Segmenter.word, Segmenter.sentence, Segmenter.time} @@ -83,7 +83,132 @@ def update(self, name: Optional[str] = None) -> None: if name is not None: self.name = response_data.get("name", name) - def search( + def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchResult]: + """Search this video. + + New search is used by default. Calls that use legacy-shaped parameters are + routed to :meth:`legacy_search` with a warning. + """ + old_params = { + "search_type", + "index_type", + "result_threshold", + "dynamic_score_percentage", + "scene_index_id", + "index_id", + "algorithm", + "sort_docs_on", + "namespace", + } + new_params = { + "top_k", + "mode", + "index_names", + "index_name", + "return_fields", + "include_clip", + } + + if args: + legacy_arg_names = [ + "search_type", + "index_type", + "result_threshold", + "score_threshold", + "dynamic_score_percentage", + "filter", + ] + for name, value in zip(legacy_arg_names, args): + kwargs.setdefault(name, value) + + has_old = bool(args) or any(k in kwargs and kwargs[k] is not None for k in old_params) + has_new = any(k in kwargs and kwargs[k] is not None for k in new_params) + + if has_old and has_new: + raise ValueError( + "Cannot mix legacy search params with new search params. " + "Use search(...) for new search or legacy_search(...) for legacy search." + ) + + if has_old: + warn_legacy_search_once() + return self.legacy_search(query=query, **kwargs) + + return self._new_search(query=query, **kwargs) + + def _new_search(self, query: str, **kwargs) -> SearchResponse: + payload = {"query": query, **{k: v for k, v in kwargs.items() if v is not None}} + search_data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.search}/v2", + data=payload, + show_progress=True, + ) + return SearchResponse(self._connection, **search_data) + + def semantic_search( + self, + query: str, + index_names: List[str], + top_k: int = 10, + score_threshold: Optional[float] = None, + filter: Optional[Union[List, Dict]] = None, + return_fields: Optional[Union[List, Dict, str]] = None, + ) -> SearchResult: + search_data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.semantic_search}", + data={ + "query": query, + "index_names": index_names, + "top_k": top_k, + "score_threshold": score_threshold, + "filter": filter, + "return_fields": return_fields, + }, + ) + return SearchResult(self._connection, **search_data) + + def query( + self, + index_name: str, + filter: Optional[Union[List, Dict]] = None, + limit: int = 100, + return_fields: Optional[Union[List, Dict, str]] = None, + sort: Optional[Union[str, List[Tuple[str, str]]]] = None, + ) -> SearchResult: + query_data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.query}", + data={ + "index_name": index_name, + "filter": filter, + "limit": limit, + "return_fields": return_fields, + "sort": sort, + }, + ) + return SearchResult(self._connection, **query_data) + + def aggregate( + self, + index_name: str, + filter: Optional[Union[List, Dict]] = None, + group_by: Optional[str] = None, + metric: str = "count", + limit: int = 100, + sort: Optional[Union[str, List[Tuple[str, str]]]] = None, + ) -> Union[Dict, List[Dict]]: + return self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.aggregate}", + data={ + "index_name": index_name, + "filter": filter, + "group_by": group_by, + "metric": metric, + "limit": limit, + "sort": sort, + }, + ) + + def legacy_search( self, query: str, search_type: Optional[str] = SearchType.semantic, @@ -106,6 +231,9 @@ def search( :return: :class:`SearchResult ` object :rtype: :class:`videodb.search.SearchResult` """ + if kwargs.get("scene_index_id") is None and kwargs.get("index_id") is not None: + kwargs["scene_index_id"] = kwargs.get("index_id") + kwargs.pop("index_id", None) search = SearchFactory(self._connection).get_search(search_type) return search.search_inside_video( video_id=self.id, From b5053e43a21b16ee53c7ebaace9481a9d12bef6f Mon Sep 17 00:00:00 2001 From: sankalp nagaonkar Date: Tue, 23 Jun 2026 19:39:58 +0530 Subject: [PATCH 04/11] chore: improve legacy search warning --- videodb/search.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/videodb/search.py b/videodb/search.py index bf5ed5e..dde612d 100644 --- a/videodb/search.py +++ b/videodb/search.py @@ -17,8 +17,10 @@ _LEGACY_SEARCH_WARNING = ( - "Legacy search parameters detected. This call is routed to legacy search. " - "Use legacy_search(...) to keep legacy behavior, or update to the new search interface." + "This search() call uses parameters from the previous search API, so VideoDB " + "is using legacy_search() for compatibility. To keep this behavior and avoid " + "this warning, call legacy_search(...). To use Search v2, call search(...) " + "with the new parameters, or use query() / semantic_search()." ) _LEGACY_SEARCH_WARNING_EMITTED = False From c0acb08ac8c1d9ca6e673eb37c4c97bdec931f65 Mon Sep 17 00:00:00 2001 From: sankalp nagaonkar Date: Tue, 23 Jun 2026 19:40:40 +0530 Subject: [PATCH 05/11] Revert "chore: improve legacy search warning" This reverts commit b5053e43a21b16ee53c7ebaace9481a9d12bef6f. --- videodb/search.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/videodb/search.py b/videodb/search.py index dde612d..bf5ed5e 100644 --- a/videodb/search.py +++ b/videodb/search.py @@ -17,10 +17,8 @@ _LEGACY_SEARCH_WARNING = ( - "This search() call uses parameters from the previous search API, so VideoDB " - "is using legacy_search() for compatibility. To keep this behavior and avoid " - "this warning, call legacy_search(...). To use Search v2, call search(...) " - "with the new parameters, or use query() / semantic_search()." + "Legacy search parameters detected. This call is routed to legacy search. " + "Use legacy_search(...) to keep legacy behavior, or update to the new search interface." ) _LEGACY_SEARCH_WARNING_EMITTED = False From e64f66771a31bf3dca92a5b882598f02f4e168e8 Mon Sep 17 00:00:00 2001 From: Rohit Garg Date: Wed, 24 Jun 2026 16:23:43 +0530 Subject: [PATCH 06/11] feat: understanding module --- videodb/__init__.py | 3 + videodb/_constants.py | 1 + videodb/understanding.py | 302 +++++++++++++++++++++++++++++++++++++++ videodb/video.py | 89 ++++++++++++ 4 files changed, 395 insertions(+) create mode 100644 videodb/understanding.py diff --git a/videodb/__init__.py b/videodb/__init__.py index 489e8e8..93c45dc 100644 --- a/videodb/__init__.py +++ b/videodb/__init__.py @@ -28,6 +28,7 @@ ) from videodb.client import Connection from videodb.search import SearchResponse, SearchResult +from videodb.understanding import Understanding, UnderstandingAnalyzer from videodb.capture_session import CaptureSession from videodb.websocket_client import WebSocketConnection from videodb.capture import CaptureClient, Channel, AudioChannel, VideoChannel, Channels, ChannelList @@ -61,6 +62,8 @@ "SearchError", "SearchResult", "SearchResponse", + "Understanding", + "UnderstandingAnalyzer", "play_stream", "build_iframe_embed_code", "MediaType", diff --git a/videodb/_constants.py b/videodb/_constants.py index d47a22a..df089d0 100644 --- a/videodb/_constants.py +++ b/videodb/_constants.py @@ -100,6 +100,7 @@ class ApiPath: index = "index" indexes = "indexes" records = "records" + understand = "understand" search = "search" semantic_search = "semantic-search" query = "query" diff --git a/videodb/understanding.py b/videodb/understanding.py new file mode 100644 index 0000000..7a28b80 --- /dev/null +++ b/videodb/understanding.py @@ -0,0 +1,302 @@ +import time +from typing import Any, Dict, List, Optional + +from videodb._constants import ApiPath + + +UNDERSTANDING_TERMINAL_STATUSES = {"done", "failed", "partial"} +ANALYZER_TERMINAL_STATUSES = {"done", "failed", "skipped", "cancelled"} + +ANALYZER_TYPE_ALIASES = { + "spoken_words": "speech_transcription", +} + +DEFAULT_ANALYZER_NAMES = { + "speech_transcription": "transcript", + "object_detection": "objects", + "vlm": "scene", +} + + +class UnderstandingAnalyzer: + """Analyzer status and output handle for one analyzer in an understanding run.""" + + def __init__( + self, + understanding: "Understanding", + id: Optional[str] = None, + name: Optional[str] = None, + type: Optional[str] = None, + status: Optional[str] = None, + **kwargs, + ) -> None: + self.understanding = understanding + self.id = id + self.name = name + self.type = type + self.status = status + self.extra = kwargs + + def __repr__(self) -> str: + return ( + f"UnderstandingAnalyzer(" + f"id={self.id}, " + f"name={self.name}, " + f"type={self.type}, " + f"status={self.status})" + ) + + def __getitem__(self, key): + return self.__dict__[key] + + @property + def is_complete(self) -> bool: + """Return True when the analyzer is in a terminal status.""" + return self.status in ANALYZER_TERMINAL_STATUSES + + @property + def is_successful(self) -> bool: + """Return True when the analyzer completed successfully.""" + return self.status == "done" + + def refresh(self) -> "UnderstandingAnalyzer": + """Refresh this analyzer's status from the API.""" + analyzer = self.understanding.get_analyzer(self.name or self.id, refresh=True) + self.id = analyzer.id + self.name = analyzer.name + self.type = analyzer.type + self.status = analyzer.status + self.extra = analyzer.extra + return self + + def wait_until_complete( + self, + timeout: int = 1800, + poll_interval: int = 10, + ) -> "UnderstandingAnalyzer": + """Poll this analyzer until it reaches a terminal status. + + :param int timeout: Maximum time to wait, in seconds + :param int poll_interval: Seconds between status checks + :raises TimeoutError: If the analyzer does not complete before timeout + :return: This analyzer with refreshed status + """ + deadline = time.time() + timeout + while True: + self.refresh() + if self.is_complete: + return self + if time.time() >= deadline: + raise TimeoutError( + f"Analyzer {self.name or self.id} did not complete within {timeout}s" + ) + time.sleep(poll_interval) + + def get_output(self) -> Any: + """Return this analyzer's output. + + The current API returns the analyzer's segments output. + """ + identifier = self.name or self.id + if not identifier: + raise ValueError("Analyzer id or name is required") + return self.understanding.get_analyzer_output(identifier) + + +class Understanding: + """A video understanding run. + + Use :meth:`list_analyzers` or :meth:`get_analyzer` to inspect analyzers and + fetch analyzer outputs. + """ + + def __init__( + self, + _connection, + video_id: str, + collection_id: Optional[str] = None, + understanding_id: Optional[str] = None, + id: Optional[str] = None, + status: Optional[str] = None, + analyzers: Optional[List[Dict[str, Any]]] = None, + output_url: Optional[str] = None, + **kwargs, + ) -> None: + self._connection = _connection + self.video_id = video_id + self.collection_id = collection_id + self.id = understanding_id or id + self.status = status + self.output_url = output_url + self.extra = kwargs + self.analyzers = [self.create_analyzer(item) for item in (analyzers or [])] + + def __repr__(self) -> str: + return ( + f"Understanding(" + f"id={self.id}, " + f"video_id={self.video_id}, " + f"status={self.status}, " + f"analyzers={len(self.analyzers)})" + ) + + def __getitem__(self, key): + return self.__dict__[key] + + @property + def is_complete(self) -> bool: + """Return True when the understanding run is in a terminal status.""" + return self.status in UNDERSTANDING_TERMINAL_STATUSES + + @property + def is_successful(self) -> bool: + """Return True when the understanding run completed successfully.""" + return self.status == "done" + + def create_analyzer(self, data: Dict[str, Any]) -> UnderstandingAnalyzer: + return UnderstandingAnalyzer(self, **(data or {})) + + def update_from_response(self, data: Dict[str, Any]) -> "Understanding": + data = data or {} + self.status = data.get("status", self.status) + if data.get("understanding_id") or data.get("id"): + self.id = data.get("understanding_id") or data.get("id") + if data.get("video_id"): + self.video_id = data.get("video_id") + if data.get("collection_id"): + self.collection_id = data.get("collection_id") + if data.get("output_url"): + self.output_url = data.get("output_url") + if "analyzers" in data: + self.analyzers = [self.create_analyzer(item) for item in data.get("analyzers") or []] + return self + + def refresh(self) -> "Understanding": + """Refresh understanding and analyzer statuses from the API.""" + data = self._connection.get( + path=f"{ApiPath.video}/{self.video_id}/{ApiPath.understand}/{self.id}" + ) + return self.update_from_response(data) + + def wait_until_complete( + self, + timeout: int = 1800, + poll_interval: int = 10, + ) -> "Understanding": + """Poll this understanding until it reaches a terminal status. + + Terminal statuses are ``done``, ``failed``, and ``partial``. + + :param int timeout: Maximum time to wait, in seconds + :param int poll_interval: Seconds between status checks + :raises TimeoutError: If the run does not complete before timeout + :return: This understanding with refreshed status + """ + deadline = time.time() + timeout + while True: + self.refresh() + if self.is_complete: + return self + if time.time() >= deadline: + raise TimeoutError(f"Understanding {self.id} did not complete within {timeout}s") + time.sleep(poll_interval) + + def list_analyzers(self) -> List[UnderstandingAnalyzer]: + """Return analyzers in this understanding run.""" + return list(self.analyzers) + + def get_analyzer(self, name_or_id: str, refresh: bool = False) -> UnderstandingAnalyzer: + """Return an analyzer by user-facing name or internal analyzer id. + + :param str name_or_id: Analyzer name, e.g. ``"transcript"``, or id, e.g. ``"an_..."`` + :param bool refresh: When True, fetch the latest analyzer status first + :raises ValueError: If no analyzer matches + :return: :class:`UnderstandingAnalyzer ` object + """ + if refresh: + data = self._connection.get( + path=f"{ApiPath.video}/{self.video_id}/{ApiPath.understand}/{self.id}", + params={"analyzer": name_or_id}, + ) or {} + self.status = data.get("status", self.status) + analyzer_data = (data.get("analyzers") or [None])[0] + if analyzer_data: + refreshed = self.create_analyzer(analyzer_data) + for index, analyzer in enumerate(self.analyzers): + if analyzer.name == refreshed.name or analyzer.id == refreshed.id: + self.analyzers[index] = refreshed + return refreshed + self.analyzers.append(refreshed) + return refreshed + + for analyzer in self.analyzers: + if analyzer.name == name_or_id or analyzer.id == name_or_id: + return analyzer + raise ValueError(f"Analyzer not found: {name_or_id}") + + def get_analyzer_output(self, name_or_id: str) -> Any: + """Return output for an analyzer by name or id.""" + return self._connection.get( + path=( + f"{ApiPath.video}/{self.video_id}/{ApiPath.understand}/{self.id}" + f"/analyzers/{name_or_id}/output" + ) + ) + + def delete(self) -> None: + """Delete this understanding run.""" + self._connection.delete( + path=f"{ApiPath.video}/{self.video_id}/{ApiPath.understand}/{self.id}" + ) + + +def normalize_analyzer_type(analyzer_type: str) -> str: + return ANALYZER_TYPE_ALIASES.get(analyzer_type, analyzer_type) + + +def default_analyzer_name(analyzer_type: str) -> Optional[str]: + return DEFAULT_ANALYZER_NAMES.get(normalize_analyzer_type(analyzer_type)) + + +def normalize_understanding_analyzers(analyzers: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Normalize analyzer payloads to the server contract. + + The public SDK accepts friendly analyzer names like ``spoken_words`` and + automatically fills stable output names for built-ins when omitted. + """ + if not isinstance(analyzers, list) or not analyzers: + raise ValueError("analyzers must be a non-empty list") + + normalized = [] + names = set() + generated_names = set() + + for index, analyzer in enumerate(analyzers): + if not isinstance(analyzer, dict): + raise ValueError(f"analyzers[{index}] must be a dict") + if not analyzer.get("type"): + raise ValueError(f"analyzers[{index}].type is required") + + item = dict(analyzer) + original_type = item["type"] + item["type"] = normalize_analyzer_type(original_type) + + if not item.get("name"): + generated_name = default_analyzer_name(original_type) + if generated_name: + if generated_name in generated_names: + raise ValueError( + f"Multiple analyzers would use default name {generated_name!r}. " + "Provide explicit analyzer names." + ) + item["name"] = generated_name + generated_names.add(generated_name) + + if item.get("name"): + if item["name"] in names: + raise ValueError(f"Duplicate analyzer name: {item['name']}") + names.add(item["name"]) + + normalized.append(item) + + return normalized diff --git a/videodb/video.py b/videodb/video.py index 893e481..1a20500 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -13,6 +13,7 @@ ) from videodb.image import Image, Frame from videodb.index import Index +from videodb.understanding import Understanding, normalize_understanding_analyzers from videodb.scene import Scene, SceneCollection from videodb.search import SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once from videodb.shot import Shot @@ -862,6 +863,94 @@ def delete_scene_index(self, scene_index_id: str) -> None: path=f"{ApiPath.video}/{self.id}/{ApiPath.index}/{ApiPath.scene}/{scene_index_id}" ) + def understand( + self, + analyzers: List[Dict[str, Any]], + segmentation: Optional[Dict[str, Any]] = None, + sampling: Optional[Dict[str, Any]] = None, + transform: Optional[Dict[str, Any]] = None, + audio_chunking: Optional[Dict[str, Any]] = None, + callback_url: Optional[str] = None, + **kwargs, + ) -> Understanding: + """Create an understanding run for this video. + + :param list analyzers: Analyzer definitions. The SDK accepts friendly + analyzer type ``spoken_words`` and maps it to the server analyzer. + :param dict segmentation: Optional run-level segmentation config + :param dict sampling: Optional run-level sampling config + :param dict transform: Optional run-level transform config + :param dict audio_chunking: Optional run-level audio chunking config + :param str callback_url: Optional URL called when the run completes + :return: :class:`Understanding ` object + """ + normalized_analyzers = normalize_understanding_analyzers(analyzers) + payload = {"analyzers": normalized_analyzers} + optional_fields = { + "segmentation": segmentation, + "sampling": sampling, + "transform": transform, + "audio_chunking": audio_chunking, + "callback_url": callback_url, + **kwargs, + } + payload.update({key: value for key, value in optional_fields.items() if value is not None}) + + data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.understand}", + data=payload, + ) or {} + data.setdefault( + "analyzers", + [ + { + "name": analyzer.get("name"), + "type": analyzer.get("type"), + "status": "pending", + } + for analyzer in normalized_analyzers + ], + ) + data.setdefault("video_id", self.id) + data.setdefault("collection_id", self.collection_id) + return Understanding(self._connection, **data) + + def get_understanding(self, understanding_id: str) -> Understanding: + """Get an understanding run by id. + + :param str understanding_id: Understanding run id + :return: :class:`Understanding ` object + """ + if not understanding_id: + raise ValueError("understanding_id is required") + data = self._connection.get( + path=f"{ApiPath.video}/{self.id}/{ApiPath.understand}/{understanding_id}" + ) or {} + data.setdefault("video_id", self.id) + data.setdefault("collection_id", self.collection_id) + data.setdefault("understanding_id", understanding_id) + return Understanding(self._connection, **data) + + def list_understandings(self) -> List[Understanding]: + """List understanding runs for this video.""" + data = self._connection.get(path=f"{ApiPath.video}/{self.id}/{ApiPath.understand}") + results = (data or {}).get("understanding_results") or [] + understandings = [] + for item in results: + data = dict(item) + data.setdefault("video_id", self.id) + data.setdefault("collection_id", self.collection_id) + understandings.append(Understanding(self._connection, **data)) + return understandings + + def delete_understanding(self, understanding_id: str) -> None: + """Delete an understanding run.""" + if not understanding_id: + raise ValueError("understanding_id is required") + self._connection.delete( + path=f"{ApiPath.video}/{self.id}/{ApiPath.understand}/{understanding_id}" + ) + @staticmethod def _format_index_source(source: Union[object, Dict, List]) -> Dict: """Format an index *source* into the request payload. From b9b6f26e080a0a6e7dd61ba423d48999339316d9 Mon Sep 17 00:00:00 2001 From: sankalp nagaonkar Date: Fri, 26 Jun 2026 03:03:24 +0530 Subject: [PATCH 07/11] Allow semantic_search without index_names --- videodb/collection.py | 13 +++++++++---- videodb/video.py | 13 +++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/videodb/collection.py b/videodb/collection.py index 17e36c1..f2e88e0 100644 --- a/videodb/collection.py +++ b/videodb/collection.py @@ -482,11 +482,10 @@ def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchRes new_params = { "top_k", "mode", - "index_names", - "index_name", "return_fields", "include_clip", } + unsupported_params = {"index_name", "index_names"} if args: legacy_arg_names = [ @@ -505,12 +504,18 @@ def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchRes has_old = bool(args) or any(k in kwargs and kwargs[k] is not None for k in old_params) has_new = any(k in kwargs and kwargs[k] is not None for k in new_params) + has_unsupported = any(k in kwargs and kwargs[k] is not None for k in unsupported_params) - if has_old and has_new: + if has_old and (has_new or has_unsupported): raise ValueError( "Cannot mix legacy search params with new search params. " "Use search(...) for new search or legacy_search(...) for legacy search." ) + if has_unsupported: + raise ValueError( + "index_name/index_names are not supported in search(). " + "Use semantic_search(), query(), or aggregate() for index-specific calls." + ) if has_old: warn_legacy_search_once() @@ -530,7 +535,7 @@ def _new_search(self, query: str, **kwargs) -> SearchResponse: def semantic_search( self, query: str, - index_names: List[str], + index_names: Optional[Union[List[str], str]] = None, top_k: int = 10, score_threshold: Optional[float] = None, filter: Optional[Union[List, Dict]] = None, diff --git a/videodb/video.py b/videodb/video.py index 1a20500..1d11bff 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -104,11 +104,10 @@ def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchRes new_params = { "top_k", "mode", - "index_names", - "index_name", "return_fields", "include_clip", } + unsupported_params = {"index_name", "index_names"} if args: legacy_arg_names = [ @@ -124,12 +123,18 @@ def search(self, query: str, *args, **kwargs) -> Union[SearchResponse, SearchRes has_old = bool(args) or any(k in kwargs and kwargs[k] is not None for k in old_params) has_new = any(k in kwargs and kwargs[k] is not None for k in new_params) + has_unsupported = any(k in kwargs and kwargs[k] is not None for k in unsupported_params) - if has_old and has_new: + if has_old and (has_new or has_unsupported): raise ValueError( "Cannot mix legacy search params with new search params. " "Use search(...) for new search or legacy_search(...) for legacy search." ) + if has_unsupported: + raise ValueError( + "index_name/index_names are not supported in search(). " + "Use semantic_search(), query(), or aggregate() for index-specific calls." + ) if has_old: warn_legacy_search_once() @@ -149,7 +154,7 @@ def _new_search(self, query: str, **kwargs) -> SearchResponse: def semantic_search( self, query: str, - index_names: List[str], + index_names: Optional[Union[List[str], str]] = None, top_k: int = 10, score_threshold: Optional[float] = None, filter: Optional[Union[List, Dict]] = None, From 6a813e429bf3e5f54b14923073c4a58ed3b5b3f5 Mon Sep 17 00:00:00 2001 From: Ankit raj <113342181+ankit-v2-3@users.noreply.github.com> Date: Fri, 26 Jun 2026 13:52:49 +0530 Subject: [PATCH 08/11] fix: rename start, end and segment_id --- videodb/index.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/videodb/index.py b/videodb/index.py index 6c03d9c..e80f2b5 100644 --- a/videodb/index.py +++ b/videodb/index.py @@ -34,39 +34,50 @@ def __getitem__(self, key): class IndexRecord: - """A single indexed record (one temporal segment of an index). + """A single indexed record (one temporal scene of an index). :ivar str video_id: ID of the video the record belongs to :ivar str understanding_id: ID of the understanding run that produced the record - :ivar str segment_id: ID of the segment within the artifact - :ivar float start_sec: Start time of the segment in seconds - :ivar float end_sec: End time of the segment in seconds - :ivar dict data: Indexed field values for the segment + :ivar str scene_id: ID of the scene within the artifact + :ivar float start: Start time of the scene in seconds + :ivar float end: End time of the scene in seconds + :ivar dict data: Indexed field values for the scene + :ivar str segment_id: Deprecated alias of ``scene_id`` + :ivar float start_sec: Deprecated alias of ``start`` + :ivar float end_sec: Deprecated alias of ``end`` """ def __init__( self, video_id: Optional[str] = None, understanding_id: Optional[str] = None, + scene_id: Optional[str] = None, + start: Optional[float] = None, + end: Optional[float] = None, + data: Optional[dict] = None, segment_id: Optional[str] = None, start_sec: Optional[float] = None, end_sec: Optional[float] = None, - data: Optional[dict] = None, ) -> None: self.video_id = video_id self.understanding_id = understanding_id - self.segment_id = segment_id - self.start_sec = start_sec - self.end_sec = end_sec + # Prefer the V2 contract names; fall back to legacy aliases for older payloads. + self.scene_id = scene_id if scene_id is not None else segment_id + self.start = start if start is not None else start_sec + self.end = end if end is not None else end_sec self.data = data or {} + # Backward-compatible aliases. + self.segment_id = self.scene_id + self.start_sec = self.start + self.end_sec = self.end def __repr__(self) -> str: return ( f"IndexRecord(" f"video_id={self.video_id}, " - f"segment_id={self.segment_id}, " - f"start_sec={self.start_sec}, " - f"end_sec={self.end_sec}, " + f"scene_id={self.scene_id}, " + f"start={self.start}, " + f"end={self.end}, " f"data={self.data})" ) @@ -182,10 +193,13 @@ def records(self, limit: int = 20, cursor: Optional[str] = None) -> RecordPage: IndexRecord( video_id=record.get("video_id"), understanding_id=record.get("understanding_id"), + scene_id=record.get("scene_id"), + start=record.get("start"), + end=record.get("end"), + data=record.get("data"), segment_id=record.get("segment_id"), start_sec=record.get("start_sec"), end_sec=record.get("end_sec"), - data=record.get("data"), ) for record in records_data.get("records", []) ] From 03510cbf0df5d05b6cc1fb9d76f2b7193bd0f525 Mon Sep 17 00:00:00 2001 From: sankalp nagaonkar Date: Fri, 26 Jun 2026 15:47:54 +0530 Subject: [PATCH 09/11] Add ask API --- videodb/__init__.py | 3 ++- videodb/_constants.py | 1 + videodb/collection.py | 21 ++++++++++++++++++++- videodb/search.py | 16 ++++++++++++++++ videodb/video.py | 21 ++++++++++++++++++++- 5 files changed, 59 insertions(+), 3 deletions(-) diff --git a/videodb/__init__.py b/videodb/__init__.py index 93c45dc..b736c88 100644 --- a/videodb/__init__.py +++ b/videodb/__init__.py @@ -27,7 +27,7 @@ RTStreamChannelType, ) from videodb.client import Connection -from videodb.search import SearchResponse, SearchResult +from videodb.search import AskResponse, SearchResponse, SearchResult from videodb.understanding import Understanding, UnderstandingAnalyzer from videodb.capture_session import CaptureSession from videodb.websocket_client import WebSocketConnection @@ -62,6 +62,7 @@ "SearchError", "SearchResult", "SearchResponse", + "AskResponse", "Understanding", "UnderstandingAnalyzer", "play_stream", diff --git a/videodb/_constants.py b/videodb/_constants.py index df089d0..5e2f59b 100644 --- a/videodb/_constants.py +++ b/videodb/_constants.py @@ -102,6 +102,7 @@ class ApiPath: records = "records" understand = "understand" search = "search" + ask = "ask" semantic_search = "semantic-search" query = "query" aggregate = "aggregate" diff --git a/videodb/collection.py b/videodb/collection.py index f2e88e0..67a2fe5 100644 --- a/videodb/collection.py +++ b/videodb/collection.py @@ -17,7 +17,7 @@ from videodb.meeting import Meeting from videodb.capture_session import CaptureSession from videodb.rtstream import RTStream, RTStreamSearchResult, RTStreamShot -from videodb.search import SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once +from videodb.search import AskResponse, SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once logger = logging.getLogger(__name__) @@ -532,6 +532,25 @@ def _new_search(self, query: str, **kwargs) -> SearchResponse: ) return SearchResponse(self._connection, **search_data) + def ask( + self, + question: str, + top_k: int = 15, + mode: str = "default", + include_sources: bool = False, + ) -> AskResponse: + ask_data = self._connection.post( + path=f"{ApiPath.collection}/{self.id}/{ApiPath.ask}", + data={ + "question": question, + "top_k": top_k, + "mode": mode, + "include_sources": include_sources, + }, + show_progress=True, + ) + return AskResponse(self._connection, **ask_data) + def semantic_search( self, query: str, diff --git a/videodb/search.py b/videodb/search.py index bf5ed5e..111f518 100644 --- a/videodb/search.py +++ b/videodb/search.py @@ -165,6 +165,22 @@ def get_embed_code( ) +class AskResponse: + """Response returned by ``ask()``. + + :ivar str answer: Text answer generated from retrieved video context. + :ivar list[Shot] sources: Source shots selected by the LLM when requested. + """ + + def __init__(self, _connection, **kwargs): + self._connection = _connection + self.answer = kwargs.get("answer") or "" + self.sources = SearchResult(_connection, results=kwargs.get("sources") or []).shots + + def __repr__(self) -> str: + return f"AskResponse(answer={self.answer!r}, sources={self.sources})" + + class SearchResponse: """Envelope returned by high-level Search v2. diff --git a/videodb/video.py b/videodb/video.py index 1d11bff..3f0435f 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -15,7 +15,7 @@ from videodb.index import Index from videodb.understanding import Understanding, normalize_understanding_analyzers from videodb.scene import Scene, SceneCollection -from videodb.search import SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once +from videodb.search import AskResponse, SearchFactory, SearchResponse, SearchResult, warn_legacy_search_once from videodb.shot import Shot _VALID_SEGMENTERS = {Segmenter.word, Segmenter.sentence, Segmenter.time} @@ -151,6 +151,25 @@ def _new_search(self, query: str, **kwargs) -> SearchResponse: ) return SearchResponse(self._connection, **search_data) + def ask( + self, + question: str, + top_k: int = 15, + mode: str = "default", + include_sources: bool = False, + ) -> AskResponse: + ask_data = self._connection.post( + path=f"{ApiPath.video}/{self.id}/{ApiPath.ask}", + data={ + "question": question, + "top_k": top_k, + "mode": mode, + "include_sources": include_sources, + }, + show_progress=True, + ) + return AskResponse(self._connection, **ask_data) + def semantic_search( self, query: str, From a9715739de9c1c662d49271d73a83a02d2b2935b Mon Sep 17 00:00:00 2001 From: Ankit raj <113342181+ankit-v2-3@users.noreply.github.com> Date: Fri, 26 Jun 2026 17:43:59 +0530 Subject: [PATCH 10/11] fix: extract_type --- videodb/video.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/videodb/video.py b/videodb/video.py index 1d11bff..685fdad 100644 --- a/videodb/video.py +++ b/videodb/video.py @@ -969,8 +969,10 @@ def _format_index_source(source: Union[object, Dict, List]) -> Dict: Accepts: - an understanding artifact object exposing ``to_index_source()`` (forward-compatible with the future ``understand()`` outputs) - - a dict that already carries a ``type`` (``"understanding"``, ``"s3"``, - or ``"inline"``), passed through as-is after light normalization + - an analyzer output envelope (``{name, type, status, scenes}`` from + ``get_analyzer(...).get_output()``) -> understanding or inline source + - a dict that already carries a source ``type`` (``"understanding"`` or + ``"inline"``), passed through as-is - a dict with ``understanding_id`` (normalized to an understanding source) - a list of user-provided temporal records (wrapped as an inline source) @@ -979,6 +981,9 @@ def _format_index_source(source: Union[object, Dict, List]) -> Dict: :return: The serialized ``source`` payload :rtype: dict """ + # Valid source-dispatch types (NOT analyzer/node types like "object_detection"). + _SOURCE_TYPES = {"understanding", "inline"} + if source is None: raise ValueError("source is required") @@ -991,15 +996,28 @@ def _format_index_source(source: Union[object, Dict, List]) -> Dict: return {"type": "inline", "data": source} if isinstance(source, dict): - # Already a fully-formed source dict (e.g. understanding/s3/inline). - if source.get("type"): + # An analyzer output envelope carries `scenes` and a `type` that is an + # analyzer/node type (e.g. "object_detection"), which must NOT be sent as a + # source-dispatch type. Translate it: prefer an understanding reference when + # an understanding_id is known, else send its scenes as inline records. + if "scenes" in source and source.get("type") not in _SOURCE_TYPES: + understanding_id = source.get("understanding_id") or source.get("understanding") + if understanding_id: + return { + "type": "understanding", + "understanding_id": understanding_id, + "extract_type": source.get("name") or source.get("type"), + } + return {"type": "inline", "data": source.get("scenes") or []} + # Already a fully-formed source dict (understanding/inline). + if source.get("type") in _SOURCE_TYPES: return source # Understanding artifact reference. if source.get("understanding_id"): return { "type": "understanding", "understanding_id": source["understanding_id"], - "extract_type": source.get("extract_type"), + "extract_type": source.get("extract_type") or source.get("name"), } # Otherwise treat the dict as a single inline temporal record. return {"type": "inline", "data": [source]} From 8d3857ce92417bef6189d73139edfa0dec8db1d1 Mon Sep 17 00:00:00 2001 From: Ankit raj <113342181+ankit-v2-3@users.noreply.github.com> Date: Fri, 26 Jun 2026 18:42:12 +0530 Subject: [PATCH 11/11] fix: add error state --- videodb/index.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/videodb/index.py b/videodb/index.py index e80f2b5..c0a342b 100644 --- a/videodb/index.py +++ b/videodb/index.py @@ -143,6 +143,7 @@ def __init__( self.index_id = kwargs.get("index_id") self.name = kwargs.get("name") self.status = kwargs.get("status") + self.error = kwargs.get("error") # failure reason when status == "failed" self.use_for = kwargs.get("use_for", []) self.source = kwargs.get("source") self.record_count = kwargs.get("record_count") @@ -163,7 +164,8 @@ def __repr__(self) -> str: f"video_id={self.video_id}, " f"name={self.name}, " f"status={self.status}, " - f"use_for={self.use_for}, " + + (f"error={self.error}, " if self.error else "") + + f"use_for={self.use_for}, " f"record_count={self.record_count})" )