fix: close all memory stream ends in client transport cleanup #2266
Client transports for SSE, WebSocket, and StreamableHTTP create 4 memory stream ends (2 paired streams) but only closed 2 in their `finally` blocks. anyio memory stream ends are independent — closing the writer does not close the reader. The unclosed ends leak and emit `ResourceWarning` when garbage collected. This caused flaky test failures in CI: a transport connection error in one test would leak streams, then GC in a later unrelated test would trigger `ResourceWarning`, which pytest promotes to a test failure.

Fix follows the existing correct pattern in `stdio.py`:

- `sse.py`: close all 4 stream ends in the existing `finally` block
- `streamable_http.py`: close all 4 stream ends in the existing `finally` block (`read_stream` was previously never closed, even on the happy path)
- `websocket.py`: add a `try/finally` wrapping the entire body, closing all 4 stream ends (previously had no cleanup at all — a `ws_connect` failure leaked everything)

Regression tests force `gc.collect()` after the transport context exits so leaked streams fail deterministically in the test that caused them.
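The leak and the fix can be sketched with toy stand-ins (plain classes, not anyio's real stream types; `make_pair` and the end names are illustrative):

```python
import gc
import warnings

class ToyStreamEnd:
    """Toy stand-in for one end of an anyio memory object stream."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def aclose(self) -> None:  # anyio's is async; sync here for brevity
        self.closed = True     # idempotent: closing twice is harmless

    def __del__(self) -> None:
        if not self.closed:
            warnings.warn(f"Unclosed {self.name}", ResourceWarning)

def make_pair(prefix: str):
    # Mirrors anyio.create_memory_object_stream(): two *independent* ends;
    # closing the send end does not close the receive end.
    return ToyStreamEnd(prefix + "_send"), ToyStreamEnd(prefix + "_receive")

def transport_buggy() -> None:
    read_stream_writer, read_stream = make_pair("read")
    write_stream, write_stream_reader = make_pair("write")
    try:
        pass  # ... connect and pump messages ...
    finally:
        read_stream_writer.aclose()  # bug: only 2 of the 4 ends closed
        write_stream.aclose()

def transport_fixed() -> None:
    read_stream_writer, read_stream = make_pair("read")
    write_stream, write_stream_reader = make_pair("write")
    try:
        pass
    finally:
        read_stream_writer.aclose()
        write_stream.aclose()
        read_stream.aclose()          # the two ends the fix adds
        write_stream_reader.aclose()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    transport_buggy()
    gc.collect()  # make any leak deterministic, as the regression tests do
print(len(caught))  # 2 -- the two unclosed ends warned
```

Running `transport_fixed()` under the same capture produces no warnings, which is exactly the property the regression tests assert.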
The `gc.collect()` in these tests was picking up leaked `PipeHandle`s from flaky stdio tests (`TestChildProcessCleanup`) on the same xdist worker, causing false failures on Windows CI. The tests now use a custom `sys.unraisablehook` that filters for `MemoryObject` stream leaks specifically, ignoring unrelated resources leaked by other tests. Also adds an explicit `del exc_info` in the SSE test, since the traceback would otherwise keep leaked stream locals alive past `gc.collect()`.
The filterwarnings on test_sse_client_handles_empty_keepalive_pings was added in PR #1753 with a comment claiming the leak was a test artifact from mocking aconnect_sse. It wasn't — it was the real sse_client bug where read_stream and write_stream_reader were never closed in the finally block. With that bug now fixed, the filter is no longer needed.
src/mcp/client/websocket.py
```python
finally:
    await read_stream_writer.aclose()
    await write_stream.aclose()
    await read_stream.aclose()
    await write_stream_reader.aclose()
```
@agronholm Sorry to bother, but can you share a bit of knowledge here? Is this really necessary everywhere? Are we doing something wrong?
I take it this is no longer relevant?
Replace explicit try/finally + aclose() chains with async with on all 4 stream ends. Memory stream context managers are idempotent and have no checkpoints in __aexit__, so this is semantically identical to the try/finally form with the same teardown ordering — but the ownership is stated once at creation time and can't drift. For websocket_client, also move stream creation inside ws_connect so a connection failure never creates streams in the first place. Matches the existing pattern in shared/memory.py.
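The ordering claim can be checked with toy async context managers (stand-ins for the stream ends, not anyio's real classes):

```python
import asyncio

closed = []

class End:
    """Toy async-context-manager stream end (stand-in for anyio's)."""
    def __init__(self, name: str) -> None:
        self.name = name

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        closed.append(self.name)  # no checkpoint here, mirroring anyio
        return False              # never suppresses exceptions

async def main() -> None:
    # Ownership is stated once, at creation time; all four ends are
    # guaranteed to close, in reverse order, even if the body raises.
    async with End("read_writer") as rw, End("read") as r, \
               End("write") as w, End("write_reader") as wr:
        pass  # ... run the transport body ...

asyncio.run(main())
print(closed)  # ['write_reader', 'write', 'read', 'read_writer']
```

Exits run in reverse of entry order, matching the last-created-first-closed teardown of the explicit `aclose()` chain.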
LGTM — straightforward resource leak fix with solid regression tests.
Extended reasoning...
Overview
This PR fixes a resource leak in three client transports (SSE, StreamableHTTP, WebSocket) where only 2 of 4 anyio memory stream ends were being closed in finally blocks. The fix adds the missing aclose() calls (2 lines each for sse.py and streamable_http.py) and wraps websocket.py in a try/finally (which previously had none). A new test file adds 3 regression tests that force GC and verify no ResourceWarning is emitted, and an existing filterwarnings decorator that was masking the bug is removed from test_sse.py.
Security risks
None. This is pure resource cleanup — adding aclose() calls to stream objects that are already being discarded. No data handling, no auth, no network surface changes.
Level of scrutiny
Low. The changes are mechanical and follow the existing correct pattern in stdio.py (which already closes all 4 stream ends at lines 131-134 and 208-211). The PR description correctly notes that anyio aclose() is idempotent — in fact, sse.py already relied on this before the PR (the sse_reader finally block and the outer finally block both close read_stream_writer), so this is not introducing new idempotence assumptions. The websocket.py change is just indentation plus a try/finally wrapper; the inner logic is unchanged.
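A toy model of the idempotence being relied on here — double-closing the same end (inner task plus outer `finally`) is a no-op the second time:

```python
import asyncio

class ToyEnd:
    """Toy stream end whose aclose() is idempotent, like anyio's."""
    def __init__(self) -> None:
        self.closed = False
        self.close_calls = 0

    async def aclose(self) -> None:
        self.close_calls += 1
        self.closed = True  # second call changes nothing and raises nothing

async def demo() -> ToyEnd:
    read_stream_writer = ToyEnd()
    # an inner task (e.g. sse_reader's finally) closes its end first ...
    await read_stream_writer.aclose()
    # ... then the outer finally closes it again -- safe, not an error
    await read_stream_writer.aclose()
    return read_stream_writer

end = asyncio.run(demo())
print(end.closed, end.close_calls)  # True 2
```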
Other factors
- The regression tests are well-designed: they use `sys.unraisablehook` to capture only `MemoryObject` stream warnings (explicitly ignoring unrelated leaks from other tests on the same xdist worker), trigger the error/exit paths, and force `gc.collect()` to make leaks deterministic.
- The PR description includes CI evidence of the flakiness this fixes, with links to runs where unrelated tests failed due to GC-timing of leaked streams.
- Removing the `filterwarnings` decorator from `test_sse_client_handles_empty_keepalive_pings` demonstrates the fix actually resolves the root cause rather than papering over it.
- No outstanding reviewer comments, no CODEOWNERS file, no design decisions involved.
…rage Python 3.11's bytecode for nested async with blocks produces extra branch arcs that coverage.py tracks but the test suite doesn't exercise. Merging the stream context-managers with the task group into a single async with restores the same nesting depth as main, so branch count stays at 4 instead of 6. Teardown order is also slightly better this way: tg.__aexit__ runs first (waits for cancelled tasks), then stream ends close in reverse order — tasks are fully done before streams close.
The async with form triggers coverage.py false-positive branch arcs on Python 3.14 — nested multi-CM async with creates WITH_EXCEPT_START handlers whose POP_JUMP_IF_TRUE branches (did __aexit__ suppress?) get attributed to inner async with lines via the exception table. Memory stream __aexit__ never suppresses, so those arcs are unreachable, but coverage.py's static analysis expects them. try/finally has no suppression branch (it's unconditional RERAISE), so it sidesteps the issue entirely. Keeping the explicit aclose() chain for these two files; websocket.py stays on the merged async with form since its single-level nesting is clean on all tested Python versions (3.10-3.14).
```python
def _unused_tcp_port() -> int:
    """Return a port with no listener. Binding then closing leaves the port unbound."""
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]
```
You could just use the free_tcp_port fixture.
The anyio pytest plugin's free_tcp_port fixture tracks allocated ports within a session so concurrent tests don't step on each other, which the hand-rolled bind-and-close approach doesn't guarantee.
```python
    finally:
        sys.unraisablehook = old_hook


@pytest.mark.anyio
async def test_sse_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
    """sse_client must close all 4 stream ends when the connection fails.
```
🟡 The check "MemoryObject" in repr(args.object) works, but args.object here is the __del__ function (<function MemoryObjectSendStream.__del__ at 0x...>), not the stream instance — the match succeeds only because Python's function repr includes the qualname. The failure message will show function reprs instead of stream reprs. Checking str(args.exc_value) (which is Unclosed <MemoryObjectSendStream at ...>) would be semantically correct and give more useful failure output.
Extended reasoning...
What's happening
With filterwarnings = ["error"] active (pyproject.toml:179), anyio's ResourceWarning emitted inside MemoryObjectSendStream.__del__ is promoted to an exception. Python can't propagate an exception out of __del__, so it calls sys.unraisablehook instead. Per the Python docs, when the unraisable exception comes from __del__, the hook's args.object is the __del__ method object itself, not the instance being destructed.
Step-by-step proof
```python
import sys, warnings, gc

warnings.filterwarnings('error', category=ResourceWarning)
captured = []
sys.unraisablehook = lambda args: captured.append(args)

class MemoryObjectSendStream:
    def __del__(self):
        warnings.warn(f'Unclosed <{type(self).__name__} at {id(self):#x}>',
                      ResourceWarning)

x = MemoryObjectSendStream(); del x; gc.collect()
args = captured[0]
print(repr(args.object))     # <function MemoryObjectSendStream.__del__ at 0x...>
print(str(args.exc_value))   # Unclosed <MemoryObjectSendStream at 0x...>
```

Both strings contain "MemoryObject", so the check on line 53 passes — but `repr(args.object)` matches only because CPython's function `__repr__` happens to include the qualified name (`MemoryObjectSendStream.__del__`). The code is inspecting the wrong attribute and succeeding by coincidence.
Why this is only a nit
Functionally, the test is fine: anyio defines __del__ directly on MemoryObjectSendStream and MemoryObjectReceiveStream, so both qualnames will always contain "MemoryObject", and unrelated leaks (e.g. PipeHandle.__del__) will not. Python's function repr format has included qualnames since 3.3 and is extremely stable. There are no false positives or false negatives today.
Impact
If a regression reintroduces the leak, the assertion message will read:
Memory streams leaked: ['<function MemoryObjectSendStream.__del__ at 0x7f...>', ...]
instead of the much more useful stream repr (with address/state). Anyone debugging the regression has to re-run with extra instrumentation to find out which streams leaked.
Suggested fix
```python
if "MemoryObject" in str(args.exc_value):
    leaked.append(args)
...
assert not leaked, f"Memory streams leaked: {[str(x.exc_value) for x in leaked]}"
```

This inspects the actual warning message (`Unclosed <MemoryObjectSendStream at 0x...>`), which is what the code intends to be checking, and gives actionable failure output.
```python
await read_stream.aclose()
await write_stream_reader.aclose()
```
🟣 Not blocking — follow-up note. The same pattern this PR fixes exists in the server-side transports: server/stdio.py, server/websocket.py, and server/sse.py each create 4 stream ends but only close 2 (via inner async with in reader/writer tasks), with no outer try/finally — read_stream and write_stream are never closed by the transport itself. Ironically, server/stdio.py doesn't follow the client/stdio.py pattern this PR cites as the correct reference. This likely doesn't cause the current CI flakiness (server tests run in subprocesses, and tests/server/test_stdio.py manually closes both streams via async with), but it's worth a follow-up issue to apply the same fix on the server side for consistency.
Extended reasoning...
Summary
The server-side transport files have the exact same memory-stream leak pattern that this PR fixes on the client side. This is a pre-existing issue, not introduced by this PR, and does not need to block the PR — but it's directly relevant context that a reviewer should know about when approving a fix scoped to half of the affected code.
Affected files and the pattern
I verified each server transport against the code:
src/mcp/server/stdio.py (lines 52–83): Creates 4 stream ends via two create_memory_object_stream calls. The stdin_reader task uses async with read_stream_writer: and stdout_writer uses async with write_stream_reader:, so 2 ends are closed when the tasks complete. But read_stream and write_stream are yielded to the caller and never closed by the transport. There is no outer try/finally.
src/mcp/server/websocket.py (lines 28–58): Identical structure. ws_reader closes read_stream_writer, ws_writer closes write_stream_reader. read_stream and write_stream are never closed. No outer try/finally.
src/mcp/server/sse.py: Creates 6 stream ends (4 + an SSE pair). A subset is closed by inner task bodies; read_stream, write_stream, and sse_stream_reader are never explicitly closed.
Comparison with the "correct" pattern
The PR description cites client/stdio.py as the reference correct pattern. Lines 208–211 of client/stdio.py close all 4 ends in the finally block:
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()Ironically, server/stdio.py — the file most directly parallel to client/stdio.py — does not follow this pattern.
Why this doesn't cause current CI flakiness (addressing the counter-argument)
One might expect server-side leaks to cause the same knock-on ResourceWarning failures the PR describes. I investigated and this is largely not the case in the current test suite, for two reasons:
- Subprocess isolation: `tests/shared/test_ws.py` and `tests/shared/test_sse.py` run the server via `multiprocessing.Process` (see the `server` fixture in `tests/shared/test_sse.py` around line 119). Leaked memory streams live in the subprocess's memory and die with the subprocess when `proc.kill()` is called — they never trigger a `ResourceWarning` in the pytest process.
- Manual close in test code: `tests/server/test_stdio.py` runs `stdio_server` in-process, but the test explicitly wraps the yielded streams: `async with read_stream:` (line 30) and `async with write_stream:` (line 49). So the test compensates for the transport's missing cleanup.
So the CI evidence cited in the PR description (client-side test_basic_resources → test_tool_progress cascade) is genuinely a client-side problem, and this PR fully addresses the observed flakiness.
Why it's still worth a follow-up
- Defense in depth: if someone adds an in-process server test that doesn't manually `async with` the yielded streams, the leak surfaces.
- API contract: relying on callers to close streams that the transport created is brittle. The transport owns the resource; it should own the cleanup.
- Consistency: after this PR, 4 client transports close all stream ends; 3 server transports don't. That asymmetry is a maintenance hazard.
Recommended fix (for the follow-up, not this PR)
Same pattern as this PR — add the missing aclose() calls in a finally block, or use the compound async with approach from client/websocket.py. Since anyio's aclose() is idempotent, double-close is safe when the caller has already closed its end.
```python
@pytest.mark.anyio
async def test_sse_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
```
🔴 The free_tcp_port fixture (used here and on line 91) was added in anyio 4.9.0, but pyproject.toml:28 declares anyio>=4.5. The lowest-direct CI matrix (.github/workflows/shared.yml:51-53) will resolve anyio below 4.9.0, so both tests will fail with fixture 'free_tcp_port' not found. Fix: add "anyio>=4.9" to the dev dependency group, or reuse the existing server_port fixture pattern from tests/shared/test_sse.py:49-52.
Extended reasoning...
What the bug is
Commit d375f3e switched the new regression tests to use anyio's free_tcp_port fixture (following @agronholm's review suggestion). This fixture was added in anyio 4.9.0. However, pyproject.toml:28 declares anyio>=4.5 as the project's floor, and the dev dependency group (lines 58-74) contains no tighter anyio constraint. Under the CI lowest-direct resolution strategy, anyio resolves to a version that lacks this fixture.
The code path that triggers it
.github/workflows/shared.yml defines the test matrix:
```yaml
dep-resolution:
  - name: lowest-direct
    install-flags: "--upgrade --resolution lowest-direct"
  - name: locked
    install-flags: "--frozen"
```

Line 68 applies this via `uv sync ${{ matrix.dep-resolution.install-flags }} --all-extras`. With `--resolution lowest-direct`, uv resolves direct dependencies to their lowest satisfying versions. Since `anyio>=4.5` is a direct dependency, anyio is a candidate for downgrade.
No transitive dependency rescues this: httpx>=0.27.1 requires just anyio (unversioned), and starlette>=0.27 requires anyio<5,>=3.6.2. One verifier empirically ran uv sync --upgrade --resolution lowest-direct --all-extras and anyio landed on 4.7.0 — still below 4.9.0.
Why existing code doesn't prevent it
- `uv.lock` pins anyio to 4.10.0, so the `locked` matrix (and local development) passes fine. This masks the issue.
- No `conftest.py` in the repo defines a local `free_tcp_port` fixture (grep confirms zero matches outside the new test file).
- This is the first use of `free_tcp_port` in the codebase — there was no prior constraint forcing anyio >= 4.9.
Step-by-step proof
I independently verified the version boundary by wheel inspection:
```shell
$ pip download anyio==4.8.0 --no-deps -d /tmp/anyio48
$ unzip -p /tmp/anyio48/anyio-4.8.0-py3-none-any.whl anyio/pytest_plugin.py | grep -c free_tcp_port
0   # ← absent in 4.8.0
$ pip download anyio==4.9.0 --no-deps -d /tmp/anyio49
$ unzip -p /tmp/anyio49/anyio-4.9.0-py3-none-any.whl anyio/pytest_plugin.py | grep -c free_tcp_port
5   # ← present in 4.9.0 (fixture + factory)
```

So under lowest-direct resolution, pytest sees tests requesting `free_tcp_port: int` at lines 58 and 91, finds no matching fixture in the anyio plugin or any conftest, and fails collection:

```
ERRORS
fixture 'free_tcp_port' not found
```
Impact
test_sse_client_closes_all_streams_on_connection_error and test_websocket_client_closes_all_streams_on_connection_error will fail in every lowest-direct matrix entry (5 Python versions × 2 OSes = 10 jobs). Because shared.yml:47 sets continue-on-error: true, this won't hard-fail the workflow, but it defeats the purpose of the lowest-direct matrix and will show persistent red Xs.
How to fix
Two equally simple options:
(a) Add a tighter floor to the dev group:
```toml
[dependency-groups]
dev = [
    "anyio>=4.9",  # for the free_tcp_port pytest fixture
    ...
]
```

This is the lightest fix — it doesn't bump the runtime floor, only the test-time one.
(b) Reuse the existing local pattern from tests/shared/test_sse.py:49-52 (bind a socket to port 0 and return the ephemeral port). This avoids the version dependency entirely.
It's quite impressive that the bot picked up on that.
haha yea pretty good. I was trying to think whether it's worth upgrading for this or not
Summary
Client transports for SSE, WebSocket, and StreamableHTTP create 4 anyio memory stream ends (2 paired streams) but only closed 2 in their `finally` blocks. anyio memory stream ends are independent — closing the writer does not close the reader. Unclosed stream ends leak and emit `ResourceWarning` when garbage collected.

This caused flaky CI failures: a transport connection error (404, 403, `ConnectError`) in one test would leak streams, then GC in a later unrelated test would trigger `ResourceWarning`, which pytest's `filterwarnings = ["error"]` promotes to a test failure — in whatever test happened to be running when GC fired, not the test that actually leaked.

Fix

Follows the existing correct pattern in `stdio.py` (which closes all 4 ends on both early-fail and normal-exit paths):

- `sse.py`: `finally` closed 2 of 4 → now closes all 4
- `streamable_http.py`: `finally` closed 2 of 4 — `read_stream` was never closed, even on the happy path → now closes all 4
- `websocket.py`: no `try/finally` at all — if `ws_connect()` raised, all 4 leaked → now wrapped in a `try/finally` that closes all 4

anyio's `aclose()` is idempotent, so double-closing (e.g. when reader/writer tasks already closed their end) is safe.

Tests

Added `tests/client/test_transport_stream_cleanup.py` with one regression test per transport. Each test triggers the error/exit path, then calls `gc.collect()` to force any leaked stream to emit `ResourceWarning` deterministically. All 3 tests fail on `main` with `ResourceWarning: Unclosed <MemoryObjectReceiveStream>` and pass with this fix.

CI Evidence of the Flakiness

- `test_tool_progress` knock-on failure — streams leaked by `test_basic_resources` (which got a 404), GC'd during `test_tool_progress`
- `test_call_tool` knock-on failure — an in-memory test that doesn't touch the network, failed because an earlier test on the same worker leaked resources

AI Disclaimer