Fix concurrent sendMessage race in StdioClientTransport#876
Open
AbbasNS wants to merge 2 commits intomodelcontextprotocol:mainfrom
Open
Fix concurrent sendMessage race in StdioClientTransport#876AbbasNS wants to merge 2 commits intomodelcontextprotocol:mainfrom
AbbasNS wants to merge 2 commits intomodelcontextprotocol:mainfrom
Conversation
When two threads call sendMessage() concurrently on the same StdioClientTransport, the unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED via its CAS guard, causing "Failed to enqueue message". This test reproduces the race: 19/20 repetitions fail. Closes modelcontextprotocol#875
Replace tryEmitNext (fail-fast) with emitNext + busyLooping(100ms) in StdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently. busyLooping retries the CAS spin instead of immediately failing, making concurrent sends safe. The contention window is microseconds (single CAS operation), so the 100ms duration is just a generous upper bound for pathological cases like GC pauses. Before: 19/20 test repetitions fail After: 20/20 pass Closes modelcontextprotocol#875
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
StdioClientTransport.sendMessage()usestryEmitNext()on a unicast Reactor sink (Sinks.many().unicast().onBackpressureBuffer()). When two threads callsendMessage()concurrently on the same transport, one fails with"Failed to enqueue message".Root cause
Reactor wraps the unicast sink in
SinkManySerialized, which uses a CAS-based guard intryEmitNext(). When two threads race on the CAS, the loser immediately getsFAIL_NON_SERIALIZED— the method does not retry. The current code checks only.isSuccess()and returnsMono.errorfor any failure, discarding the retryableFAIL_NON_SERIALIZEDresult.The existing TODO at line 230-235 acknowledges this:
It is not enough when an MCP server proxies concurrent tool calls to a downstream server via stdio — each call dispatches on a separate reactor thread and both call
sendMessage()on the same transport.Fix
Replace
tryEmitNextwithemitNext+Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)).emitNextwithbusyLoopingspin-retries onFAIL_NON_SERIALIZEDuntil the competing thread finishes its CAS (microseconds in practice). The 100ms duration is just a generous upper bound for pathological cases like GC pauses. This is the approach recommended by Reactor's ownemitNextJavadoc:For non-retryable failures (
FAIL_TERMINATED,FAIL_CANCELLED),emitNextthrowsEmissionExceptionwhich is caught and converted toMono.error, preserving the existing error behavior.Test
StdioClientTransportConcurrencyTestreproduces the race with two threads sending through the same transport concurrently (20 repetitions):Closes #875