diff --git a/hyperforge/src/hyperforge/broker/redis.py b/hyperforge/src/hyperforge/broker/redis.py index aa0db40..0c74cd2 100644 --- a/hyperforge/src/hyperforge/broker/redis.py +++ b/hyperforge/src/hyperforge/broker/redis.py @@ -56,9 +56,8 @@ async def publish_activation( maxlen=100, ) - async def subscribe_activations( - self, - ) -> AsyncIterator[tuple[StartInteraction, dict[str, str]]]: + async def _ensure_consumer_group(self) -> None: + """Create the stream and consumer group if they don't exist.""" try: await self._client.xgroup_create( name=self._activate_subject, @@ -69,6 +68,11 @@ async def subscribe_activations( if "BUSYGROUP" not in str(e): raise + async def subscribe_activations( + self, + ) -> AsyncIterator[tuple[StartInteraction, dict[str, str]]]: + await self._ensure_consumer_group() + while True: try: response = await self._client.xreadgroup( @@ -90,6 +94,13 @@ async def subscribe_activations( except (asyncio.CancelledError, KeyboardInterrupt): logger.info("Activation subscription cancelled, exiting...") break + except ResponseError as e: + if "NOGROUP" in str(e): + logger.warning("Consumer group lost, re-creating...") + await self._ensure_consumer_group() + else: + logger.exception("Error while subscribing to activations, retrying...") + await asyncio.sleep(1) except Exception: logger.exception("Error while subscribing to activations, retrying...") await asyncio.sleep(1)