stream respose with fastapi and langchain

以下代码是一个使用FastAPI和Langchain的异步流处理的例子。它创建了一个FastAPI应用,该应用有一个POST路由/stream,该路由接收一个消息,然后使用Langchain的ChatOpenAI模型生成一个响应流。
完整实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

import asyncio
import os
from typing import AsyncIterable, Awaitable

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from pydantic import BaseModel

# Two ways to load env variables
# 1.load env variables from .env file
load_dotenv()

# 2.manually set env variables
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = ""

app = FastAPI()


async def send_message(message: str) -> AsyncIterable[str]:
callback = AsyncIteratorCallbackHandler()
model = ChatOpenAI(
streaming=True,
verbose=True,
callbacks=[callback],
)

async def wrap_done(fn: Awaitable, event: asyncio.Event):
"""Wrap an awaitable with a event to signal when it's done or an exception is raised."""
try:
await fn
except Exception as e:
# TODO: handle exception
print(f"Caught exception: {e}")
finally:
# Signal the aiter to stop.
event.set()

# Begin a task that runs in the background.
task = asyncio.create_task(wrap_done(
model.agenerate(messages=[[HumanMessage(content=message)]]),
callback.done),
)

async for token in callback.aiter():
# Use server-sent-events to stream the response
yield f"data: {token}\n\n"

await task


class StreamRequest(BaseModel):
"""Request body for streaming."""
message: str


@app.post("/stream")
def stream(body: StreamRequest):
return StreamingResponse(send_message(body.message), media_type="text/event-stream")


if __name__ == "__main__":
uvicorn.run(host="0.0.0.0", port=8000, app=app)

以下是代码的主要部分的解释:

send_message函数:这是一个异步生成器函数,它接收一个消息,然后使用Langchain的ChatOpenAI模型生成一个响应流。这个函数使用了一个AsyncIteratorCallbackHandler来处理模型的回调,并使用一个后台任务来运行模型的生成器函数。每当生成器产生一个新的令牌时,这个函数就会产生一个新的服务器发送的事件。

StreamRequest类:这是一个Pydantic模型,用于验证流请求的请求体。它只有一个字段message,表示要发送的消息。

/stream路由:这是一个POST路由,它接收一个StreamRequest对象,然后调用send_message函数来生成一个响应流。它使用StreamingResponse来返回这个流,媒体类型设置为text/event-stream,这是服务器发送的事件的标准媒体类型。

主函数:如果这个脚本作为主程序运行,它将使用uvicorn来启动FastAPI应用。