MCP 开发实战 - 开发一个使用 Chroma mcp 的客户端(踩坑记录)

首先是 chroma-mcp 服务,这里有巨坑!巨坑!!

chroma-mcp

首先是开源项目地址,先 fork 一份

https://github.com/chroma-core/chroma-mcp

小坑

首先源码中 transport 为 stdio 且不能修改

而 stdio 是给 curosor mcp server 用的,这里要用 Http 服务,所以要改

现在要加上环境变量,另外源码前面要接受这个 args ,自行添加即可

mcp.run(transport=args.transport)

中坑

官方的启动方式文档里会有这么一段

1
2
3
4
5
6
7
8
if __name__ == "__main__":
mcp.run(
transport="streamable-http",
host="127.0.0.1",
port=4200,
path="/my-custom-path",
log_level="debug",
)

但是实际上 mcp.run 方法源码里并不接受后面的那堆参数

port 等参数应该在这里提供

mcp = FastMCP("chroma", port=8000)

巨坑!

这是个史诗级别巨坑,我甚至怀疑官方有没有在 docker 部署这个服务

将这个 docker image 启动后,映射了默认端口 8000 ,但实际上没法访问到!没法访问!!!

随后我试了自行搭建最小 mcp 服务部署,发现只要是在 Docker 内就没法访问,而启动日志会告诉你 127.0.0.1 已经开启了,并且在容器内是可以访问的

好了,如果你到这里没有发现上面那句话的问题,那你也要踩这个坑,哈哈

原因是 127.0.0.1 只监听到了本地 ip 访问,并没有监听到容器的网络访问,所以要修改 host

这也是我为什么怀疑官方并没有测试这个镜像的原因

mcp = FastMCP("chroma", port= 8080, host="0.0.0.0")

环境变量

于是,这个镜像启动的环境变量如下

1
2
3
4
5
CHROMA_HOST=10.11.12.10
CHROMA_PORT=48000
CHROMA_SSL=false
CHROMA_CLIENT_TYPE=http
CHROMA_TRANSPORT=streamable-http

启动时需要带上参数 -i -t ,否则容器会几秒后退出

启动后应该可以访问到 ip:port/mcp 会给出一段数据

至此,容器应该能成功部署,并且在 cursor mcp 服务中应该能挂载上去了

mcp 客户端

这里使用 fastmcp 库来开发

有兴趣的小伙伴直接使用 cursor 生成代码即可

要注意以下几个问题:

  1. 流式输出会导致不好判断 AI 返回的内容是否要调用 mcp 服务,这里的判断要记录流式输出内容判断,因为输出 mcp 这个字符串时可能会分开输出
  2. AI 调用 mcp 服务时参数可能会有问题,将报错还给 AI ,让 AI 重试,记得设置最大重试次数

这里给出几个关键的代码部分,剩下的拼接可以扔给 cursor 完成。这部分的代码我也是通过 cursor 写的

获取可用工具

1
2
3
4
5
6
7
8
9
try:
async with Client(url) as client:
tools = await client.list_tools()
tool_names = [tool.name for tool in tools]
logger.info(f"获取到{len(tool_names)}个可用工具: {', '.join(tool_names)}")
return tool_names
except Exception as e:
logger.error(f"获取MCP工具列表失败: {str(e)}", exc_info=True)
return []

调用工具

1
2
3
4
5
6
7
8
9
10
11
# call_streaming_model
result = await client.call_tool(tool_name, params)

# 处理结果,FastMCP的call_tool返回的是一个列表
if result and isinstance(result, list) and len(result) > 0:
# 获取第一个结果项的文本内容
if hasattr(result[0], 'text'):
result_text = result[0].text
else:
# 如果没有text属性,尝试转换为字符串
result_text = str(result[0])

从 AI 回复中提取 mcp 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def _extract_tool_calls(self, text: str) -> List[Dict[str, Any]]:
"""从响应文本中提取MCP工具调用"""
tool_calls = []
# 使用正则表达式匹配 ```mcp ... ``` 代码块
pattern = r"```mcp\s*([\s\S]*?)\s*```"
matches = re.finditer(pattern, text)

