MCP transports supports Streamable HTTP

This commit is contained in:
Liujian
2025-10-21 17:52:50 +08:00
parent 6ee1996e6f
commit ea32fb1cd6
11 changed files with 184 additions and 64 deletions
+71 -31
View File
@@ -21,13 +21,14 @@ import (
var _ IMcpController = (*imlMcpController)(nil)
type imlMcpController struct {
settingModule system.ISettingModule `autowired:""`
authorizationModule application_authorization.IAuthorizationModule `autowired:""`
appModule service.IAppModule `autowired:""`
mcpModule mcp.IMcpModule `autowired:""`
sessionKeys sync.Map
server map[string]http.Handler
openServer http.Handler
settingModule system.ISettingModule `autowired:""`
authorizationModule application_authorization.IAuthorizationModule `autowired:""`
appModule service.IAppModule `autowired:""`
mcpModule mcp.IMcpModule `autowired:""`
sessionKeys sync.Map
sseServers map[string]http.Handler
openSseServer http.Handler
openStreamableServer http.Handler
}
func (i *imlMcpController) AppMCPHandle(ctx *gin.Context) {
@@ -42,12 +43,12 @@ func (i *imlMcpController) AppMCPHandle(ctx *gin.Context) {
paths := strings.Split(req.URL.Path, "/")
req.URL.Path = fmt.Sprintf("/api/v1/%s/%s", mcp_server.GlobalBasePath, paths[len(paths)-1])
locale := utils.I18n(ctx)
if v, ok := i.server[locale]; ok {
if v, ok := i.sseServers[locale]; ok {
v.ServeHTTP(ctx.Writer, req)
return
}
i.server[languageEnUs].ServeHTTP(ctx.Writer, req)
i.sseServers[languageEnUs].ServeHTTP(ctx.Writer, req)
}
func (i *imlMcpController) AppHandleSSE(ctx *gin.Context) {
@@ -68,7 +69,7 @@ func (i *imlMcpController) AppHandleSSE(ctx *gin.Context) {
}
ctx.Request.URL.Path = fmt.Sprintf("/openapi/v1/%s/sse", mcp_server.GlobalBasePath)
i.handleSSE(ctx, i.openServer, SessionInfo{
i.handleSSE(ctx, i.openSseServer, SessionInfo{
Apikey: apikey,
App: appId,
})
@@ -81,8 +82,29 @@ func (i *imlMcpController) AppHandleMessage(ctx *gin.Context) {
return
}
ctx.Request.URL.Path = fmt.Sprintf("/openapi/v1/%s/message", mcp_server.GlobalBasePath)
ctx.Request = ctx.Request.WithContext(utils.SetLabel(ctx.Request.Context(), "app", appId))
i.handleMessage(ctx, i.openServer)
//ctx.Request = ctx.Request.WithContext(utils.SetLabel(ctx.Request.Context(), "app", appId))
i.handleMessage(ctx, i.openSseServer)
}
func (i *imlMcpController) AppHandleStreamHTTP(ctx *gin.Context) {
apikey := ctx.Request.Header.Get("Authorization")
apikey = strings.TrimPrefix(apikey, "Bearer ")
if apikey == "" {
ctx.AbortWithStatusJSON(403, gin.H{"code": -1, "msg": "invalid apikey", "success": "fail"})
return
}
appId := ctx.Request.Header.Get("X-Application-Id")
if appId == "" {
ctx.AbortWithStatusJSON(403, gin.H{"code": -1, "msg": "invalid app id", "success": "fail"})
return
}
cfg := i.settingModule.Get(ctx)
req := ctx.Request.WithContext(utils.SetGatewayInvoke(ctx.Request.Context(), cfg.InvokeAddress))
req = req.WithContext(utils.SetLabel(req.Context(), "apikey", apikey))
req = req.WithContext(utils.SetLabel(req.Context(), "app", appId))
req.URL.Path = mcp_server.OpenGlobalMCPPath
i.openStreamableServer.ServeHTTP(ctx.Writer, req)
}
func (i *imlMcpController) AppMCPConfig(ctx *gin.Context, appId string) (string, error) {
@@ -94,36 +116,44 @@ func (i *imlMcpController) AppMCPConfig(ctx *gin.Context, appId string) (string,
if err != nil {
return "", fmt.Errorf("get app info error: %v", err)
}
return fmt.Sprintf(mcpDefaultConfig, appInfo.Name, fmt.Sprintf("%s/openapi/v1/mcp/app/%s/sse?apikey={your_api_key}", strings.TrimSuffix(cfg.SitePrefix, "/"), appId)), nil
}
var mcpDefaultConfig = `{
"mcpServers": {
"%s": {
"url": "%s"
}
}
return mcp_server.NewMCPConfig(
mcp_server.TransportTypeStreamableHTTP,
fmt.Sprintf("%s%s", strings.TrimSuffix(cfg.SitePrefix, "/"), mcp_server.OpenAppMCPPath),
map[string]string{
"Authorization": "Bearer {your_api_key}",
"X-Application-Id": appId,
},
nil,
).ToString(appInfo.Name), nil
}
`
func (i *imlMcpController) GlobalMCPConfig(ctx *gin.Context) (string, error) {
cfg := i.settingModule.Get(ctx)
if cfg.SitePrefix == "" {
return "", fmt.Errorf("site prefix is empty")
}
return fmt.Sprintf(mcpDefaultConfig, "APIPark-MCP-Server", fmt.Sprintf("%s/openapi/v1/%s/sse?apikey={your_api_key}", strings.TrimSuffix(cfg.SitePrefix, "/"), mcp_server.GlobalBasePath)), nil
return mcp_server.NewMCPConfig(
mcp_server.TransportTypeStreamableHTTP,
fmt.Sprintf("%s%s", strings.TrimSuffix(cfg.SitePrefix, "/"), mcp_server.OpenGlobalMCPPath),
map[string]string{
"Authorization": "Bearer {your_api_key}",
},
nil,
).ToString("APIPark-MCP-Server"), nil
}
func (i *imlMcpController) OnComplete() {
i.server = make(map[string]http.Handler)
i.sseServers = make(map[string]http.Handler)
for language, tools := range mcpToolsByLanguage {
s := server.NewMCPServer("APIPark MCP Server", "1.0.0", server.WithLogging())
s.AddTool(tools[ToolServiceList], i.mcpModule.Services)
s.AddTool(tools[ToolOpenAPIDocument], i.mcpModule.APIs)
s.AddTool(tools[ToolInvokeAPI], i.mcpModule.Invoke)
i.server[language] = server.NewSSEServer(s, server.WithStaticBasePath(fmt.Sprintf("/api/v1/%s", mcp_server.GlobalBasePath)))
i.sseServers[language] = server.NewSSEServer(s, server.WithStaticBasePath(fmt.Sprintf("/api/v1/%s", mcp_server.GlobalBasePath)))
if language == languageEnUs {
i.openServer = server.NewSSEServer(s, server.WithStaticBasePath(fmt.Sprintf("/openapi/v1/%s", strings.Trim(mcp_server.GlobalBasePath, "/"))))
i.openSseServer = server.NewSSEServer(s, server.WithStaticBasePath(fmt.Sprintf("/openapi/v1/%s", strings.Trim(mcp_server.GlobalBasePath, "/"))))
i.openStreamableServer = server.NewStreamableHTTPServer(s, server.WithEndpointPath(mcp_server.OpenGlobalMCPPath))
}
}
}
@@ -132,16 +162,16 @@ func (i *imlMcpController) GlobalMCPHandle(ctx *gin.Context) {
cfg := i.settingModule.Get(ctx)
req := ctx.Request.WithContext(utils.SetGatewayInvoke(ctx.Request.Context(), cfg.InvokeAddress))
locale := utils.I18n(ctx)
if v, ok := i.server[locale]; ok {
if v, ok := i.sseServers[locale]; ok {
v.ServeHTTP(ctx.Writer, req)
return
}
i.server[languageEnUs].ServeHTTP(ctx.Writer, req)
i.sseServers[languageEnUs].ServeHTTP(ctx.Writer, req)
}
func (i *imlMcpController) GlobalHandleSSE(ctx *gin.Context) {
apikey := ctx.Request.URL.Query().Get("apikey")
i.handleSSE(ctx, i.openServer, SessionInfo{
i.handleSSE(ctx, i.openSseServer, SessionInfo{
Apikey: apikey,
})
}
@@ -167,7 +197,16 @@ func (i *imlMcpController) handleSSE(ctx *gin.Context, server http.Handler, sIn
}
func (i *imlMcpController) GlobalHandleMessage(ctx *gin.Context) {
i.handleMessage(ctx, i.openServer)
i.handleMessage(ctx, i.openSseServer)
}
func (i *imlMcpController) GlobalHandleStreamHTTP(ctx *gin.Context) {
apikey := ctx.Request.Header.Get("Authorization")
apikey = strings.TrimPrefix(apikey, "Bearer ")
cfg := i.settingModule.Get(ctx)
req := ctx.Request.WithContext(utils.SetGatewayInvoke(ctx.Request.Context(), cfg.InvokeAddress))
req = req.WithContext(utils.SetLabel(req.Context(), "apikey", apikey))
i.openStreamableServer.ServeHTTP(ctx.Writer, req)
}
func (i *imlMcpController) MCPHandle(ctx *gin.Context) {
@@ -204,12 +243,13 @@ func (i *imlMcpController) ServiceHandleMessage(ctx *gin.Context) {
}
func (i *imlMcpController) ServiceHandleStreamHTTP(ctx *gin.Context) {
apikey := ctx.Request.URL.Query().Get("apikey")
serviceId := ctx.Param("serviceId")
apikey := ctx.Request.Header.Get("Authorization")
serviceId := ctx.Request.Header.Get("X-Service-Id")
if serviceId == "" {
ctx.AbortWithStatusJSON(403, gin.H{"code": -1, "msg": "invalid service id", "success": "fail"})
return
}
apikey = strings.TrimPrefix(apikey, "Bearer ")
ok, err := i.authorizationModule.CheckAPIKeyAuthorizationByService(ctx, serviceId, apikey)
if err != nil {
ctx.AbortWithStatusJSON(403, gin.H{"code": -1, "msg": err.Error(), "success": "fail"})
+2
View File
@@ -13,11 +13,13 @@ type IMcpController interface {
GlobalMCPHandle(ctx *gin.Context)
GlobalHandleSSE(ctx *gin.Context)
GlobalHandleMessage(ctx *gin.Context)
GlobalHandleStreamHTTP(ctx *gin.Context)
GlobalMCPConfig(ctx *gin.Context) (string, error)
AppMCPHandle(ctx *gin.Context)
AppHandleSSE(ctx *gin.Context)
AppHandleMessage(ctx *gin.Context)
AppHandleStreamHTTP(ctx *gin.Context)
AppMCPConfig(ctx *gin.Context, appId string) (string, error)
ServiceHandleSSE(ctx *gin.Context)