MCP Server核心源码解析以及多MCP Server自定义实现

一、核心组件分析

1. WebFluxSseServerTransportProvider

作用与职责


WebFluxSseServerTransportProvider 是 MCP Server 的传输层提供者,负责处理客户端与服务器之间的通信。它实现了基于 Spring WebFlux 和 Server-Sent Events 的响应式通信机制。

源码实现

@Bean
public WebFluxSseServerTransportProvider customerTransportProvider(ObjectMapper objectMapper){
    return new WebFluxSseServerTransportProvider(
        objectMapper,
        "/customer/mcp/message",  // HTTP 消息端点
        "/customer/mcp/sse"       // Server-Sent Events 端点
    );
}
	public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
			String sseEndpoint) {
		Assert.notNull(objectMapper, "ObjectMapper must not be null");
		Assert.notNull(baseUrl, "Message base path must not be null");
		Assert.notNull(messageEndpoint, "Message endpoint must not be null");
		Assert.notNull(sseEndpoint, "SSE endpoint must not be null");

		this.objectMapper = objectMapper;
		this.baseUrl = baseUrl;
		this.messageEndpoint = messageEndpoint;
		this.sseEndpoint = sseEndpoint;
		this.routerFunction = RouterFunctions.route()
			.GET(this.sseEndpoint, this::handleSseConnection)
			.POST(this.messageEndpoint, this::handleMessage)
			.build();
	}
private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
		if (isClosing) {
			return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
		}

		return ServerResponse.ok()
			.contentType(MediaType.TEXT_EVENT_STREAM)
			.body(Flux.<ServerSentEvent<?>>create(sink -> {
				WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);

				McpServerSession session = sessionFactory.create(sessionTransport);
				String sessionId = session.getId();

				logger.debug("Created new SSE connection for session: {}", sessionId);
				sessions.put(sessionId, session);

				// Send initial endpoint event
				logger.debug("Sending initial endpoint event to session: {}", sessionId);
				sink.next(ServerSentEvent.builder()
					.event(ENDPOINT_EVENT_TYPE)
					.data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId)
					.build());
				sink.onCancel(() -> {
					logger.debug("Session {} cancelled", sessionId);
					sessions.remove(sessionId);
				});
			}), ServerSentEvent.class);
	}

核心机制:

  • 会话管理:为每个 SSE 连接创建独立的会话
  • 生命周期管理:通过 sink.onCancel() 处理连接断开
  • 初始事件发送:连接建立后立即发送端点信息
  • 资源清理:会话取消时自动清理资源
private Mono<ServerResponse> handleMessage(ServerRequest request) {
		if (isClosing) {
			return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
		}

		if (request.queryParam("sessionId").isEmpty()) {
			return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing in message endpoint"));
		}

		McpServerSession session = sessions.get(request.queryParam("sessionId").get());

		if (session == null) {
			return ServerResponse.status(HttpStatus.NOT_FOUND)
				.bodyValue(new McpError("Session not found: " + request.queryParam("sessionId").get()));
		}

		return request.bodyToMono(String.class).flatMap(body -> {
			try {
				McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
				return session.handle(message).flatMap(response -> ServerResponse.ok().build()).onErrorResume(error -> {
					logger.error("Error processing  message: {}", error.getMessage());
					// TODO: instead of signalling the error, just respond with 200 OK
					// - the error is signalled on the SSE connection
					// return ServerResponse.ok().build();
					return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
						.bodyValue(new McpError(error.getMessage()));
				});
			}
			catch (IllegalArgumentException | IOException e) {
				logger.error("Failed to deserialize message: {}", e.getMessage());
				return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
			}
		});
	}

关键特性

  • /customer/mcp/message:处理 HTTP 请求/响应
  • /customer/mcp/sse:处理 Server-Sent Events 流式通信
  • 会话关联:通过 sessionId 将 HTTP 请求与 SSE 连接关联

2. RouterFunction

作用与职责

RouterFunction 是 Spring WebFlux 的路由函数,负责将 HTTP 请求路由到对应的处理器。

源码实现

@Bean
public RouterFunction<ServerResponse> mvcMcpRouterFunction(
    ObjectProvider<List<WebFluxSseServerTransportProvider>> transportProviderList) {
    
    List<WebFluxSseServerTransportProvider> transportProviders = 
        transportProviderList.getIfAvailable();
    
    if (CollectionUtils.isEmpty(transportProviders)) {
        return RouterFunctions.route().build();
    }
    
    RouterFunction<ServerResponse> combinedRouter = null;
    
    for (WebFluxSseServerTransportProvider transportProvider : transportProviders) {
        RouterFunction<ServerResponse> transportProviderRouterFunction = 
            (RouterFunction<ServerResponse>) transportProvider.getRouterFunction();
        
        RouterFunction<ServerResponse> routerFunction = transportProviderRouterFunction
            .filter((request, next) -> {
                // 可以在这里添加请求过滤逻辑
                return next.handle(request);
            });
        
        // 组合多个路由
        combinedRouter = (combinedRouter == null) ? 
            routerFunction : combinedRouter.and(routerFunction);
    }
    
    return combinedRouter;
}
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
			String sseEndpoint) {
		Assert.notNull(objectMapper, "ObjectMapper must not be null");
		Assert.notNull(baseUrl, "Message base path must not be null");
		Assert.notNull(messageEndpoint, "Message endpoint must not be null");
		Assert.notNull(sseEndpoint, "SSE endpoint must not be null");

		this.objectMapper = objectMapper;
		this.baseUrl = baseUrl;
		this.messageEndpoint = messageEndpoint;
		this.sseEndpoint = sseEndpoint;
		this.routerFunction = RouterFunctions.route()
			.GET(this.sseEndpoint, this::handleSseConnection)
			.POST(this.messageEndpoint, this::handleMessage)
			.build();
	}