match_count = 0
for match in matches:
match_count += 1
try:
tool_call_json = match.group(1).strip()
logger.info(f"提取到工具调用JSON文本: {tool_call_json}")

# 尝试解析JSON
try:
tool_call = json.loads(tool_call_json)

# 验证工具调用格式
if "tool" in tool_call and "params" in tool_call:
tool_name = tool_call["tool"]
params = tool_call["params"]
logger.info(f"解析工具调用: tool={tool_name}, params={params}")

# 检查工具是否可用
if tool_name in self.available_tools:
tool_calls.append(tool_call)
logger.info(f"有效工具调用: tool={tool_name}")
else:
logger.warning(f"工具不可用: {tool_name}, 可用工具: {self.available_tools}")
else:
missing_fields = []
if "tool" not in tool_call:
missing_fields.append("tool")
if "params" not in tool_call:
missing_fields.append("params")
logger.warning(f"工具调用格式不完整,缺少字段: {', '.join(missing_fields)}, 原始JSON: {tool_call_json}")
except json.JSONDecodeError as e:
logger.error(f"工具调用JSON解析错误: {str(e)}, 位置: {e.pos}, 行: {e.lineno}, 列: {e.colno}, 原始文本: {tool_call_json}")
continue
except Exception as e:
logger.error(f"处理工具调用时发生未知错误: {str(e)}, 原始匹配: {match.group(0)[:100]}...", exc_info=True)
continue

logger.info(f"从文本中提取到{len(tool_calls)}/{match_count}个有效工具调用")
return tool_calls

调用 MCP 并重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
async def _call_tool_streaming(self, tool_name: str, params: Dict[str, Any]) -> AsyncGenerator[str, None]:
"""调用MCP工具

Args:
tool_name: 工具名称
params: 工具参数

Yields:
工具调用结果的输出
"""
logger.info(f"开始调用工具: tool_name={tool_name}, params={params}")

# 设置最大尝试次数
MAX_ATTEMPTS = 5
attempt = 1

if not tool_name:
error_msg = "❌ 工具名称不能为空"
logger.error(error_msg)
yield error_msg
return

if tool_name not in self.available_tools:
available_tools_str = ", ".join(self.available_tools) if self.available_tools else "无可用工具"
error_msg = f"⚠️ 工具 '{tool_name}' 不可用。可用工具: {available_tools_str}"
logger.warning(f"工具不可用: {tool_name}, 可用工具: {self.available_tools}")
yield error_msg
return

# 记录参数历史,以便检测AI是否修改了参数
params_history = []

# 重试循环
while attempt <= MAX_ATTEMPTS:
logger.info(f"尝试 #{attempt}/{MAX_ATTEMPTS} 调用工具: {tool_name}")

# 记录本次参数
params_str = json.dumps(params, ensure_ascii=False)
params_history.append(params_str)

# 初始化错误标志
has_error = False
error_message = ""

try:
# 记录开始调用工具的时间
start_time = asyncio.get_event_loop().time()

# 流式调用MCP工具并实时返回结果
logger.info(f"开始调用MCP工具 (尝试 #{attempt}): tool_name={tool_name}, mcp_url={self.mcp_url}")
yield f"✅ 工具 '{tool_name}' 调用中 (尝试 #{attempt}/{MAX_ATTEMPTS})...\n"

# 调用工具并流式返回结果
result_count = 0
total_size = 0
result_chunks = [] # 收集所有结果块

try:
async for result_chunk in call_mcp_tool_streaming(tool_name, params, self.mcp_url):
result_count += 1
total_size += len(result_chunk)
result_chunks.append(result_chunk)

logger.info(f"工具调用返回结果块 #{result_count}: 大小={len(result_chunk)}, 内容={result_chunk[:50]}...")

# 检查是否为错误消息
if result_chunk.startswith("工具调用错误:"):
has_error = True
error_message = result_chunk
logger.warning(f"工具调用返回错误: {error_message}")

# 正常返回结果块
yield result_chunk

# 记录工具调用完成的时间和统计信息
end_time = asyncio.get_event_loop().time()
duration = end_time - start_time
logger.info(f"工具调用完成 (尝试 #{attempt}): tool_name={tool_name}, 用时={duration:.2f}秒, 返回了{result_count}个结果块, 总大小={total_size}字节")

