Skip to content

WIP: flexible rerun vis#2474

Draft
TomCC7 wants to merge 7 commits into
mainfrom
cc/foxglove-like-rerun
Draft

WIP: flexible rerun vis#2474
TomCC7 wants to merge 7 commits into
mainfrom
cc/foxglove-like-rerun

Conversation

@TomCC7

@TomCC7 TomCC7 commented Jun 11, 2026

Copy link
Copy Markdown
Member

Problem

Closes DIM-XXX

Solution

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@codecov

codecov Bot commented Jun 11, 2026

Copy link
Copy Markdown

❌ 15 Tests Failed:

Tests completed Failed Passed Skipped
1895 15 1880 156
View the top 3 failed test(s) by shortest run time
dimos.protocol.rpc.test_spec::test_basic_sync_call[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    def test_basic_sync_call(rpc_context, impl_name: str) -> None:
        """Test basic synchronous RPC calls."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a985163f0>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98515b50>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98516f30>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98516f30>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98516f30>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_callback_call[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    def test_callback_call(rpc_context, impl_name: str) -> None:
        """Test callback-based RPC calls."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:181: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a9853fa40>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a9853f290>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a9853ede0>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a9853ede0>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a9853ede0>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_concurrent_calls[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    @pytest.mark.skipif_macos_bug
    def test_concurrent_calls(rpc_context, impl_name: str) -> None:
        """Test making multiple concurrent RPC calls."""
        # Skip for SharedMemory - double-buffered architecture can't handle concurrent bursts
        # The channel only holds 2 frames, so 1000 rapid concurrent responses overwrite each other
        if impl_name == "shm":
            pytest.skip("SharedMemory uses double-buffering; can't handle 1000 concurrent responses")
    
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:367: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a981ba4e0>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a981bb920>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a981bb620>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a981bb620>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a981bb620>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_exception_handling_callback[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    def test_exception_handling_callback(rpc_context, impl_name: str) -> None:
        """Test that exceptions are properly passed through callback-based RPC calls."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a98390260>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98390050>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983918b0>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983918b0>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983918b0>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_exception_handling_sync[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    def test_exception_handling_sync(rpc_context, impl_name: str) -> None:
        """Test that exceptions are properly passed through sync RPC calls."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:206: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a983f2e10>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f3d70>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f1250>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f1250>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f1250>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_multiple_services[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    @pytest.mark.skipif_macos_bug
    def test_multiple_services(rpc_context, impl_name: str) -> None:
        """Test serving multiple RPC functions simultaneously."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:335: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a983f2270>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f1ac0>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f33e0>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f33e0>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f33e0>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_nonexistent_service[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    def test_nonexistent_service(rpc_context, impl_name: str) -> None:
        """Test calling a service that doesn't exist."""
>       with rpc_context() as (_server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:323: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a983d4560>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983d40b0>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983d6c00>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983d6c00>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983d6c00>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_timeout[redis_rpc_context-redis]
Stack Traces | 0.001s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    @pytest.mark.skipif_macos_bug
    def test_timeout(rpc_context, impl_name: str) -> None:
        """Test that RPC calls properly timeout."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:301: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a983f1fa0>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f1e20>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f3ad0>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f3ad0>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a983f3ad0>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.protocol.rpc.test_spec::test_exception_handling_async[redis_rpc_context-redis]
Stack Traces | 0.002s run time
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>
impl_name = 'redis'

    @pytest.mark.parametrize("rpc_context, impl_name", testdata)
    @pytest.mark.asyncio
    async def test_exception_handling_async(rpc_context, impl_name: str) -> None:
        """Test that exceptions are properly passed through async RPC calls."""
>       with rpc_context() as (server, client):

impl_name  = 'redis'
rpc_context = <function redis_rpc_context at 0xff6a9e2e7880>

.../protocol/rpc/test_spec.py:238: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a98392060>
.../protocol/rpc/test_spec.py:92: in redis_rpc_context
    server.start()
        client     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98393b30>
        server     = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98391880>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98391880>

    def start(self) -> None:
        """Start the Redis pub/sub service."""
>       if self._running:
E       AttributeError: 'RedisRPC' object has no attribute '_running'

self       = <dimos.protocol.rpc.redisrpc.RedisRPC object at 0xff6a98391880>

.../pubsub/impl/redispubsub.py:57: AttributeError
dimos.project.test_no_init_files::test_no_init_files
Stack Traces | 0.016s run time
def test_no_init_files():
        dimos_dir = DIMOS_PROJECT_ROOT / "dimos"
        init_files = sorted(dimos_dir.rglob("__init__.py"))
        # The root dimos/__init__.py is allowed for the porcelain lazy import.
        init_files = [f for f in init_files if f != dimos_dir / "__init__.py"]
        if init_files:
            listing = "\n".join(f"  - {f.relative_to(dimos_dir)}" for f in init_files)
>           raise AssertionError(
                f"Found __init__.py files in dimos/:\n{listing}\n\n"
                "__init__.py files are not allowed because they lead to unnecessary "
                "extraneous imports. Everything should be imported straight from the "
                "source module."
            )
E           AssertionError: Found __init__.py files in dimos/:
E             - .../reflex_selector_app/selector_app/__init__.py
E           
E           __init__.py files are not allowed because they lead to unnecessary extraneous imports. Everything should be imported straight from the source module.

dimos_dir  = PosixPath('.../dimos/dimos/dimos')
init_files = [PosixPath('.../dimos/dimos/dimos/.../reflex_selector_app/selector_app/__init__.py')]
listing    = '  - .../reflex_selector_app/selector_app/__init__.py'

dimos/project/test_no_init_files.py:25: AssertionError
dimos.protocol.pubsub.test_spec::test_async_iterator[redis_context-redis_topic-values1]
Stack Traces | 2.7s run time
self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
>               sock = self._connect()

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b36980>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1003: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12...................../site-packages/redis/connection.py:1515: in _connect
    raise err
        canonname  = ''
        err        = ConnectionRefusedError(111, 'Connection refused')
        family     = <AddressFamily.AF_INET: 2>
        proto      = 6
        res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
        socket_address = ('127.0.0.1', 6379)
        socktype   = <SocketKind.SOCK_STREAM: 1>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>

    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
    
        for res in socket.getaddrinfo(
            self.host, self.port, self.socket_type, socket.SOCK_STREAM
        ):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)
    
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)
    
                # connect
>               sock.connect(socket_address)
E               ConnectionRefusedError: [Errno 111] Connection refused

canonname  = ''
err        = ConnectionRefusedError(111, 'Connection refused')
family     = <AddressFamily.AF_INET: 2>
proto      = 6
res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
socket_address = ('127.0.0.1', 6379)
socktype   = <SocketKind.SOCK_STREAM: 1>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1499: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
>           self._client.ping()  # type: ignore[attr-defined]

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>

.../pubsub/impl/redispubsub.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12.../redis/commands/core.py:1250: in ping
    return self.execute_command("PING", **kwargs)
        kwargs     = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:716: in execute_command
    return self._execute_command(*args, **options)
        args       = ('PING',)
        options    = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:722: in _execute_command
    conn = self.connection or pool.get_connection()
        args       = ('PING',)
        command_name = 'PING'
        options    = {}
        pool       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12.../site-packages/redis/utils.py:236: in wrapper
    return func(*args, **kwargs)
        _check_deprecated_args = <function deprecated_args.<locals>._check_deprecated_args at 0xff6a9eea7ec0>
        allowed_args = []
        args       = (<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_tim...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>,)
        filterable_args = {}
        func       = <function ConnectionPool.get_connection at 0xff6a9ef08040>
        kwargs     = {}
.venv/lib/python3.12...................../site-packages/redis/connection.py:3041: in get_connection
    connection.connect()
        command_name = None
        connection = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        is_created = True
        keys       = ()
        options    = {}
        pool_name  = 'localhost:6379_7179be18'
        self       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        start_time_created = 4828.44045825
.venv/lib/python3.12...................../site-packages/redis/connection.py:976: in connect
    self.retry.call_with_retry(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
.venv/lib/python3.12....../site-packages/redis/retry.py:132: in call_with_retry
    raise error
        backoff    = 1.4158408868428314
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b36660>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b36700>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a988ca8d0>
        with_failure_count = False
.venv/lib/python3.12....../site-packages/redis/retry.py:120: in call_with_retry
    return do()
        backoff    = 1.4158408868428314
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b36660>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b36700>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a988ca8d0>
        with_failure_count = False
.venv/lib/python3.12...................../site-packages/redis/connection.py:977: in <lambda>
    lambda: self.connect_check_health(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
                sock = self._connect()
        except socket.timeout:
            e = TimeoutError("Timeout connecting to server")
            record_error_count(
                server_address=self.host,
                server_port=self.port,
                network_peer_address=self.host,
                network_peer_port=self.port,
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
            raise e
        except OSError as e:
            e = ConnectionError(self._error_message(e))
            record_error_count(
                server_address=getattr(self, "host", None),
                server_port=getattr(self, "port", None),
                network_peer_address=getattr(self, "host", None),
                network_peer_port=getattr(self, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
>           raise e
E           redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b36980>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1025: ConnectionError

During handling of the above exception, another exception occurred:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic = 'redis_topic', values = ['redis_value1', 'redis_value2', 'redis_value3']

    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    async def test_async_iterator(
        pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
    ) -> None:
        """Test that async iterator receives messages correctly."""
>       with pubsub_context() as x:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic      = 'redis_topic'
values     = ['redis_value1', 'redis_value2', 'redis_value3']

.../protocol/pubsub/test_spec.py:238: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a988c9dc0>
.../protocol/pubsub/test_spec.py:53: in redis_context
    redis_pubsub.start()
        redis_pubsub = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>
.../pubsub/impl/redispubsub.py:59: in start
    self._connect()  # type: ignore[no-untyped-call]
        self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
            self._client.ping()  # type: ignore[attr-defined]
    
            self._pubsub = self._client.pubsub()  # type: ignore[attr-defined]
            self._running = True
    
            # Start listener thread
            self._listener_thread = threading.Thread(target=self._listen_loop, daemon=True)  # type: ignore[assignment]
            self._listener_thread.start()  # type: ignore[attr-defined]
    
        except Exception as e:
>           raise ConnectionError(
                f"Failed to connect to Redis at {self.config.host}:{self.config.port}: {e}"
            )
E           ConnectionError: Failed to connect to Redis at localhost:6379: Error 111 connecting to localhost:6379. Connection refused.

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a988ca330>

.../pubsub/impl/redispubsub.py:86: ConnectionError
dimos.protocol.pubsub.test_spec::test_multiple_messages[redis_context-redis_topic-values1]
Stack Traces | 3.8s run time
self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
>               sock = self._connect()

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b34040>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1003: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12...................../site-packages/redis/connection.py:1515: in _connect
    raise err
        canonname  = ''
        err        = ConnectionRefusedError(111, 'Connection refused')
        family     = <AddressFamily.AF_INET: 2>
        proto      = 6
        res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
        socket_address = ('127.0.0.1', 6379)
        socktype   = <SocketKind.SOCK_STREAM: 1>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>

    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
    
        for res in socket.getaddrinfo(
            self.host, self.port, self.socket_type, socket.SOCK_STREAM
        ):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)
    
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)
    
                # connect
>               sock.connect(socket_address)
E               ConnectionRefusedError: [Errno 111] Connection refused

canonname  = ''
err        = ConnectionRefusedError(111, 'Connection refused')
family     = <AddressFamily.AF_INET: 2>
proto      = 6
res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
socket_address = ('127.0.0.1', 6379)
socktype   = <SocketKind.SOCK_STREAM: 1>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1499: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
>           self._client.ping()  # type: ignore[attr-defined]

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>

.../pubsub/impl/redispubsub.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12.../redis/commands/core.py:1250: in ping
    return self.execute_command("PING", **kwargs)
        kwargs     = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:716: in execute_command
    return self._execute_command(*args, **options)
        args       = ('PING',)
        options    = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:722: in _execute_command
    conn = self.connection or pool.get_connection()
        args       = ('PING',)
        command_name = 'PING'
        options    = {}
        pool       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12.../site-packages/redis/utils.py:236: in wrapper
    return func(*args, **kwargs)
        _check_deprecated_args = <function deprecated_args.<locals>._check_deprecated_args at 0xff6a9eea7ec0>
        allowed_args = []
        args       = (<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_tim...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>,)
        filterable_args = {}
        func       = <function ConnectionPool.get_connection at 0xff6a9ef08040>
        kwargs     = {}
.venv/lib/python3.12...................../site-packages/redis/connection.py:3041: in get_connection
    connection.connect()
        command_name = None
        connection = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        is_created = True
        keys       = ()
        options    = {}
        pool_name  = 'localhost:6379_30859107'
        self       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        start_time_created = 4823.711898118
.venv/lib/python3.12...................../site-packages/redis/connection.py:976: in connect
    self.retry.call_with_retry(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
.venv/lib/python3.12....../site-packages/redis/retry.py:132: in call_with_retry
    raise error
        backoff    = 2.113668272437905
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b777e0>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b77a60>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a98471a90>
        with_failure_count = False
.venv/lib/python3.12....../site-packages/redis/retry.py:120: in call_with_retry
    return do()
        backoff    = 2.113668272437905
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b777e0>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b77a60>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a98471a90>
        with_failure_count = False
.venv/lib/python3.12...................../site-packages/redis/connection.py:977: in <lambda>
    lambda: self.connect_check_health(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
                sock = self._connect()
        except socket.timeout:
            e = TimeoutError("Timeout connecting to server")
            record_error_count(
                server_address=self.host,
                server_port=self.port,
                network_peer_address=self.host,
                network_peer_port=self.port,
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
            raise e
        except OSError as e:
            e = ConnectionError(self._error_message(e))
            record_error_count(
                server_address=getattr(self, "host", None),
                server_port=getattr(self, "port", None),
                network_peer_address=getattr(self, "host", None),
                network_peer_port=getattr(self, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
>           raise e
E           redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b34040>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1025: ConnectionError

During handling of the above exception, another exception occurred:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic = 'redis_topic', values = ['redis_value1', 'redis_value2', 'redis_value3']

    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    @pytest.mark.skipif_macos_bug
    def test_multiple_messages(
        pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
    ) -> None:
        """Test that subscribers receive multiple messages in order."""
>       with pubsub_context() as x:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic      = 'redis_topic'
values     = ['redis_value1', 'redis_value2', 'redis_value3']

.../protocol/pubsub/test_spec.py:217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a984715b0>
.../protocol/pubsub/test_spec.py:53: in redis_context
    redis_pubsub.start()
        redis_pubsub = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>
.../pubsub/impl/redispubsub.py:59: in start
    self._connect()  # type: ignore[no-untyped-call]
        self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
            self._client.ping()  # type: ignore[attr-defined]
    
            self._pubsub = self._client.pubsub()  # type: ignore[attr-defined]
            self._running = True
    
            # Start listener thread
            self._listener_thread = threading.Thread(target=self._listen_loop, daemon=True)  # type: ignore[assignment]
            self._listener_thread.start()  # type: ignore[attr-defined]
    
        except Exception as e:
>           raise ConnectionError(
                f"Failed to connect to Redis at {self.config.host}:{self.config.port}: {e}"
            )
E           ConnectionError: Failed to connect to Redis at localhost:6379: Error 111 connecting to localhost:6379. Connection refused.

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a98473c80>

.../pubsub/impl/redispubsub.py:86: ConnectionError
dimos.protocol.pubsub.test_spec::test_unsubscribe[redis_context-redis_topic-values1]
Stack Traces | 6.83s run time
self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
>               sock = self._connect()

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b76160>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1003: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12...................../site-packages/redis/connection.py:1515: in _connect
    raise err
        canonname  = ''
        err        = ConnectionRefusedError(111, 'Connection refused')
        family     = <AddressFamily.AF_INET: 2>
        proto      = 6
        res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
        socket_address = ('127.0.0.1', 6379)
        socktype   = <SocketKind.SOCK_STREAM: 1>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>

    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
    
        for res in socket.getaddrinfo(
            self.host, self.port, self.socket_type, socket.SOCK_STREAM
        ):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)
    
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)
    
                # connect
>               sock.connect(socket_address)
E               ConnectionRefusedError: [Errno 111] Connection refused

canonname  = ''
err        = ConnectionRefusedError(111, 'Connection refused')
family     = <AddressFamily.AF_INET: 2>
proto      = 6
res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
socket_address = ('127.0.0.1', 6379)
socktype   = <SocketKind.SOCK_STREAM: 1>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1499: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
>           self._client.ping()  # type: ignore[attr-defined]

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>

.../pubsub/impl/redispubsub.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12.../redis/commands/core.py:1250: in ping
    return self.execute_command("PING", **kwargs)
        kwargs     = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:716: in execute_command
    return self._execute_command(*args, **options)
        args       = ('PING',)
        options    = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:722: in _execute_command
    conn = self.connection or pool.get_connection()
        args       = ('PING',)
        command_name = 'PING'
        options    = {}
        pool       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12.../site-packages/redis/utils.py:236: in wrapper
    return func(*args, **kwargs)
        _check_deprecated_args = <function deprecated_args.<locals>._check_deprecated_args at 0xff6a9eea7ec0>
        allowed_args = []
        args       = (<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_tim...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>,)
        filterable_args = {}
        func       = <function ConnectionPool.get_connection at 0xff6a9ef08040>
        kwargs     = {}
.venv/lib/python3.12...................../site-packages/redis/connection.py:3041: in get_connection
    connection.connect()
        command_name = None
        connection = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        is_created = True
        keys       = ()
        options    = {}
        pool_name  = 'localhost:6379_43be29c6'
        self       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        start_time_created = 4816.033308172
.venv/lib/python3.12...................../site-packages/redis/connection.py:976: in connect
    self.retry.call_with_retry(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
.venv/lib/python3.12....../site-packages/redis/retry.py:132: in call_with_retry
    raise error
        backoff    = 5.676637006822097
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75440>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75760>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a987a7920>
        with_failure_count = False
.venv/lib/python3.12....../site-packages/redis/retry.py:120: in call_with_retry
    return do()
        backoff    = 5.676637006822097
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75440>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75760>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a987a7920>
        with_failure_count = False
.venv/lib/python3.12...................../site-packages/redis/connection.py:977: in <lambda>
    lambda: self.connect_check_health(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
                sock = self._connect()
        except socket.timeout:
            e = TimeoutError("Timeout connecting to server")
            record_error_count(
                server_address=self.host,
                server_port=self.port,
                network_peer_address=self.host,
                network_peer_port=self.port,
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
            raise e
        except OSError as e:
            e = ConnectionError(self._error_message(e))
            record_error_count(
                server_address=getattr(self, "host", None),
                server_port=getattr(self, "port", None),
                network_peer_address=getattr(self, "host", None),
                network_peer_port=getattr(self, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
>           raise e
E           redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b76160>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1025: ConnectionError

During handling of the above exception, another exception occurred:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic = 'redis_topic', values = ['redis_value1', 'redis_value2', 'redis_value3']

    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    def test_unsubscribe(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None:
        """Test that unsubscribed callbacks don't receive messages."""
>       with pubsub_context() as x:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic      = 'redis_topic'
values     = ['redis_value1', 'redis_value2', 'redis_value3']

.../protocol/pubsub/test_spec.py:187: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a9867fa40>
.../protocol/pubsub/test_spec.py:53: in redis_context
    redis_pubsub.start()
        redis_pubsub = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>
.../pubsub/impl/redispubsub.py:59: in start
    self._connect()  # type: ignore[no-untyped-call]
        self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
            self._client.ping()  # type: ignore[attr-defined]
    
            self._pubsub = self._client.pubsub()  # type: ignore[attr-defined]
            self._running = True
    
            # Start listener thread
            self._listener_thread = threading.Thread(target=self._listen_loop, daemon=True)  # type: ignore[assignment]
            self._listener_thread.start()  # type: ignore[attr-defined]
    
        except Exception as e:
>           raise ConnectionError(
                f"Failed to connect to Redis at {self.config.host}:{self.config.port}: {e}"
            )
E           ConnectionError: Failed to connect to Redis at localhost:6379: Error 111 connecting to localhost:6379. Connection refused.

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9867dbb0>

.../pubsub/impl/redispubsub.py:86: ConnectionError
dimos.protocol.pubsub.test_spec::test_multiple_subscribers[redis_context-redis_topic-values1]
Stack Traces | 8.2s run time
self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
>               sock = self._connect()

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b754e0>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1003: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12...................../site-packages/redis/connection.py:1515: in _connect
    raise err
        canonname  = ''
        err        = ConnectionRefusedError(111, 'Connection refused')
        family     = <AddressFamily.AF_INET: 2>
        proto      = 6
        res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
        socket_address = ('127.0.0.1', 6379)
        socktype   = <SocketKind.SOCK_STREAM: 1>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>

    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
    
        for res in socket.getaddrinfo(
            self.host, self.port, self.socket_type, socket.SOCK_STREAM
        ):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)
    
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)
    
                # connect
>               sock.connect(socket_address)
E               ConnectionRefusedError: [Errno 111] Connection refused

canonname  = ''
err        = ConnectionRefusedError(111, 'Connection refused')
family     = <AddressFamily.AF_INET: 2>
proto      = 6
res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
socket_address = ('127.0.0.1', 6379)
socktype   = <SocketKind.SOCK_STREAM: 1>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1499: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
>           self._client.ping()  # type: ignore[attr-defined]

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>

.../pubsub/impl/redispubsub.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12.../redis/commands/core.py:1250: in ping
    return self.execute_command("PING", **kwargs)
        kwargs     = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:716: in execute_command
    return self._execute_command(*args, **options)
        args       = ('PING',)
        options    = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:722: in _execute_command
    conn = self.connection or pool.get_connection()
        args       = ('PING',)
        command_name = 'PING'
        options    = {}
        pool       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12.../site-packages/redis/utils.py:236: in wrapper
    return func(*args, **kwargs)
        _check_deprecated_args = <function deprecated_args.<locals>._check_deprecated_args at 0xff6a9eea7ec0>
        allowed_args = []
        args       = (<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_tim...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>,)
        filterable_args = {}
        func       = <function ConnectionPool.get_connection at 0xff6a9ef08040>
        kwargs     = {}
.venv/lib/python3.12...................../site-packages/redis/connection.py:3041: in get_connection
    connection.connect()
        command_name = None
        connection = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        is_created = True
        keys       = ()
        options    = {}
        pool_name  = 'localhost:6379_4f2e26b9'
        self       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        start_time_created = 4807.136164143
.venv/lib/python3.12...................../site-packages/redis/connection.py:976: in connect
    self.retry.call_with_retry(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
.venv/lib/python3.12....../site-packages/redis/retry.py:132: in call_with_retry
    raise error
        backoff    = 7.844992689993141
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75080>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75120>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a9842e570>
        with_failure_count = False
.venv/lib/python3.12....../site-packages/redis/retry.py:120: in call_with_retry
    return do()
        backoff    = 7.844992689993141
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75080>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2b75120>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a9842e570>
        with_failure_count = False
.venv/lib/python3.12...................../site-packages/redis/connection.py:977: in <lambda>
    lambda: self.connect_check_health(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
                sock = self._connect()
        except socket.timeout:
            e = TimeoutError("Timeout connecting to server")
            record_error_count(
                server_address=self.host,
                server_port=self.port,
                network_peer_address=self.host,
                network_peer_port=self.port,
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
            raise e
        except OSError as e:
            e = ConnectionError(self._error_message(e))
            record_error_count(
                server_address=getattr(self, "host", None),
                server_port=getattr(self, "port", None),
                network_peer_address=getattr(self, "host", None),
                network_peer_port=getattr(self, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
>           raise e
E           redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2b754e0>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1025: ConnectionError

During handling of the above exception, another exception occurred:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic = 'redis_topic', values = ['redis_value1', 'redis_value2', 'redis_value3']

    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    def test_multiple_subscribers(
        pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
    ) -> None:
        """Test that multiple subscribers receive the same message."""
>       with pubsub_context() as x:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic      = 'redis_topic'
values     = ['redis_value1', 'redis_value2', 'redis_value3']

.../protocol/pubsub/test_spec.py:166: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a9888ef90>
.../protocol/pubsub/test_spec.py:53: in redis_context
    redis_pubsub.start()
        redis_pubsub = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>
.../pubsub/impl/redispubsub.py:59: in start
    self._connect()  # type: ignore[no-untyped-call]
        self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
            self._client.ping()  # type: ignore[attr-defined]
    
            self._pubsub = self._client.pubsub()  # type: ignore[attr-defined]
            self._running = True
    
            # Start listener thread
            self._listener_thread = threading.Thread(target=self._listen_loop, daemon=True)  # type: ignore[assignment]
            self._listener_thread.start()  # type: ignore[attr-defined]
    
        except Exception as e:
>           raise ConnectionError(
                f"Failed to connect to Redis at {self.config.host}:{self.config.port}: {e}"
            )
E           ConnectionError: Failed to connect to Redis at localhost:6379: Error 111 connecting to localhost:6379. Connection refused.

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a9888fa40>

.../pubsub/impl/redispubsub.py:86: ConnectionError
dimos.protocol.pubsub.test_spec::test_store[redis_context-redis_topic-values1]
Stack Traces | 10.4s run time
self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
>               sock = self._connect()

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2bb6e80>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1003: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12...................../site-packages/redis/connection.py:1515: in _connect
    raise err
        canonname  = ''
        err        = ConnectionRefusedError(111, 'Connection refused')
        family     = <AddressFamily.AF_INET: 2>
        proto      = 6
        res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
        socket_address = ('127.0.0.1', 6379)
        socktype   = <SocketKind.SOCK_STREAM: 1>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>

    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
    
        for res in socket.getaddrinfo(
            self.host, self.port, self.socket_type, socket.SOCK_STREAM
        ):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)
    
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)
    
                # connect
>               sock.connect(socket_address)
E               ConnectionRefusedError: [Errno 111] Connection refused

canonname  = ''
err        = ConnectionRefusedError(111, 'Connection refused')
family     = <AddressFamily.AF_INET: 2>
proto      = 6
res        = (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
sock       = <socket.socket [closed] fd=-1, family=2, type=1, proto=6>
socket_address = ('127.0.0.1', 6379)
socktype   = <SocketKind.SOCK_STREAM: 1>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1499: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
>           self._client.ping()  # type: ignore[attr-defined]

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>

.../pubsub/impl/redispubsub.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12.../redis/commands/core.py:1250: in ping
    return self.execute_command("PING", **kwargs)
        kwargs     = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:716: in execute_command
    return self._execute_command(*args, **options)
        args       = ('PING',)
        options    = {}
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12....../site-packages/redis/client.py:722: in _execute_command
    conn = self.connection or pool.get_connection()
        args       = ('PING',)
        command_name = 'PING'
        options    = {}
        pool       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        self       = <redis.client.Redis(<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<R...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>)>
.venv/lib/python3.12.../site-packages/redis/utils.py:236: in wrapper
    return func(*args, **kwargs)
        _check_deprecated_args = <function deprecated_args.<locals>._check_deprecated_args at 0xff6a9eea7ec0>
        allowed_args = []
        args       = (<redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_tim...otocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>,)
        filterable_args = {}
        func       = <function ConnectionPool.get_connection at 0xff6a9ef08040>
        kwargs     = {}
.venv/lib/python3.12...................../site-packages/redis/connection.py:3041: in get_connection
    connection.connect()
        command_name = None
        connection = <redis.connection.Connection(host=localhost,port=6379,db=0)>
        is_created = True
        keys       = ()
        options    = {}
        pool_name  = 'localhost:6379_2d6fdbbc'
        self       = <redis.connection.ConnectionPool(<redis.connection.Connection(db=0,username=<REDACTED>,password=<REDACTED>,socket_time...protocol=2,host=localhost,port=6379,socket_connect_timeout=None,socket_keepalive=None,socket_keepalive_options=None)>)>
        start_time_created = 4796.146530413
.venv/lib/python3.12...................../site-packages/redis/connection.py:976: in connect
    self.retry.call_with_retry(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
.venv/lib/python3.12....../site-packages/redis/retry.py:132: in call_with_retry
    raise error
        backoff    = 7.377617265662873
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2bb67a0>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2bb6840>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a99a429c0>
        with_failure_count = False
.venv/lib/python3.12....../site-packages/redis/retry.py:120: in call_with_retry
    return do()
        backoff    = 7.377617265662873
        do         = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2bb67a0>
        fail       = <function AbstractConnection.connect.<locals>.<lambda> at 0xff6ab2bb6840>
        failures   = 4
        is_retryable = None
        self       = <redis.retry.Retry object at 0xff6a99a429c0>
        with_failure_count = False
.venv/lib/python3.12...................../site-packages/redis/connection.py:977: in <lambda>
    lambda: self.connect_check_health(
        self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redis.connection.Connection(host=localhost,port=6379,db=0)>
check_health = True, retry_socket_connect = False

    def connect_check_health(
        self, check_health: bool = True, retry_socket_connect: bool = True
    ):
        if self._sock:
            return
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]
    
        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self.disconnect(error=error, failure_count=failure_count)
    
        try:
            if retry_socket_connect:
                sock = self.retry.call_with_retry(
                    self._connect,
                    failure_callback,
                    with_failure_count=True,
                )
            else:
                sock = self._connect()
        except socket.timeout:
            e = TimeoutError("Timeout connecting to server")
            record_error_count(
                server_address=self.host,
                server_port=self.port,
                network_peer_address=self.host,
                network_peer_port=self.port,
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
            raise e
        except OSError as e:
            e = ConnectionError(self._error_message(e))
            record_error_count(
                server_address=getattr(self, "host", None),
                server_port=getattr(self, "port", None),
                network_peer_address=getattr(self, "host", None),
                network_peer_port=getattr(self, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
            )
>           raise e
E           redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

actual_retry_attempts = [0]
check_health = True
failure_callback = <function AbstractConnection.connect_check_health.<locals>.failure_callback at 0xff6ab2bb6e80>
retry_socket_connect = False
self       = <redis.connection.Connection(host=localhost,port=6379,db=0)>

.venv/lib/python3.12...................../site-packages/redis/connection.py:1025: ConnectionError

During handling of the above exception, another exception occurred:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic = 'redis_topic', values = ['redis_value1', 'redis_value2', 'redis_value3']

    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    def test_store(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None:
>       with pubsub_context() as x:

pubsub_context = <function redis_context at 0xff6a9e2e5620>
topic      = 'redis_topic'
values     = ['redis_value1', 'redis_value2', 'redis_value3']

.../protocol/pubsub/test_spec.py:150: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../usr/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
        self       = <contextlib._GeneratorContextManager object at 0xff6a99ab9820>
.../protocol/pubsub/test_spec.py:53: in redis_context
    redis_pubsub.start()
        redis_pubsub = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>
.../pubsub/impl/redispubsub.py:59: in start
    self._connect()  # type: ignore[no-untyped-call]
        self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>

    def _connect(self):  # type: ignore[no-untyped-def]
        """Connect to Redis and set up pub/sub."""
        try:
            self._client = redis.Redis(
                host=self.config.host,
                port=self.config.port,
                db=self.config.db,
                decode_responses=True,
                **self.config.kwargs,
            )
            # Test connection
            self._client.ping()  # type: ignore[attr-defined]
    
            self._pubsub = self._client.pubsub()  # type: ignore[attr-defined]
            self._running = True
    
            # Start listener thread
            self._listener_thread = threading.Thread(target=self._listen_loop, daemon=True)  # type: ignore[assignment]
            self._listener_thread.start()  # type: ignore[attr-defined]
    
        except Exception as e:
>           raise ConnectionError(
                f"Failed to connect to Redis at {self.config.host}:{self.config.port}: {e}"
            )
E           ConnectionError: Failed to connect to Redis at localhost:6379: Error 111 connecting to localhost:6379. Connection refused.

self       = <dimos.protocol.pubsub.impl.redispubsub.Redis object at 0xff6a99ab9e20>

.../pubsub/impl/redispubsub.py:86: ConnectionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant