我正在对post的接口返回strem的形式进行代理, 使用python将<code_a>的 stream 形式的返回结构,转换为<code_b>的形式,需要考虑 stream 分批不是完整的 event 的场景 <code_a>event:data data:{"count":1,"message":"亲爱的编","messageSingle":"亲爱的编","end":false}
event:data data:{"count":2,"message":"亲爱的编程小勇士","messageSingle":"程小勇士","end":false}
event:done data:
</code_a> <code_b> data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":"嘿"},"finish_reason":null}]}
data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":","},"finish_reason":null}]}
data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":"小"},"finish_reason":null}]}
data: [DONE]
</code_b>
要将 code_a
的流式返回结构转换为 code_b
的形式,可以使用 Python 进行流处理。我们需要考虑到流数据可能会分批到达,并且每个批次可能不是完整的事件。以下是一个示例代码,展示了如何实现这个转换:
import json
import time
import uuid
def generate_code_b_event(data, role="assistant"):
return {
"id": "chatcmpl-534",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": "qwen2",
"system_fingerprint": "fp_ollama",
"choices": [
{
"index": 0,
"delta": {
"role": role,
"content": data
},
"finish_reason": None
}
]
}
def process_stream(input_stream):
buffer = ""
for chunk in input_stream:
buffer += chunk
while "\n\n" in buffer:
event, buffer = buffer.split("\n\n", 1)
if event.startswith("event: done"):
yield "data: [DONE]\n\n"
elif event.startswith("event: data"):
data_line = event.split("\n")[1]
data_json = json.loads(data_line.split(":", 1)[1].strip())
message = data_json.get("messageSingle", "")
for char in message:
event_b = generate_code_b_event(char)
yield f"data: {json.dumps(event_b)}\n\n"
# 模拟输入流
input_stream = [
"event:data\ndata:{\"count\":1,\"message\":\"亲爱的编\",\"messageSingle\":\"亲爱的编\",\"end\":false}\n\n",
"event:data\ndata:{\"count\":2,\"message\":\"亲爱的编程小勇士\",\"messageSingle\":\"程小勇士\",\"end\":false}\n\n",
"event:done\ndata:\n\n"
]
# 处理流并输出结果
for output in process_stream(input_stream):
print(output, end="")
generate_code_b_event
函数:生成 code_b
格式的事件。process_stream
函数:处理输入流,将 code_a
格式的事件转换为 code_b
格式。
buffer
来存储未处理的部分,确保处理分批到达的数据。split("\n\n", 1)
来分割事件,处理每个完整的事件。data
或 done
)生成相应的 code_b
格式输出。input_stream
模拟了一个流式输入,包含多个事件。process_stream
的输出并打印结果。这个示例代码展示了如何处理流式数据并进行格式转换,确保即使数据分批到达也能正确处理。