if result_count == 0:
logger.warning(f"工具调用没有返回任何结果: {tool_name}")
yield "工具调用没有返回任何结果"
has_error = True
error_message = "工具调用没有返回任何结果"
except Exception as e:
logger.error(f"工具调用过程中发生错误: {str(e)}", exc_info=True)
error_msg = f"工具调用过程中发生错误: {str(e)}"
yield error_msg
has_error = True
error_message = error_msg

# 结束当前调用的结果块
yield "\n"

# 如果调用成功或已达到最大尝试次数,则退出循环
if not has_error or attempt >= MAX_ATTEMPTS:
if has_error and attempt >= MAX_ATTEMPTS:
logger.warning(f"工具 {tool_name} 调用在 {MAX_ATTEMPTS} 次尝试后仍然失败")
yield f"\n已达到最大尝试次数 ({MAX_ATTEMPTS}),工具调用仍然失败。"
break

# 否则,准备重试
retry_prompt = f"\n工具调用失败: {error_message}\n\n请修正参数并重新尝试。这是第 {attempt}/{MAX_ATTEMPTS} 次尝试。"
yield retry_prompt

# 生成AI响应以获取修正后的参数
try:
# 构造简单的提示,让AI提供修正后的参数
ai_prompt = f"工具 '{tool_name}' 调用失败: {error_message}\n请提供修正后的参数。只需要回复JSON格式的参数部分,不要包含其他内容。"

# 标记是否接收到了新参数
received_new_params = False

# 使用流式响应获取AI的建议
ai_response = ""
logger.info(f"请求AI提供修正后的参数 (尝试 #{attempt})")

yield "\n等待AI修正参数...\n"

# 使用_get_ai_streaming_response获取AI响应
async for ai_chunk in self._get_ai_streaming_response(ai_prompt):
ai_response += ai_chunk

# 尝试从响应中提取JSON参数
json_match = re.search(r'({[\s\S]*?})', ai_response)
if json_match and not received_new_params:
try:
json_str = json_match.group(1)
new_params = json.loads(json_str)

# 检查是否与之前的参数相同
new_params_str = json.dumps(new_params, ensure_ascii=False)
if new_params_str not in params_history:
logger.info(f"AI提供了新的参数: {new_params_str}")
params = new_params # 更新参数
received_new_params = True
yield f"\nAI提供了新的参数: json\n{json.dumps(new_params, indent=2, ensure_ascii=False)}\n"
else:
logger.warning("AI提供的参数与之前尝试的参数相同")
yield "\nAI提供的参数与之前尝试的相同,仍将重试...\n"
except json.JSONDecodeError:
# 继续收集更多响应,可能JSON尚未完整
pass

# 如果没有成功提取到新参数,记录警告
if not received_new_params:
logger.warning(f"无法从AI响应中提取有效的JSON参数: {ai_response}")
yield "\n无法获取有效的参数修正,将使用原参数重试...\n"

except Exception as e:
logger.error(f"获取AI参数修正时出错: {str(e)}", exc_info=True)
yield f"\n获取参数修正失败: {str(e)},将使用原参数重试...\n"

# 增加尝试次数
attempt += 1

except Exception as e:
error_msg = f"❌ 工具 '{tool_name}' 调用失败: {str(e)}"
logger.error(f"工具调用异常: {error_msg}", exc_info=True)
yield error_msg

# 如果未达到最大尝试次数,准备重试
if attempt < MAX_ATTEMPTS:
retry_prompt = f"\n工具调用异常: {str(e)}\n\n请修正参数并重新尝试。这是第 {attempt}/{MAX_ATTEMPTS} 次尝试。"
yield retry_prompt
attempt += 1
else:
logger.warning(f"工具 {tool_name} 调用在 {MAX_ATTEMPTS} 次尝试后仍然失败")
yield f"\n已达到最大尝试次数 ({MAX_ATTEMPTS}),工具调用仍然失败。"
break

入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284

@chat_bp.route('/stream', methods=['GET', 'POST'])
def stream_chat():
"""流式聊天接口 - 支持SSE事件流和MCP工具调用

两种使用模式:
1. 双阶段模式:先通过GET请求建立连接,获取sessionId,再通过POST请求发送消息
2. 单阶段模式:直接通过POST请求发送消息并返回流式响应
"""

