Description
Problem
StdioClientTransport.sendMessage() uses tryEmitNext() on a unicast Reactor sink. When two threads call sendMessage() concurrently on the same transport, one of the calls can fail with "Failed to enqueue message" because tryEmitNext() returns FAIL_NON_SERIALIZED.
This happens because the sink created by Sinks.many().unicast().onBackpressureBuffer() is wrapped in Reactor's SinkManySerialized, which guards tryEmitNext() with a CAS. If a second thread calls tryEmitNext() while another thread is still inside it, the CAS fails and the call returns FAIL_NON_SERIALIZED immediately. tryEmitNext() does not retry.
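To make the failure mode concrete, here is a small JDK-only model of a CAS-guarded sink. It is a hypothetical simplification for illustration, not Reactor's SinkManySerialized (whose bookkeeping differs), and the names SerializedSinkModel and ContentionDemo are invented here. The demo parks one thread inside the guarded section and shows that a second emitter gets FAIL_NON_SERIALIZED on its first and only attempt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified model of a CAS-guarded ("serialized") sink.
// Illustrative only: this is not Reactor's SinkManySerialized.
final class SerializedSinkModel<T> {

    enum EmitResult { OK, FAIL_NON_SERIALIZED }

    // Thread currently inside tryEmitNext(), or null when the guard is free.
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final Consumer<T> downstream;

    SerializedSinkModel(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    EmitResult tryEmitNext(T value) {
        // A single CAS attempt: if another thread holds the guard, fail at once.
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            return EmitResult.FAIL_NON_SERIALIZED;
        }
        try {
            downstream.accept(value);
            return EmitResult.OK;
        } finally {
            owner.set(null); // release the guard for the next emitter
        }
    }
}

final class ContentionDemo {

    // Parks a helper thread inside the guarded section, then emits from the
    // calling thread and returns what that second emission reported.
    static SerializedSinkModel.EmitResult secondEmitterResult() {
        CountDownLatch holdingGuard = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        SerializedSinkModel<String> sink = new SerializedSinkModel<>(msg -> {
            if (msg.equals("first")) {
                holdingGuard.countDown();
                awaitQuietly(release); // keep the guard held
            }
        });
        Thread first = new Thread(() -> sink.tryEmitNext("first"));
        first.start();
        awaitQuietly(holdingGuard);
        SerializedSinkModel.EmitResult second = sink.tryEmitNext("second");
        release.countDown();
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return second;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The second emission fails even though the first one completes a moment later, which matches the "loser gets FAIL_NON_SERIALIZED and never retries" behaviour described above.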
When this occurs
This happens whenever two threads concurrently call McpSyncClient.callTool() (or any other method that ends up in sendMessage()) on the same client instance. This is a common scenario, for example when an MCP server proxies tool calls from parallel requests to a downstream MCP server over the stdio transport.
Current code
```java
// StdioClientTransport.java line 229
public Mono<Void> sendMessage(JSONRPCMessage message) {
    if (this.outboundSink.tryEmitNext(message).isSuccess()) {
        return Mono.empty();
    }
    else {
        return Mono.error(new RuntimeException("Failed to enqueue message"));
    }
}
```

The existing TODO comment at line 230 acknowledges this limitation:
"we delegate the retry and the backpressure onto the caller. This might be enough for most cases."
Proposed fix
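Replace tryEmitNext() with emitNext() plus EmitFailureHandler.busyLooping(), which spin-retries on FAIL_NON_SERIALIZED until the competing thread leaves the guarded section (typically within microseconds) and gives up once the given duration has elapsed, at which point emitNext() throws Sinks.EmissionException:

```java
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
```

This is the pattern recommended by Reactor's own Javadoc for emitNext: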
"It would be possible for an EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear"
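The retry policy that busyLooping applies can be modelled in plain Java. The sketch below is a hypothetical stand-in, not Reactor code: BusyLoopEmitter, emitWithBusyLoop and its nested EmitResult are illustrative names. It retries only on FAIL_NON_SERIALIZED and only until a deadline computed when the call starts. In this model any other result, or an expired deadline, is reported as an IllegalStateException; Reactor's emitNext handles those cases in its own way.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical model of a busy-looping emit policy; not Reactor code.
final class BusyLoopEmitter {

    // Illustrative subset of emission outcomes.
    enum EmitResult { OK, FAIL_NON_SERIALIZED, FAIL_OVERFLOW }

    // Calls `attempt` until it returns OK. Retries only on FAIL_NON_SERIALIZED
    // and only while the deadline (start + maxWait) has not passed.
    // Returns the number of attempts made.
    static int emitWithBusyLoop(Supplier<EmitResult> attempt, Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            EmitResult result = attempt.get();
            if (result == EmitResult.OK) {
                return attempts;
            }
            // Overflow-safe comparison of nanoTime values.
            boolean withinDeadline = System.nanoTime() - deadline < 0;
            if (result != EmitResult.FAIL_NON_SERIALIZED || !withinDeadline) {
                throw new IllegalStateException(
                        "emit failed with " + result + " after " + attempts + " attempt(s)");
            }
            Thread.onSpinWait(); // contention is expected to clear quickly
        }
    }
}
```

The design choice worth noting is that the loop spins only on contention: FAIL_NON_SERIALIZED is transient by nature, whereas a failure such as overflow will not fix itself by retrying, so it is surfaced on the first attempt.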
Reproduction
A test with two threads sending messages concurrently through the same transport reproduces the failure in ~90% of runs.
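A reproduction along these lines can be structured as a small start-gate harness. The sketch below is hypothetical and JDK-only (RaceHarness and countFailures are illustrative names): it releases all threads at once through a CountDownLatch to maximise overlap, and counts calls that return false or throw.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical start-gate harness for provoking concurrent sends.
final class RaceHarness {

    // Runs `threads` threads that each call `send` `perThread` times, all
    // released together to maximise overlap. Returns the number of calls that
    // returned false or threw a RuntimeException.
    static int countFailures(int threads, int perThread, Predicate<String> send) {
        CountDownLatch ready = new CountDownLatch(threads);
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger failures = new AtomicInteger();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            new Thread(() -> {
                try {
                    ready.countDown();
                    startGate.await(); // wait until every thread is ready
                    for (int i = 0; i < perThread; i++) {
                        boolean ok;
                        try {
                            ok = send.test("thread-" + id + "-msg-" + i);
                        } catch (RuntimeException e) {
                            ok = false;
                        }
                        if (!ok) {
                            failures.incrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            ready.await();
            startGate.countDown(); // release all senders at once
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }
}
```

To target the real transport, `send` could be a hypothetical adapter that builds a JSON-RPC message and calls `sendMessage(...).block()`, returning true on completion. With the current tryEmitNext() implementation the "Failed to enqueue message" error surfaces from block() as a RuntimeException, which the harness counts as a failure. The failure rate depends on timing, so a run with zero failures does not by itself prove the race is gone.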