
启动一个sse的mcp server的过程前三步和stdio的流程一模一样:定义server描述,定义tool描述,将tool注册到server的map里
// Step 1: create the server with resource, prompt and tool capabilities enabled.
mcpServer := server.NewMCPServer(
	"example-server",
	"1.0.0",
	server.WithResourceCapabilities(true, true),
	server.WithPromptCapabilities(true),
	server.WithToolCapabilities(true),
)

// Step 2: describe the echo tool, which takes one required string
// argument named "message".
echoTool := mcp.NewTool(
	"echo",
	mcp.WithDescription("Echo back the input"),
	mcp.WithString(
		"message",
		mcp.Required(),
		mcp.Description("Message to echo back"),
	),
)

// Step 3: register the tool on the server with echoHandler as its handler.
mcpServer.AddTool(echoTool, echoHandler)
后面不同的是下面两步
4,定义SSE server
sseServer := s.ServeSSE("localhost:8080")
func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
return server.NewSSEServer(s.server,
server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
)
}5,启动服务
if err := sseServer.Start(":8080"); err != nil {
创建SSEServer和StdioServer流程几乎一样,只不过多了sseEndpoint和messageEndpoint两个默认路径,它们是SSE的URI路径
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
s := &SSEServer{
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
useFullURLForMessageEndpoint: true,
keepAlive: false,
keepAliveInterval: 10 * time.Second,
}
SSEServer没有用map来保存路径到工具的映射,里面封装了一个http server,因为它是基于http协议的长连接服务。
// SSEServer exposes an MCPServer over HTTP using Server-Sent Events.
// It implements http.Handler itself (Start installs it as the Handler of
// the embedded http.Server) and dispatches to the SSE and message handlers.
type SSEServer struct {
// server is the MCP server that incoming JSON-RPC messages are handed to.
server *MCPServer
// baseURL is presumably the value supplied through WithBaseURL — confirm against the option's implementation.
baseURL string
// basePath: NOTE(review): usage is not visible in this excerpt; presumably a static path prefix.
basePath string
// appendQueryToMessageEndpoint: when true and the SSE request carries a raw
// query string, handleSSE appends that query to the message endpoint it
// sends to the client.
appendQueryToMessageEndpoint bool
// useFullURLForMessageEndpoint is set to true by NewSSEServer.
useFullURLForMessageEndpoint bool
// messageEndpoint is the path for posted messages; NewSSEServer sets it to "/message".
messageEndpoint string
// sseEndpoint is the path of the event stream; NewSSEServer sets it to "/sse".
sseEndpoint string
// sessions is keyed by session ID; handleMessage loads *sseSession values from it.
sessions sync.Map
// srv is the underlying HTTP server; Start creates it if it is still nil.
srv *http.Server
// contextFunc: NOTE(review): usage is not visible in this excerpt.
contextFunc HTTPContextFunc
// dynamicBasePathFunc: NOTE(review): usage is not visible in this excerpt;
// presumably computes a per-request base path.
dynamicBasePathFunc DynamicBasePathFunc
// keepAlive is set to false by NewSSEServer.
keepAlive bool
// keepAliveInterval is set to 10 seconds by NewSSEServer.
keepAliveInterval time.Duration
// mu is held by Start while srv is inspected or initialised.
mu sync.RWMutex
}它的start函数就是http的监听函数
func (s *SSEServer) Start(addr string) error {
s.mu.Lock()
if s.srv == nil {
s.srv = &http.Server{
Addr: addr,
Handler: s,
}
} else {
if s.srv.Addr == "" {
s.srv.Addr = addr
} else if s.srv.Addr != addr {
return fmt.Errorf("conflicting listen address: WithHTTPServer(%q) vs Start(%q)", s.srv.Addr, addr)
}
}
srv := s.srv
s.mu.Unlock()
return srv.ListenAndServe()
}/sse和/messages的路由注册函数如下
func (s *SSEServer) SSEHandler() http.Handler {
return http.HandlerFunc(s.handleSSE)
}func (s *SSEServer) MessageHandler() http.Handler {
return http.HandlerFunc(s.handleMessage)
}最后我们看看ServeHttp函数,可以看到,在函数内部直接判断了路径,如果路径匹配直接调用对应的处理函数。如果不是多租户的动态路径,我们没有必要注册路由。
func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ssePath := s.CompleteSsePath()
if ssePath != "" && path == ssePath {
s.handleSSE(w, r)
messagePath := s.CompleteMessagePath()
if messagePath != "" && path == messagePath {
s.handleMessage(w, r)
http.NotFound(w, r)
接着看看handleSSE函数的具体实现,定义了消息的类型是text/event-stream,每次调用生成一个uuid作为sessionId。通过定时器不断发送ping请求和client维持长连接。把sessionId拼接到/message的url给客户端返回。
func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
sessionID := uuid.New().String()
for {
select {
case <-ticker.C:
message := mcp.JSONRPCRequest{
JSONRPC: "2.0",
ID: session.requestID.Add(1),
Request: mcp.Request{
Method: "ping",
},
}
messageBytes, _ := json.Marshal(message)
pingMsg := fmt.Sprintf("event: message\ndata:%s\n\n", messageBytes)
select {
case session.eventQueue <- pingMsg:
endpoint := s.GetMessageEndpointForClient(r, sessionID)
if s.appendQueryToMessageEndpoint && len(r.URL.RawQuery) > 0 {
endpoint += "&" + r.URL.RawQuery
}
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", endpoint)handleMessage方法,会根据用户请求中的sessionId去加载session信息,然后用json-rpc协议解析请求,然后启动协程来处理请求。
func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, "Method not allowed")
return
}
sessionID := r.URL.Query().Get("sessionId")
if sessionID == "" {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
return
}
sessionI, ok := s.sessions.Load(sessionID)
if !ok {
s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
return
}
session := sessionI.(*sseSession)
var rawMessage json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
return
}
go func(ctx context.Context) {
defer cancel()
// Use the context that will be canceled when session is done
// Process message through MCPServer
response := s.server.HandleMessage(ctx, rawMessage)
// Only send response if there is one (not for notifications)
if response != nil {
var message string
if eventData, err := json.Marshal(response); err != nil {最后处理消息的函数和stdio方式是同一个函数,根据不同method进行分发处理。
func (s *MCPServer) HandleMessage(
ctx context.Context,
message json.RawMessage,
) mcp.JSONRPCMessage {
switch baseMessage.Method {
case mcp.MethodInitialize:本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!