# 获取会话ID (共同处理GET和POST请求)
if request.method == 'POST':
data = request.get_json()
sessionId = data.get('sessionId')
create_new_stream = data.get('createStream', False) # 新增参数,指示是否创建新的流连接
logger.info(f"收到POST流式请求: sessionId={sessionId}, createStream={create_new_stream}")
else:
sessionId = request.args.get('sessionId')
create_new_stream = False
logger.info(f"收到GET流式请求: sessionId={sessionId}")

# 如果没有会话ID,创建一个新的
if not sessionId:
sessionId = str(uuid.uuid4())
logger.info(f"生成新的会话ID: {sessionId}")

# 如果是GET请求或者POST请求带有createStream=True参数,创建新的SSE流
if request.method == 'GET' or create_new_stream:
def generate_session():
logger.debug(f"开始生成SSE会话: sessionId={sessionId}")
yield f"data: {json.dumps({'type': 'session_id', 'session_id': sessionId})}\n\n"
logger.debug(f"已发送会话ID: {sessionId}")

# 创建响应队列
response_queue = queue.Queue()
active_streams[sessionId] = response_queue
logger.debug(f"已创建响应队列: sessionId={sessionId}")

# 如果是POST请求且带有消息,则处理消息
if request.method == 'POST' and create_new_stream:
userInput = data.get('message')
if userInput:
logger.info(f"处理POST流式请求中的消息: sessionId={sessionId}, message={userInput[:50]}...")
# 获取或创建聊天会话
modelType = data.get('modelType', 'deepseek')
systemPrompt = data.get('systemPrompt', '')
mcp_url = data.get('mcpUrl') or get_env_variable("MCP_URL")

mcpChat = mcpChatSessions.get(sessionId)
if not mcpChat:
logger.info(f"创建新的MCP聊天会话: sessionId={sessionId}, modelType={modelType}")
mcpChat = MCPChat(modelType, systemPrompt)
mcpChatSessions[sessionId] = mcpChat

# 如果提供了自定义MCP URL,设置它
if mcp_url and mcp_url != mcpChat.mcp_url:
logger.info(f"设置自定义MCP URL: {mcp_url}")
mcpChat.mcp_url = mcp_url

# 定义回调函数和工具调用标志
has_tool_call_flag = [False]

# 在单独的线程中处理消息
logger.debug(f"启动异步处理线程: sessionId={sessionId}")
thread = threading.Thread(
target=process_async_response,
args=(sessionId, mcpChat, userInput, response_queue, has_tool_call_flag)
)
thread.daemon = True
thread.start()

try:
while True:
try:
# 非阻塞方式获取响应,如果没有则短暂休眠
item = response_queue.get(block=False)
if item is None: # 结束信号
logger.debug(f"收到结束信号: sessionId={sessionId}")
yield f"data: {json.dumps({'type': 'end'})}\n\n"
break
else:
logger.debug(f"发送SSE数据: {item[:100]}...")
yield item
except queue.Empty:
# 等待一段时间后再次检查
time.sleep(0.1)
finally:
# 清理
if sessionId in active_streams:
logger.debug(f"清理响应队列: sessionId={sessionId}")
del active_streams[sessionId]

return Response(
stream_with_context(generate_session()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive'
}
)

# 处理常规的POST请求 - 接收用户消息并生成响应
userInput = data.get('message')
modelType = data.get('modelType', 'deepseek')
systemPrompt = data.get('systemPrompt', '')
mcp_url = data.get('mcpUrl') or get_env_variable("MCP_URL")

if not userInput:
return jsonify({"error": "消息内容不能为空"}), 400

# 获取或创建聊天会话
mcpChat = mcpChatSessions.get(sessionId)
if not mcpChat:
logger.info(f"创建新的MCP聊天会话: sessionId={sessionId}, modelType={modelType}")
mcpChat = MCPChat(modelType, systemPrompt)
mcpChatSessions[sessionId] = mcpChat

# 如果提供了自定义MCP URL,设置它
if mcp_url and mcp_url != mcpChat.mcp_url:
logger.info(f"设置自定义MCP URL: {mcp_url}")
mcpChat.mcp_url = mcp_url

