diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e04dde6..3c2be854 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Change Log +## [v2.1.4](https://github.com/ably/ably-python/tree/v2.1.4) + +[Full Changelog](https://github.com/ably/ably-python/compare/v2.1.3...v2.1.4) + +### What's Changed + +- Fixed handling of normal WebSocket close frames and improved reconnection logic [#672](https://github.com/ably/ably-python/pull/672) + ## [v2.1.3](https://github.com/ably/ably-python/tree/v2.1.3) [Full Changelog](https://github.com/ably/ably-python/compare/v2.1.2...v2.1.3) diff --git a/ably/__init__.py b/ably/__init__.py index b77548b7..c364ffe6 100644 --- a/ably/__init__.py +++ b/ably/__init__.py @@ -16,4 +16,4 @@ logger.addHandler(logging.NullHandler()) api_version = '3' -lib_version = '2.1.3' +lib_version = '2.1.4' diff --git a/ably/transport/websockettransport.py b/ably/transport/websockettransport.py index 140b9d25..54b423c6 100644 --- a/ably/transport/websockettransport.py +++ b/ably/transport/websockettransport.py @@ -66,7 +66,8 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic def connect(self): headers = HttpUtils.default_headers() query_params = urllib.parse.urlencode(self.params) - ws_url = (f'wss://{self.host}?{query_params}') + scheme = 'wss' if self.options.tls else 'ws' + ws_url = f'{scheme}://{self.host}?{query_params}' log.info(f'connect(): attempting to connect to {ws_url}') self.ws_connect_task = asyncio.create_task(self.ws_connect(ws_url, headers)) self.ws_connect_task.add_done_callback(self.on_ws_connect_done) @@ -110,6 +111,11 @@ async def _handle_websocket_connection(self, ws_url, websocket): if not self.is_disposed: await self.dispose() self.connection_manager.deactivate_transport(err) + else: + # Read loop exited normally (e.g., server sent normal WS close frame) + if not self.is_disposed: + await self.dispose() + self.connection_manager.deactivate_transport() async def on_protocol_message(self, msg): self.on_activity() @@ -214,8 +220,9 @@ async def send(self, message: dict): await self.websocket.send(raw_msg) def set_idle_timer(self, timeout: float): - if not self.idle_timer: - self.idle_timer = Timer(timeout, self.on_idle_timer_expire) + if self.idle_timer: + self.idle_timer.cancel() + self.idle_timer = Timer(timeout, self.on_idle_timer_expire) async def on_idle_timer_expire(self): self.idle_timer = None diff --git a/pyproject.toml b/pyproject.toml index 9f265656..da053830 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ably" -version = "2.1.3" +version = "2.1.4" description = "Python REST and Realtime client library SDK for Ably realtime messaging service" readme = "LONG_DESCRIPTION.rst" requires-python = ">=3.7" diff --git a/test/ably/realtime/realtimeconnection_test.py b/test/ably/realtime/realtimeconnection_test.py index b4e53ed7..3db53909 100644 --- a/test/ably/realtime/realtimeconnection_test.py +++ b/test/ably/realtime/realtimeconnection_test.py @@ -1,6 +1,14 @@ import asyncio import pytest +from websockets import connect as _ws_connect + +try: + # websockets 15+ preferred import + from websockets.asyncio.server import serve as ws_serve +except ImportError: + # websockets 14 and earlier fallback + from websockets.server import serve as ws_serve from ably.realtime.connection import ConnectionEvent, ConnectionState from ably.transport.defaults import Defaults @@ -10,6 +18,68 @@ from test.ably.utils import BaseAsyncTestCase +async def _relay(src, dst): + try: + async for msg in src: + await dst.send(msg) + except Exception: + pass + + +class WsProxy: + """Local WS proxy that forwards to real Ably and lets tests trigger a normal close.""" + + def __init__(self, target_host: str): + self.target_host = target_host + self.server = None + self.port: int | None = None + self._close_event: asyncio.Event | None = None + + async def _handler(self, client_ws): + # Create a fresh event for this connection; signal to drop the connection cleanly + self._close_event = asyncio.Event() + path = client_ws.request.path # e.g. "/?key=...&format=json" + target_url = f"wss://{self.target_host}{path}" + try: + async with _ws_connect(target_url, ping_interval=None) as server_ws: + c2s = asyncio.create_task(_relay(client_ws, server_ws)) + s2c = asyncio.create_task(_relay(server_ws, client_ws)) + close_task = asyncio.create_task(self._close_event.wait()) + try: + await asyncio.wait([c2s, s2c, close_task], return_when=asyncio.FIRST_COMPLETED) + finally: + c2s.cancel() + s2c.cancel() + close_task.cancel() + except Exception: + pass + # After _handler returns the websockets server sends a normal close frame (1000) + + async def close_active_connection(self): + """Trigger a normal WS close (code 1000) on the currently active client connection. + + Signals the handler to exit; the websockets server framework then sends the + close frame automatically when the handler coroutine returns. + """ + if self._close_event: + self._close_event.set() + + @property + def endpoint(self) -> str: + """Endpoint string to pass to AblyRealtime (combine with tls=False).""" + return f"127.0.0.1:{self.port}" + + async def __aenter__(self): + self.server = await ws_serve(self._handler, "127.0.0.1", 0, ping_interval=None) + self.port = self.server.sockets[0].getsockname()[1] + return self + + async def __aexit__(self, *args): + if self.server: + self.server.close() + await self.server.wait_closed() + + class TestRealtimeConnection(BaseAsyncTestCase): async def asyncSetUp(self): self.test_vars = await TestApp.get_test_vars() @@ -399,3 +469,37 @@ async def on_protocol_message(msg): await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) await ably.close() + + async def test_normal_ws_close_triggers_immediate_reconnection(self): + """Server normal WS close (code 1000) must trigger immediate reconnection. + + Regression test: ConnectionClosedOK was silently swallowed and deactivate_transport + was never called, leaving the client disconnected until the idle timer fired. + """ + async with WsProxy(self.test_vars["host"]) as proxy: + ably = await TestApp.get_ably_realtime( + disconnected_retry_timeout=500_000, + suspended_retry_timeout=500_000, + tls=False, + realtime_host=proxy.endpoint, + ) + + try: + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTED), timeout=10 + ) + + # Simulate server sending a normal WS close frame + await proxy.close_active_connection() + + # Must go CONNECTING quickly — not after the 25 s idle timer + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTING), timeout=1 + ) + + # Must reconnect immediately — not after the 500 s retry timer + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTED), timeout=10 + ) + finally: + await ably.close() diff --git a/uv.lock b/uv.lock index 0a0c446a..456f8d0f 100644 --- a/uv.lock +++ b/uv.lock @@ -10,7 +10,7 @@ resolution-markers = [ [[package]] name = "ably" -version = "2.1.2" +version = "2.1.4" source = { editable = "." } dependencies = [ { name = "h2", version = "4.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" },