关键特性

  • 动态路由组合:支持多个 TransportProvider 的路由组合
  • 请求过滤:可以在路由层面添加过滤逻辑
  • 响应式路由:基于 WebFlux 的函数式路由

3. McpSyncServer

作用与职责

McpSyncServer 是 MCP 协议的核心服务器实现,负责处理工具调用、提示词管理等核心功能。

源码实现

@Bean
public McpSyncServer customerSyncServer(
    WebFluxSseServerTransportProvider customerTransportProvider,
    ObjectProvider<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> rootsChangeConsumers,
    CustomerService customerService,
    List<McpServerFeatures.SyncPromptSpecification> myPrompts){

    return getMcpSyncServer("customer", myPrompts, customerTransportProvider, 
                           rootsChangeConsumers, customerService);
}

核心构建方法

private McpSyncServer getMcpSyncServer(String name,
                                       List<McpServerFeatures.SyncPromptSpecification> promptSpecifications,
                                       McpServerTransportProvider serverTransportProvider,
                                       ObjectProvider<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> rootsChangeConsumers,
                                       CustomerService customerService) {

    // 1. 构建服务器能力
    McpSchema.ServerCapabilities.Builder capabilitiesBuilder = 
        McpSchema.ServerCapabilities.builder();

    // 2. 构建工具提供者
    MethodToolCallbackProvider build = MethodToolCallbackProvider.builder()
        .toolObjects(customerService).build();

    // 3. 服务器信息
    McpSchema.Implementation serverInfo = new McpSchema.Implementation(
        "mcp-server-" + name, "1.0.0");

    // 4. 构建服务器规范
    McpServer.SyncSpecification serverBuilder = McpServer.sync(serverTransportProvider)
        .serverInfo(serverInfo);

    // 5. 注册工具
    List<McpServerFeatures.SyncToolSpecification> syncToolSpecifications = 
        Arrays.stream(build.getToolCallbacks())
            .map(McpToolUtils::toSyncToolSpecification)
            .toList();

    if (!CollectionUtils.isEmpty(syncToolSpecifications)) {
        serverBuilder.tools(syncToolSpecifications);
        capabilitiesBuilder.tools(true);
    }

    // 6. 注册提示词
    if (Objects.nonNull(promptSpecifications)){
        serverBuilder.prompts(promptSpecifications);
        capabilitiesBuilder.prompts(true);
    } else {
        serverBuilder.prompts(Collections.emptyList());
        capabilitiesBuilder.prompts(true);
    }

    // 7. 注册根节点变化处理器
    rootsChangeConsumers.ifAvailable(consumer -> {
        serverBuilder.rootsChangeHandler((exchange, roots) -> {
            consumer.accept(exchange, roots);
        });
    });

    // 8. 设置能力并构建
    serverBuilder.capabilities(capabilitiesBuilder.build());
    return serverBuilder.build();
}
	public McpSyncServer build() {
			McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
					this.tools, this.resources, this.resourceTemplates, this.prompts, this.rootsChangeHandlers,
					this.instructions);
			McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
			var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
			var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures);

			return new McpSyncServer(asyncServer);
		}
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
				McpServerFeatures.Async features) {
			this.mcpTransportProvider = mcpTransportProvider;
			this.objectMapper = objectMapper;
			this.serverInfo = features.serverInfo();
			this.serverCapabilities = features.serverCapabilities();
			this.instructions = features.instructions();
			this.tools.addAll(features.tools());
			this.resources.putAll(features.resources());
			this.resourceTemplates.addAll(features.resourceTemplates());
			this.prompts.putAll(features.prompts());

			Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();

			// Initialize request handlers for standard MCP methods

			// Ping MUST respond with an empty data, but not NULL response.
			requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));

			// Add tools API handlers if the tool capability is enabled
			if (this.serverCapabilities.tools() != null) {
				requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());
				requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());
			}

			// Add resources API handlers if provided
			if (this.serverCapabilities.resources() != null) {
				requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
				requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
				requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
			}

			// Add prompts API handlers if provider exists
			if (this.serverCapabilities.prompts() != null) {
				requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());
				requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());
			}

			// Add logging API handlers if the logging capability is enabled
			if (this.serverCapabilities.logging() != null) {
				requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());
			}

			Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>();

			notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());

			List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
				.rootsChangeConsumers();

			if (Utils.isEmpty(rootsChangeConsumers)) {
				rootsChangeConsumers = List.of((exchange,
						roots) -> Mono.fromRunnable(() -> logger.warn(
								"Roots list changed notification, but no consumers provided. Roots list changed: {}",
								roots)));
			}

			notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
					asyncRootsListChangedNotificationHandler(rootsChangeConsumers));

			mcpTransportProvider
				.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), transport,
						this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
		}

二、一个服务启动多个MCP Server

    @Bean
    public WebFluxSseServerTransportProvider customerTransportProvider(ObjectMapper objectMapper){

        return new WebFluxSseServerTransportProvider(objectMapper,"/customer/mcp/message","/customer/mcp/sse");

    }


    @Bean
    public WebFluxSseServerTransportProvider weatherTransportProvider(ObjectMapper objectMapper){

        return new WebFluxSseServerTransportProvider(objectMapper,"/weather/mcp/message","/weather/mcp/sse");

    }

关键点:Spring AI 的自动配置类
McpServerAutoConfiguration 我们需要禁用自动配置。

@SpringBootApplication(scanBasePackages = "org.springframework.ai.mcp.sample.server", exclude = {
		org.springframework.ai.mcp.server.autoconfigure.McpServerAutoConfiguration.class
})
public class McpServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(McpServerApplication.class, args);
	}


}
原文链接:,转发请注明来源!