# 检查是否有活动的流
if sessionId not in active_streams:
logger.warning(f"未找到活动的流连接: sessionId={sessionId}")
return jsonify({
"error": "没有找到活动的流连接,请先建立GET连接或使用createStream=true",
"sessionId": sessionId
}), 400

# 获取响应队列
response_queue = active_streams[sessionId]
has_tool_call_flag = [False] # 用于跟踪是否有工具调用

# 启动处理线程
logger.debug(f"启动异步处理线程: sessionId={sessionId}")
thread = threading.Thread(
target=process_async_response,
args=(sessionId, mcpChat, userInput, response_queue, has_tool_call_flag)
)
thread.daemon = True
thread.start()

return jsonify({"status": "processing", "sessionId": sessionId})

# 将异步处理逻辑抽取为单独的函数,便于重用
def process_async_response(sessionId, mcpChat, userInput, response_queue, has_tool_call_flag):
"""在单独的线程中处理异步模型调用"""

# 用于收集完整的AI响应
complete_response = ""

# 定义两参数的回调函数
def simple_callback(chunk, is_complete):
nonlocal complete_response

# 记录接收到的AI输出
logger.debug(f"AI输出: sessionId={sessionId}, chunk={chunk}, is_complete={is_complete}")

# 添加到完整响应
complete_response += chunk

