Skip to content

Commit d29c69a

Browse files
kasarolzzwJqRrtCoda-bot
authored
[fix][prompt] async ptaas (#30)
* http client add async client * fix: [Coda] 修复apost_stream异步流处理的根本性错误 (LogID: 20250922201317010091104016621DD33) Co-Authored-By: Coda <[email protected]> * fix: [Coda] 修复流式请求中上下文管理器使用错误导致的AttributeError (LogID: 202509222042560100911040167333F06) Co-Authored-By: Coda <[email protected]> * fix async ptaas Change-Id: If57b470952a1db94d67dc36430262f2ee720d209 * feat: [Coda] translate all Chinese comments to English (LogID: 202509231157310100911040168860B1F) Co-Authored-By: Coda <[email protected]> * feat: [Coda] translate all Chinese comments to English comments (LogID: 202509231207050100911040169818AAD) Co-Authored-By: Coda <[email protected]> * feat: [Coda] translate all Chinese comments to English comments (LogID: 202509231207050100911040169818AAD) Change-Id: Ia27860041f86ea9fea40055588a85b999eb8b497 Co-Authored-By: Coda <[email protected]> * comment Change-Id: If1499dadbbc1dfbe54bc6218209b4e0cc8a44b5c --------- Co-authored-by: jiangqi.rrt <[email protected]> Co-authored-by: Coda <[email protected]>
1 parent 62ff262 commit d29c69a

File tree

25 files changed

+411
-313
lines changed

25 files changed

+411
-313
lines changed

cozeloop/_client.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,9 @@ def execute_prompt(
282282
timeout: Optional[int] = None
283283
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
284284
"""
285-
执行Prompt请求
285+
Execute Prompt request
286286
287-
:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
287+
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
288288
"""
289289
if self._closed:
290290
raise ClientClosedError()
@@ -310,9 +310,9 @@ async def aexecute_prompt(
310310
timeout: Optional[int] = None
311311
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
312312
"""
313-
异步执行Prompt请求
313+
Asynchronously execute Prompt request
314314
315-
:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
315+
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
316316
"""
317317
if self._closed:
318318
raise ClientClosedError()
@@ -436,9 +436,9 @@ def execute_prompt(
436436
timeout: Optional[int] = None
437437
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
438438
"""
439-
执行Prompt请求
439+
Execute Prompt request
440440
441-
:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
441+
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
442442
"""
443443
return get_default_client().execute_prompt(
444444
prompt_key,
@@ -462,9 +462,9 @@ async def aexecute_prompt(
462462
timeout: Optional[int] = None
463463
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
464464
"""
465-
异步执行Prompt请求
465+
Asynchronously execute Prompt request
466466
467-
:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
467+
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
468468
"""
469469
return await get_default_client().aexecute_prompt(
470470
prompt_key,

cozeloop/entities/prompt.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class Prompt(BaseModel):
127127

128128

129129
class ExecuteParam(BaseModel):
130-
"""Execute参数"""
130+
"""Execute parameters"""
131131
prompt_key: str
132132
version: str = ""
133133
label: str = ""
@@ -136,13 +136,13 @@ class ExecuteParam(BaseModel):
136136

137137

138138
class TokenUsage(BaseModel):
139-
"""Token使用统计"""
139+
"""Token usage statistics"""
140140
input_tokens: int = 0
141141
output_tokens: int = 0
142142

143143

144144
class ExecuteResult(BaseModel):
145-
"""Execute结果"""
145+
"""Execute result"""
146146
message: Optional[Message] = None
147147
finish_reason: Optional[str] = None
148148
usage: Optional[TokenUsage] = None

cozeloop/entities/stream.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,29 @@
88

99

1010
class StreamReader(ABC, Generic[T]):
11-
"""流式读取器接口"""
11+
"""Stream reader interface"""
1212

1313
@abstractmethod
1414
def __iter__(self) -> Iterator[T]:
15-
"""支持同步迭代 - for循环直接读取"""
15+
"""Support synchronous iteration - for loop direct reading"""
1616
pass
1717

1818
@abstractmethod
1919
def __next__(self) -> T:
20-
"""支持next()函数调用"""
20+
"""Support next() function call"""
2121
pass
2222

2323
@abstractmethod
2424
def __aiter__(self) -> AsyncIterator[T]:
25-
"""支持异步迭代 - async for循环直接读取"""
25+
"""Support asynchronous iteration - async for loop direct reading"""
2626
pass
2727

2828
@abstractmethod
2929
async def __anext__(self) -> T:
30-
"""支持async next()调用"""
30+
"""Support async next() call"""
3131
pass
3232

3333
@abstractmethod
3434
def close(self):
35-
"""关闭流"""
35+
"""Close stream"""
3636
pass

cozeloop/internal/consts/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL = 60
1616
DEFAULT_TIMEOUT = 3
1717
DEFAULT_UPLOAD_TIMEOUT = 30
18-
DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10分钟,专用于execute_prompt和aexecute_prompt方法
18+
DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10 minutes, dedicated for execute_prompt and aexecute_prompt methods
1919

2020
LOG_ID_HEADER = "x-tt-logid"
2121
AUTHORIZE_HEADER = "Authorization"

cozeloop/internal/httpclient/client.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def post_stream(
130130
json: Union[BaseModel, Dict] = None,
131131
timeout: Optional[int] = None,
132132
):
133-
"""发起流式POST请求,返回stream_context"""
133+
"""Initiate streaming POST request, return stream_context"""
134134
url = self._build_url(path)
135135
headers = self._set_headers({"Content-Type": "application/json"})
136136

@@ -140,7 +140,7 @@ def post_stream(
140140
_timeout = timeout if timeout is not None else self.timeout
141141

142142
try:
143-
# 返回stream_context,让StreamReader管理上下文
143+
# Return stream_context, let StreamReader manage context
144144
stream_context = self.http_client.stream(
145145
"POST",
146146
url,
@@ -153,13 +153,54 @@ def post_stream(
153153
logger.error(f"Http client stream request failed, path: {path}, err: {e}.")
154154
raise consts.NetworkError from e
155155

156+
async def arequest(
157+
self,
158+
path: str,
159+
method: str,
160+
response_model: Type[T],
161+
*,
162+
params: Optional[Dict[str, str]] = None,
163+
form: Optional[Dict[str, str]] = None,
164+
json: Optional[Union[BaseModel, Dict]] = None,
165+
files: Optional[Dict[str, FileType]] = None,
166+
headers: Optional[Dict[str, str]] = None,
167+
timeout: Optional[int] = None,
168+
) -> T:
169+
url = self._build_url(path)
170+
_headers = self._set_headers(headers)
171+
172+
_timeout = timeout if timeout is not None else self.timeout
173+
174+
if isinstance(json, BaseModel):
175+
if pydantic.VERSION.startswith('1'):
176+
json = json.dict(by_alias=True)
177+
else:
178+
json = json.model_dump(by_alias=True)
179+
180+
try:
181+
response = await self.http_client.arequest(
182+
method,
183+
url,
184+
params=params,
185+
data=form,
186+
json=json,
187+
files=files,
188+
headers=_headers,
189+
timeout=_timeout
190+
)
191+
except httpx.HTTPError as e:
192+
logger.error(f"Http client request failed, path: {path}, err: {e}.")
193+
raise consts.NetworkError from e
194+
195+
return parse_response(url, response, response_model)
196+
156197
async def apost_stream(
157198
self,
158199
path: str,
159200
json: Union[BaseModel, Dict] = None,
160201
timeout: Optional[int] = None,
161202
):
162-
"""发起异步流式POST请求,返回stream_context"""
203+
"""Initiate asynchronous streaming POST request, return stream_context"""
163204
url = self._build_url(path)
164205
headers = self._set_headers({"Content-Type": "application/json"})
165206

@@ -169,8 +210,8 @@ async def apost_stream(
169210
_timeout = timeout if timeout is not None else self.timeout
170211

171212
try:
172-
# 返回stream_context,让StreamReader管理上下文
173-
stream_context = self.http_client.stream(
213+
# Return stream_context, let StreamReader manage context
214+
stream_context = self.http_client.astream(
174215
"POST",
175216
url,
176217
json=json,

cozeloop/internal/httpclient/http_client.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
# SPDX-License-Identifier: MIT
33

44
import logging
5-
from typing import Dict, Type, TypeVar
5+
import typing
6+
from typing import Dict, Type, TypeVar, Any, Generator
67

78
import httpx
89
import pydantic
10+
from httpx import URL, Response
911
from pydantic import ValidationError
1012

1113
from cozeloop.internal import consts
@@ -16,9 +18,24 @@
1618
T = TypeVar('T', bound=BaseResponse)
1719

1820

19-
class HTTPClient(httpx.Client):
21+
class HTTPClient:
2022
def __init__(self):
21-
super().__init__()
23+
self.sync_client = httpx.Client()
24+
self.async_client = httpx.AsyncClient()
25+
26+
def request(self, method: str, url: URL | str, **kwargs: Any) -> Response:
27+
return self.sync_client.request(method, url, **kwargs)
28+
29+
def stream(self, method: str, url: URL | str, **kwargs: Any):
30+
"""Return synchronous stream context manager"""
31+
return self.sync_client.stream(method, url, **kwargs)
32+
33+
async def arequest(self, method: str, url: URL | str, **kwargs: Any) -> Response:
34+
return await self.async_client.request(method, url, **kwargs)
35+
36+
def astream(self, method: str, url: URL | str, **kwargs: Any):
37+
"""Return asynchronous stream context manager"""
38+
return self.async_client.stream(method, url, **kwargs)
2239

2340

2441
def _check_oauth_error(body: Dict, http_code: int, log_id: str) -> None:
@@ -63,4 +80,4 @@ def parse_response(url: str, response: httpx.Response, response_model: Type[T])
6380
logger.error(f"Failed to parse response. Path: {url}, http code: {http_code}, log id: {log_id}, error: {e}.")
6481
raise consts.InternalError from e
6582
logger.debug(f"Call remote service success. Path: {url}, response: {res}, log id: {log_id}")
66-
return res
83+
return res

0 commit comments

Comments
 (0)