首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >MCP源码分析:SSE

MCP源码分析:SSE

作者头像
golangLeetcode
发布2026-03-18 17:40:56
发布2026-03-18 17:40:56
1110
举报

启动一个sse的mcp server的过程前三步和stdio的流程一模一样:定义server描述,定义tool描述,将tool注册到server的map里

代码语言:javascript
复制
mcpServer := server.NewMCPServer(
        "example-server",
        "1.0.0",
        server.WithResourceCapabilities(true, true),
        server.WithPromptCapabilities(true),
        server.WithToolCapabilities(true),
    )
    // Add echo tool
    mcpServer.AddTool(mcp.NewTool("echo",
        mcp.WithDescription("Echo back the input"),
        mcp.WithString("message",
            mcp.Required(),
            mcp.Description("Message to echo back"),
        ),
    ), echoHandler)

后面不同的是下面两步

4,定义SSE server

代码语言:javascript
复制
sseServer := s.ServeSSE("localhost:8080")
代码语言:javascript
复制
func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
    return server.NewSSEServer(s.server,
        server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
    )
}

5,启动服务

代码语言:javascript
复制
if err := sseServer.Start(":8080"); err != nil {

创建SSEServer和StdioServer流程几乎一样,知不多多了endpoint和messagepoint两个常量,它是SSE的uri路径

代码语言:javascript
复制
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
    s := &SSEServer{
        server:                       server,
        sseEndpoint:                  "/sse",
        messageEndpoint:              "/message",
        useFullURLForMessageEndpoint: true,
        keepAlive:                    false,
        keepAliveInterval:            10 * time.Second,
    }

SSEServer没有用map来保存路径到工具的映射,里面封装了一个http server,因为它是基于http协议的长链接服务。

代码语言:javascript
复制
type SSEServer struct {
    server                       *MCPServer
    baseURL                      string
    basePath                     string
    appendQueryToMessageEndpoint bool
    useFullURLForMessageEndpoint bool
    messageEndpoint              string
    sseEndpoint                  string
    sessions                     sync.Map
    srv                          *http.Server
    contextFunc                  HTTPContextFunc
    dynamicBasePathFunc          DynamicBasePathFunc
    keepAlive         bool
    keepAliveInterval time.Duration
    mu sync.RWMutex
}

它的start函数就是http的监听函数

代码语言:javascript
复制
func (s *SSEServer) Start(addr string) error {
    s.mu.Lock()
    if s.srv == nil {
        s.srv = &http.Server{
            Addr:    addr,
            Handler: s,
        }
    } else {
        if s.srv.Addr == "" {
            s.srv.Addr = addr
        } else if s.srv.Addr != addr {
            return fmt.Errorf("conflicting listen address: WithHTTPServer(%q) vs Start(%q)", s.srv.Addr, addr)
        }
    }
    srv := s.srv
    s.mu.Unlock()
    return srv.ListenAndServe()
}

/sse和/messages的路由注册函数如下

代码语言:javascript
复制
func (s *SSEServer) SSEHandler() http.Handler {
    return http.HandlerFunc(s.handleSSE)
}
代码语言:javascript
复制
func (s *SSEServer) MessageHandler() http.Handler {
    return http.HandlerFunc(s.handleMessage)
}

最后我们看看ServeHttp函数,可以看到,在函数内部直接判断了路径,如果路径匹配直接调用对应的处理函数。如果不是多租户的动态路径,我们没有必要注册路由。

代码语言:javascript
复制
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ssePath := s.CompleteSsePath()
    if ssePath != "" && path == ssePath {
        s.handleSSE(w, r)
    messagePath := s.CompleteMessagePath()
    if messagePath != "" && path == messagePath {
        s.handleMessage(w, r)
    http.NotFound(w, r)

接着看看handleSSE函数的具体实现,定义了消息的类型是text/event-dtream,每次调用生成一个uuid作为sessionId。通过定时器不断发送ping请求和clien维持长连接。把sessionId拼接到/messages的url给客户端返回。

代码语言:javascript
复制
func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodGet {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")
    
    sessionID := uuid.New().String()
    
           for {
                select {
                case <-ticker.C:
                    message := mcp.JSONRPCRequest{
                        JSONRPC: "2.0",
                        ID:      session.requestID.Add(1),
                        Request: mcp.Request{
                            Method: "ping",
                        },
                    }
                    messageBytes, _ := json.Marshal(message)
                    pingMsg := fmt.Sprintf("event: message\ndata:%s\n\n", messageBytes)
                    select {
                    case session.eventQueue <- pingMsg:
                    
    endpoint := s.GetMessageEndpointForClient(r, sessionID)
    if s.appendQueryToMessageEndpoint && len(r.URL.RawQuery) > 0 {
        endpoint += "&" + r.URL.RawQuery
    }
    fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", endpoint)

handleMessage方法,会根据用户请求中的sessionId去加载session信息,然后用json-rpc协议解析请求,然后启动协程来处理请求。

代码语言:javascript
复制
func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
        return
    }
    sessionID := r.URL.Query().Get("sessionId")
    if sessionID == "" {
        s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
        return
    }
    sessionI, ok := s.sessions.Load(sessionID)
    if !ok {
        s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
        return
    }
    session := sessionI.(*sseSession)
    
    var rawMessage json.RawMessage
    if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
        s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
        return
    }
    go func(ctx context.Context) {
        defer cancel()
        // Use the context that will be canceled when session is done
        // Process message through MCPServer
        response := s.server.HandleMessage(ctx, rawMessage)
        // Only send response if there is one (not for notifications)
        if response != nil {
            var message string
            if eventData, err := json.Marshal(response); err != nil {

最后处理消息的函数和stdio方式是同一个函数,根据不同method进行分发处理。

代码语言:javascript
复制
func (s *MCPServer) HandleMessage(
    ctx context.Context,
    message json.RawMessage,
) mcp.JSONRPCMessage {
     switch baseMessage.Method {
     case mcp.MethodInitialize:
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档