# 发送原始块给客户端
if chunk and chunk.strip(): # 确保只有非空内容才发送
response_queue.put(f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n")

if is_complete:
# AI响应完成,处理完整响应
logger.info(f"AI响应完成: sessionId={sessionId}")
logger.info(f"完整响应: {complete_response}")

# 检测并处理工具调用
process_complete_response(complete_response, response_queue)

# 发送结束信号
response_queue.put(None)

# 处理完整响应中的工具调用
def process_complete_response(response_text, response_queue):
"""处理完整响应中的工具调用"""
# 使用正则表达式匹配所有 ```mcp ... ``` 代码块
pattern = r"```mcp\s*([\s\S]*?)\s*```"
matches = re.finditer(pattern, response_text)

tool_calls_found = False

for match in matches:
tool_call_text = match.group(1).strip()
logger.info(f"检测到工具调用: {tool_call_text}")
tool_calls_found = True
has_tool_call_flag[0] = True

# 处理工具调用
process_tool_call(tool_call_text, response_queue)

if not tool_calls_found:
logger.info("未检测到工具调用")

# 处理单个工具调用
def process_tool_call(tool_call_text, response_queue):
"""处理工具调用"""
try:
# 解析工具调用JSON
tool_call = json.loads(tool_call_text)
tool_name = tool_call.get("tool")
params = tool_call.get("params", {})

logger.info(f"解析工具调用: tool_name={tool_name}, params={params}")

# 通知前端有工具调用
response_queue.put(f"data: {json.dumps({'type': 'tool_call'})}\n\n")

# 创建新的事件循环来执行工具调用
tool_loop = asyncio.new_event_loop()

def run_tool_call():
asyncio.set_event_loop(tool_loop)
try:
# 在新事件循环中执行工具调用
tool_loop.run_until_complete(execute_tool_and_send_results(tool_name, params, response_queue))
finally:
tool_loop.close()

# 创建并启动线程来执行工具调用
tool_thread = threading.Thread(target=run_tool_call)
tool_thread.daemon = True
tool_thread.start()

# 等待工具调用线程完成
logger.info(f"等待工具调用完成: tool_name={tool_name}")
tool_thread.join()
logger.info(f"工具调用线程已完成: tool_name={tool_name}")

except json.JSONDecodeError as e:
error_msg = f"❌ 工具调用格式错误: {tool_call_text}, 错误: {str(e)}"
logger.error(f"工具调用JSON解析错误: {error_msg}")
response_queue.put(f"data: {json.dumps({'type': 'chunk', 'content': error_msg})}\n\n")
except Exception as e:
error_msg = f"❌ 工具调用处理失败: {str(e)}"
logger.error(f"工具调用处理异常: {error_msg}", exc_info=True)
response_queue.put(f"data: {json.dumps({'type': 'chunk', 'content': error_msg})}\n\n")

# 异步执行工具调用并发送结果
async def execute_tool_and_send_results(tool_name, params, response_queue):
"""异步执行工具调用并发送结果"""
try:
# 调用工具并获取结果
logger.debug(f"开始流式调用工具: tool_name={tool_name}")
tool_responses = []
async for result_chunk in mcpChat._call_tool_streaming(tool_name, params):
# 发送工具调用结果给客户端
logger.debug(f"工具调用结果: {result_chunk[:100]}...")
response_queue.put(f"data: {json.dumps({'type': 'chunk', 'content': result_chunk})}\n\n")
tool_responses.append(result_chunk)

logger.info(f"工具调用完成,共发送了 {len(tool_responses)} 个结果块")
except Exception as e:
error_msg = f"❌ 工具 '{tool_name}' 调用失败: {str(e)}"
logger.error(f"工具调用失败: {error_msg}", exc_info=True)
response_queue.put(f"data: {json.dumps({'type': 'chunk', 'content': error_msg})}\n\n")

# 创建新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
# 启动异步流式响应,直接使用符合参数要求的回调函数
logger.info(f"开始获取流式响应: sessionId={sessionId}, userInput={userInput[:50]}...")
async def run_streaming():
# 直接传递callback参数
await mcpChat.get_streaming_response(userInput, simple_callback)

loop.run_until_complete(run_streaming())
except Exception as e:
error_message = str(e)
logger.error(f"流式处理错误: {error_message}", exc_info=True)
response_queue.put(f"data: {json.dumps({'type': 'error', 'error': error_message})}\n\n")
response_queue.put(None) # 发送结束信号
finally:
loop.close()

# 存储活动的流连接
active_streams = {}

流式对话 - 前端代码

核心就一个 API 请求,主要是用 axios 启动 SSE 方式建立推送流
关于 SSE 可以去搜一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
export const sendStreamingMCPChatMessage = async (
data: MCPChatRequest,
onChunk: (chunk: string, isComplete: boolean, hasToolCall: boolean) => void
): Promise<{ sessionId: string }> => {
return new Promise((resolve, reject) => {
let sessionId = data.sessionId || "";
let hasToolCall = false;
let buffer = "";

// 创建 axios 请求配置
const config = {
method: "post",
url: `${API_BASE_URL}/chat/stream`,
data: {
...data,
createStream: true,
},
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
responseType: "stream" as any,
onDownloadProgress: (progressEvent: any) => {
// 获取新到达的文本数据
const rawText = progressEvent.event.target.response || "";
// 如果有新数据
if (rawText && rawText !== buffer) {
// 提取新增部分
const newText = rawText.substring(buffer.length);
buffer = rawText; // 更新缓冲区

// 处理新增的 SSE 数据
const lines = newText.split("\n\n");

for (const line of lines) {
if (!line.trim()) continue;

// 提取 data: 部分
const match = line.match(/data: (.*)/);
if (!match) continue;

try {
const eventData = JSON.parse(match[1]);
console.log("收到SSE数据:", eventData);

if (eventData.type === "chunk") {
// 处理文本块
onChunk(eventData.content, false, hasToolCall);
} else if (eventData.type === "tool_call") {
// 工具调用标记
hasToolCall = true;
onChunk("", false, true);
} else if (eventData.type === "session_id") {
// 保存会话ID
sessionId = eventData.session_id;
console.log("获取到会话ID:", sessionId);
} else if (eventData.type === "end") {
// 流结束
console.log("流式输出结束");
onChunk("", true, hasToolCall);
resolve({ sessionId });
} else if (eventData.type === "error") {
// 错误处理
console.error("流处理错误:", eventData.error);
onChunk(`错误: ${eventData.error}`, false, hasToolCall);
}
} catch (error) {
console.error("解析流事件失败:", error, "原始数据:", line);
}
}
}
},
};

// 发送请求
axios(config)
.then(() => {
// 请求完成,如果没有通过 end 事件解析,在这里解析
if (sessionId) {
resolve({ sessionId });
} else {
resolve({ sessionId: "" });
}
})
.catch((error) => {
console.error("SSE请求失败:", error);
reject(error);
});
});
};