一、核心组件分析
1. WebFluxSseServerTransportProvider
作用与职责
WebFluxSseServerTransportProvider 是 MCP Server 的传输层提供者,负责处理客户端与服务器之间的通信。它实现了基于 Spring WebFlux 和 Server-Sent Events 的响应式通信机制。
源码实现
@Bean
public WebFluxSseServerTransportProvider customerTransportProvider(ObjectMapper objectMapper){
return new WebFluxSseServerTransportProvider(
objectMapper,
"/customer/mcp/message", // HTTP 消息端点
"/customer/mcp/sse" // Server-Sent Events 端点
);
}
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(baseUrl, "Message base path must not be null");
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
this.objectMapper = objectMapper;
this.baseUrl = baseUrl;
this.messageEndpoint = messageEndpoint;
this.sseEndpoint = sseEndpoint;
this.routerFunction = RouterFunctions.route()
.GET(this.sseEndpoint, this::handleSseConnection)
.POST(this.messageEndpoint, this::handleMessage)
.build();
}
private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
if (isClosing) {
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
}
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.<ServerSentEvent<?>>create(sink -> {
WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
McpServerSession session = sessionFactory.create(sessionTransport);
String sessionId = session.getId();
logger.debug("Created new SSE connection for session: {}", sessionId);
sessions.put(sessionId, session);
// Send initial endpoint event
logger.debug("Sending initial endpoint event to session: {}", sessionId);
sink.next(ServerSentEvent.builder()
.event(ENDPOINT_EVENT_TYPE)
.data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId)
.build());
sink.onCancel(() -> {
logger.debug("Session {} cancelled", sessionId);
sessions.remove(sessionId);
});
}), ServerSentEvent.class);
}
核心机制:
- 会话管理:为每个 SSE 连接创建独立的会话
- 生命周期管理:通过 sink.onCancel() 处理连接断开
- 初始事件发送:连接建立后立即发送端点信息
- 资源清理:会话取消时自动清理资源
private Mono<ServerResponse> handleMessage(ServerRequest request) {
if (isClosing) {
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
}
if (request.queryParam("sessionId").isEmpty()) {
return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing in message endpoint"));
}
McpServerSession session = sessions.get(request.queryParam("sessionId").get());
if (session == null) {
return ServerResponse.status(HttpStatus.NOT_FOUND)
.bodyValue(new McpError("Session not found: " + request.queryParam("sessionId").get()));
}
return request.bodyToMono(String.class).flatMap(body -> {
try {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
return session.handle(message).flatMap(response -> ServerResponse.ok().build()).onErrorResume(error -> {
logger.error("Error processing message: {}", error.getMessage());
// TODO: instead of signalling the error, just respond with 200 OK
// - the error is signalled on the SSE connection
// return ServerResponse.ok().build();
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(new McpError(error.getMessage()));
});
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
}
});
}
关键特性
- /customer/mcp/message:处理 HTTP 请求/响应
- /customer/mcp/sse:处理 Server-Sent Events 流式通信
- 会话关联:通过 sessionId 将 HTTP 请求与 SSE 连接关联
2. RouterFunction
作用与职责
RouterFunction 是 Spring WebFlux 的路由函数,负责将 HTTP 请求路由到对应的处理器。
源码实现
@Bean
public RouterFunction<ServerResponse> mvcMcpRouterFunction(
ObjectProvider<List<WebFluxSseServerTransportProvider>> transportProviderList) {
List<WebFluxSseServerTransportProvider> transportProviders =
transportProviderList.getIfAvailable();
if (CollectionUtils.isEmpty(transportProviders)) {
return RouterFunctions.route().build();
}
RouterFunction<ServerResponse> combinedRouter = null;
for (WebFluxSseServerTransportProvider transportProvider : transportProviders) {
RouterFunction<ServerResponse> transportProviderRouterFunction =
(RouterFunction<ServerResponse>) transportProvider.getRouterFunction();
RouterFunction<ServerResponse> routerFunction = transportProviderRouterFunction
.filter((request, next) -> {
// 可以在这里添加请求过滤逻辑
return next.handle(request);
});
// 组合多个路由
combinedRouter = (combinedRouter == null) ?
routerFunction : combinedRouter.and(routerFunction);
}
return combinedRouter;
}
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(baseUrl, "Message base path must not be null");
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
this.objectMapper = objectMapper;
this.baseUrl = baseUrl;
this.messageEndpoint = messageEndpoint;
this.sseEndpoint = sseEndpoint;
this.routerFunction = RouterFunctions.route()
.GET(this.sseEndpoint, this::handleSseConnection)
.POST(this.messageEndpoint, this::handleMessage)
.build();
}
关键特性
- 动态路由组合:支持多个 TransportProvider 的路由组合
- 请求过滤:可以在路由层面添加过滤逻辑
- 响应式路由:基于 WebFlux 的函数式路由
3. McpSyncServer
作用与职责
McpSyncServer 是 MCP 协议的核心服务器实现,负责处理工具调用、提示词管理等核心功能。
源码实现
@Bean
public McpSyncServer customerSyncServer(
WebFluxSseServerTransportProvider customerTransportProvider,
ObjectProvider<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> rootsChangeConsumers,
CustomerService customerService,
List<McpServerFeatures.SyncPromptSpecification> myPrompts){
return getMcpSyncServer("customer", myPrompts, customerTransportProvider,
rootsChangeConsumers, customerService);
}
核心构建方法
private McpSyncServer getMcpSyncServer(String name,
List<McpServerFeatures.SyncPromptSpecification> promptSpecifications,
McpServerTransportProvider serverTransportProvider,
ObjectProvider<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> rootsChangeConsumers,
CustomerService customerService) {
// 1. 构建服务器能力
McpSchema.ServerCapabilities.Builder capabilitiesBuilder =
McpSchema.ServerCapabilities.builder();
// 2. 构建工具提供者
MethodToolCallbackProvider build = MethodToolCallbackProvider.builder()
.toolObjects(customerService).build();
// 3. 服务器信息
McpSchema.Implementation serverInfo = new McpSchema.Implementation(
"mcp-server-" + name, "1.0.0");
// 4. 构建服务器规范
McpServer.SyncSpecification serverBuilder = McpServer.sync(serverTransportProvider)
.serverInfo(serverInfo);
// 5. 注册工具
List<McpServerFeatures.SyncToolSpecification> syncToolSpecifications =
Arrays.stream(build.getToolCallbacks())
.map(McpToolUtils::toSyncToolSpecification)
.toList();
if (!CollectionUtils.isEmpty(syncToolSpecifications)) {
serverBuilder.tools(syncToolSpecifications);
capabilitiesBuilder.tools(true);
}
// 6. 注册提示词
if (Objects.nonNull(promptSpecifications)){
serverBuilder.prompts(promptSpecifications);
capabilitiesBuilder.prompts(true);
} else {
serverBuilder.prompts(Collections.emptyList());
capabilitiesBuilder.prompts(true);
}
// 7. 注册根节点变化处理器
rootsChangeConsumers.ifAvailable(consumer -> {
serverBuilder.rootsChangeHandler((exchange, roots) -> {
consumer.accept(exchange, roots);
});
});
// 8. 设置能力并构建
serverBuilder.capabilities(capabilitiesBuilder.build());
return serverBuilder.build();
}
public McpSyncServer build() {
McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
this.tools, this.resources, this.resourceTemplates, this.prompts, this.rootsChangeHandlers,
this.instructions);
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures);
return new McpSyncServer(asyncServer);
}
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
McpServerFeatures.Async features) {
this.mcpTransportProvider = mcpTransportProvider;
this.objectMapper = objectMapper;
this.serverInfo = features.serverInfo();
this.serverCapabilities = features.serverCapabilities();
this.instructions = features.instructions();
this.tools.addAll(features.tools());
this.resources.putAll(features.resources());
this.resourceTemplates.addAll(features.resourceTemplates());
this.prompts.putAll(features.prompts());
Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();
// Initialize request handlers for standard MCP methods
// Ping MUST respond with an empty data, but not NULL response.
requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));
// Add tools API handlers if the tool capability is enabled
if (this.serverCapabilities.tools() != null) {
requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());
requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());
}
// Add resources API handlers if provided
if (this.serverCapabilities.resources() != null) {
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
}
// Add prompts API handlers if provider exists
if (this.serverCapabilities.prompts() != null) {
requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());
requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());
}
// Add logging API handlers if the logging capability is enabled
if (this.serverCapabilities.logging() != null) {
requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());
}
Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>();
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
.rootsChangeConsumers();
if (Utils.isEmpty(rootsChangeConsumers)) {
rootsChangeConsumers = List.of((exchange,
roots) -> Mono.fromRunnable(() -> logger.warn(
"Roots list changed notification, but no consumers provided. Roots list changed: {}",
roots)));
}
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
asyncRootsListChangedNotificationHandler(rootsChangeConsumers));
mcpTransportProvider
.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), transport,
this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
}
二、一个服务启动多个MCP Server
@Bean
public WebFluxSseServerTransportProvider customerTransportProvider(ObjectMapper objectMapper){
return new WebFluxSseServerTransportProvider(objectMapper,"/customer/mcp/message","/customer/mcp/sse");
}
@Bean
public WebFluxSseServerTransportProvider weatherTransportProvider(ObjectMapper objectMapper){
return new WebFluxSseServerTransportProvider(objectMapper,"/weather/mcp/message","/weather/mcp/sse");
}
关键点:Spring AI 的自动配置类
McpServerAutoConfiguration 我们需要禁用自动配置。
@SpringBootApplication(scanBasePackages = "org.springframework.ai.mcp.sample.server", exclude = {
org.springframework.ai.mcp.server.autoconfigure.McpServerAutoConfiguration.class
})
public class McpServerApplication {
public static void main(String[] args) {
SpringApplication.run(McpServerApplication.class, args);
}
}