Deploying Qwen2.5 with FastAPI
Author: XD / Published: November 20, 2024, 03:44 / Updated: November 20, 2024, 03:47 / Programming Notes / Views: 90
Here is the service code, saved as app.py.
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import uvicorn
from typing import List, Dict, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor
import os
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ChatRequest(BaseModel):
    messages: List[Dict[str, str]]
    max_new_tokens: int = Field(default=512, ge=1, le=2048)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.9, ge=0.0, le=1.0)
    repetition_penalty: float = Field(default=1.0, ge=0.0, le=2.0)
    top_k: int = Field(default=50, ge=0)
    num_beams: int = Field(default=1, ge=1)
    do_sample: bool = Field(default=True)
class ModelConfig(BaseModel):
    model_path: str = "/your_model_path/qwen2.5_3b/"
    gpu_id: Optional[int] = None
    max_batch_size: int = 32
    max_concurrent_requests: int = 10

    class Config:
        protected_namespaces = ()
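# ModelService is a process-wide singleton: the model is loaded once at startup,
# and a background task drains the request queue and runs batched generation in
# a thread pool so the event loop stays responsive.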
class ModelService:
    _instance = None
    _lock = asyncio.Lock()

    @classmethod
    async def get_instance(cls, config: ModelConfig):
        async with cls._lock:
            if cls._instance is None:
                cls._instance = cls(config)
            return cls._instance

    def __init__(self, config: ModelConfig):
        if ModelService._instance is not None:
            raise Exception("This class is a singleton!")
        self.config = config
        # Work out which GPU (if any) is actually available
        if torch.cuda.is_available():
            cuda_visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES')
            if cuda_visible_devices is not None:
                # When CUDA_VISIBLE_DEVICES is set, the visible GPU is addressed as cuda:0
                self.device = "cuda:0"
                logger.info(f"CUDA_VISIBLE_DEVICES is set to {cuda_visible_devices}")
            else:
                if config.gpu_id is not None and config.gpu_id < torch.cuda.device_count():
                    self.device = f"cuda:{config.gpu_id}"
                else:
                    self.device = "cuda:0"
        else:
            self.device = "cpu"
        logger.info(f"Using device: {self.device}")
        # Load the model and tokenizer
        logger.info("Loading model and tokenizer...")
        try:
            if self.device == "cpu":
                device_map = "cpu"
            else:
                device_map = {"": self.device}
            self.model = AutoModelForCausalLM.from_pretrained(
                config.model_path,
                torch_dtype=torch.bfloat16 if 'cuda' in self.device else torch.float32,
                device_map=device_map,
                trust_remote_code=True
            )
            self.tokenizer = AutoTokenizer.from_pretrained(
                config.model_path,
                trust_remote_code=True
            )
            # Left padding keeps each prompt's generated tokens contiguous with it
            # when several requests are batched through a decoder-only model
            self.tokenizer.padding_side = "left"
            logger.info(f"Model is on device: {next(self.model.parameters()).device}")
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            raise
        logger.info("Model loaded successfully!")
        # Thread pool for running the blocking generation calls
        self.executor = ThreadPoolExecutor(max_workers=config.max_concurrent_requests)
        # Queue of pending (request, future) pairs
        self.request_queue = asyncio.Queue(maxsize=config.max_batch_size)
        # Background task that batches and serves queued requests
        self.task = asyncio.create_task(self._process_queue())
    async def _process_queue(self):
        """Drain the request queue and serve requests in batches."""
        while True:
            batch = []
            try:
                # Wait for the first request, then greedily pull whatever else is queued
                while len(batch) < self.config.max_batch_size:
                    if not batch:
                        request, future = await self.request_queue.get()
                    else:
                        try:
                            request, future = self.request_queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break
                    batch.append((request, future))
                if batch:
                    responses = await asyncio.get_event_loop().run_in_executor(
                        self.executor,
                        self._batch_generate,
                        [r[0] for r in batch]
                    )
                    for (_, future), response in zip(batch, responses):
                        if not future.done():
                            future.set_result(response)
            except Exception as e:
                logger.error(f"Error processing batch: {str(e)}")
                for _, future in batch:
                    if not future.done():
                        future.set_exception(e)
            finally:
                for _ in range(len(batch)):
                    self.request_queue.task_done()
    def _batch_generate(self, requests: List[ChatRequest]) -> List[str]:
        """Generate replies for a batch of requests (runs in the thread pool)."""
        try:
            # Render each conversation with the model's chat template
            texts = [
                self.tokenizer.apply_chat_template(
                    req.messages,
                    tokenize=False,
                    add_generation_prompt=True
                )
                for req in requests
            ]
            inputs = self.tokenizer(texts, return_tensors="pt", padding=True)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            with torch.no_grad(), torch.inference_mode():
                # Note: sampling parameters are taken from the first request in the batch
                generated_ids = self.model.generate(
                    **inputs,
                    max_new_tokens=max(r.max_new_tokens for r in requests),
                    temperature=requests[0].temperature,
                    top_p=requests[0].top_p,
                    repetition_penalty=requests[0].repetition_penalty,
                    top_k=requests[0].top_k,
                    num_beams=requests[0].num_beams,
                    do_sample=requests[0].do_sample,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )
            responses = []
            for i, output_ids in enumerate(generated_ids):
                # Drop the (padded) prompt tokens and decode only the new ones
                new_tokens = output_ids[len(inputs['input_ids'][i]):]
                response = self.tokenizer.decode(new_tokens, skip_special_tokens=True)
                responses.append(response)
            return responses
        except Exception as e:
            logger.error(f"Error in batch generation: {str(e)}")
            raise
    async def generate(self, request: ChatRequest):
        """Queue a request and await its reply."""
        try:
            loop = asyncio.get_event_loop()
            future = loop.create_future()
            await self.request_queue.put((request, future))
            response = await future
            return response
        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            raise
# Create the FastAPI application
app = FastAPI(title="Qwen API Server")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
model_config = ModelConfig()
model_service = None


@app.on_event("startup")
async def startup_event():
    global model_service
    model_service = await ModelService.get_instance(model_config)


@app.post("/chat")
async def chat(request: ChatRequest):
    try:
        response = await model_service.generate(request)
        return {
            "status": "success",
            "response": response,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error in chat endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "gpu_id": model_config.gpu_id,
        "device": model_service.device if model_service else "not initialized"
    }


if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
Here is the nohup command for starting the service in the background; CUDA_VISIBLE_DEVICES=2 pins the process to GPU 2, and stdout/stderr are redirected to app.log.
nohup bash -c 'CUDA_VISIBLE_DEVICES=2 python app.py > app.log 2>&1' &
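Once the service is running, a quick sanity check is to call the /health endpoint defined above. A minimal sketch, assuming the server is reachable on localhost:8000 (adjust the host to match your deployment):

import requests

# Query the /health endpoint; host and port follow the uvicorn settings in app.py
health = requests.get("http://localhost:8000/health", timeout=5)
print(health.json())  # expected keys: "status", "gpu_id", "device"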
Here is the request code.
import requests

url = "http://10.110.10.110:8000/chat"  # your service IP address
data = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello, how are you?"}
    ],
    "max_new_tokens": 2048,
    "temperature": 0.7,
    "top_p": 0.8,
    "top_k": 20,
    "repetition_penalty": 1.05,
    "do_sample": True
}
response = requests.post(url, json=data)
print(response.json())
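For reference, the /chat endpoint wraps the model's reply in a small JSON envelope, so the generated text sits under the "response" key. A minimal sketch of unpacking it, with field names taken from the service code above:

result = response.json()
if result.get("status") == "success":
    print(result["response"])   # the model's reply
    print(result["timestamp"])  # server-side timestamp in ISO format
else:
    print("Request failed:", result)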