Skip to content

Commit 1ea1ec4

Browse files
iroquetaBeta Bot
authored andcommitted
Cherry pick branch 'genexuslabs:StreamToClientInChatAgent' into beta
1 parent 88adc0e commit 1ea1ec4

File tree

4 files changed

+86
-72
lines changed

4 files changed

+86
-72
lines changed

java/src/main/java/com/genexus/GXProcedure.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -275,15 +275,25 @@ protected String callAssistant(String agent, GXProperties properties, ArrayList<
275275
}
276276

277277
protected ChatResult chatAgent(String agent, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result) {
278-
callAgent(agent, true, properties, messages, result);
279-
return new ChatResult(this, agent, properties, messages, result, client);
278+
ChatResult chatResult = new ChatResult();
279+
280+
new Thread(() -> {
281+
try {
282+
context.setThreadModelContext(context);
283+
callAgent(agent, true, properties, messages, result, chatResult);
284+
} finally {
285+
chatResult.markDone();
286+
}
287+
}).start();
288+
289+
return chatResult;
280290
}
281291

282292
protected String callAgent(String agent, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result) {
283-
return callAgent(agent, false, properties, messages, result);
293+
return callAgent(agent, false, properties, messages, result, null);
284294
}
285295

286-
protected String callAgent(String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result) {
296+
protected String callAgent(String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ChatResult chatResult) {
287297
OpenAIRequest aiRequest = new OpenAIRequest();
288298
aiRequest.setModel(String.format("saia:agent:%s", agent));
289299
if (!messages.isEmpty())
@@ -292,15 +302,15 @@ protected String callAgent(String agent, boolean stream, GXProperties properties
292302
if (stream)
293303
aiRequest.setStream(true);
294304
client = new HttpClient();
295-
OpenAIResponse aiResponse = SaiaService.call(aiRequest, client, result);
305+
OpenAIResponse aiResponse = SaiaService.call(this, aiRequest, client, agent, stream, properties, messages, result, chatResult);
296306
if (aiResponse != null && aiResponse.getChoices() != null) {
297307
for (OpenAIResponse.Choice element : aiResponse.getChoices()) {
298308
String finishReason = element.getFinishReason();
299309
if (finishReason.equals("stop"))
300310
return element.getMessage().getStringContent();
301311
if (finishReason.equals("tool_calls")) {
302312
messages.add(element.getMessage());
303-
return processNotChunkedResponse(agent, stream, properties, messages, result, element.getMessage().getToolCalls());
313+
return processNotChunkedResponse(agent, stream, properties, messages, result, chatResult, element.getMessage().getToolCalls());
304314
}
305315
}
306316
} else if (client.getStatusCode() == 200) {
@@ -309,11 +319,11 @@ protected String callAgent(String agent, boolean stream, GXProperties properties
309319
return "";
310320
}
311321

312-
public String processNotChunkedResponse(String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ArrayList<OpenAIResponse.ToolCall> toolCalls) {
322+
public String processNotChunkedResponse(String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ChatResult chatResult, ArrayList<OpenAIResponse.ToolCall> toolCalls) {
313323
for (OpenAIResponse.ToolCall tollCall : toolCalls) {
314324
processToolCall(tollCall, messages);
315325
}
316-
return callAgent(agent, stream, properties, messages, result);
326+
return callAgent(agent, stream, properties, messages, result, chatResult);
317327
}
318328

319329
private void processToolCall(OpenAIResponse.ToolCall toolCall, ArrayList<OpenAIResponse.Message> messages) {
Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,38 @@
11
package com.genexus.util;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
4-
import com.genexus.GXProcedure;
5-
import com.genexus.internet.HttpClient;
6-
import com.genexus.util.saia.OpenAIResponse;
7-
import org.json.JSONObject;
8-
9-
import java.util.ArrayList;
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.LinkedBlockingQueue;
105

116
public class ChatResult {
12-
private HttpClient client = null;
13-
private String agent = null;
14-
private GXProperties properties = null;
15-
private ArrayList<OpenAIResponse.Message> messages = null;
16-
private CallResult result = null;
17-
private GXProcedure agentProcedure = null;
18-
19-
public ChatResult() {
20-
}
7+
private static final String END_MARKER = new String("__END__");
8+
private final BlockingQueue<String> chunks = new LinkedBlockingQueue<>();
9+
private volatile boolean done = false;
2110

22-
public ChatResult(GXProcedure agentProcedure, String agent, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, HttpClient client) {
23-
this.agentProcedure = agentProcedure;
24-
this.agent = agent;
25-
this.properties = properties;
26-
this.messages = messages;
27-
this.result = result;
28-
this.client = client;
11+
public synchronized void addChunk(String chunk) {
12+
if (chunk != null) {
13+
chunks.offer(chunk);
14+
}
2915
}
3016

31-
public boolean hasMoreData() {
32-
return !client.getEof();
17+
public void markDone() {
18+
done = true;
19+
chunks.offer(END_MARKER);
3320
}
3421

3522
public String getMoreData() {
36-
String data = client.readChunk();
37-
if (data.isEmpty())
38-
return "";
39-
int index = data.indexOf("data:") + "data:".length();
40-
String chunkJson = data.substring(index).trim();
4123
try {
42-
JSONObject jsonResponse = new JSONObject(chunkJson);
43-
OpenAIResponse chunkResponse = new ObjectMapper().readValue(jsonResponse.toString(), OpenAIResponse.class);
44-
OpenAIResponse.Choice choise = chunkResponse.getChoices().get(0);
45-
String chunkString = choise.getDelta().getStringContent();
46-
if (chunkString == null)
24+
String chunk = chunks.take();
25+
if (END_MARKER.equals(chunk)) {
4726
return "";
48-
return chunkString;
49-
}
50-
catch (Exception e) {
27+
}
28+
return chunk;
29+
} catch (InterruptedException e) {
30+
Thread.currentThread().interrupt();
5131
return "";
5232
}
5333
}
54-
}
34+
35+
public boolean hasMoreData() {
36+
return !(done && chunks.isEmpty());
37+
}
38+
}

java/src/main/java/com/genexus/util/saia/SaiaService.java

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,36 @@
11
package com.genexus.util.saia;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.genexus.GXProcedure;
45
import com.genexus.SdtMessages_Message;
56
import com.genexus.common.interfaces.SpecificImplementation;
67
import com.genexus.diagnostics.core.ILogger;
78
import com.genexus.diagnostics.core.LogManager;
89
import com.genexus.internet.HttpClient;
10+
import com.genexus.util.ChatResult;
11+
import com.genexus.util.GXProperties;
912
import org.json.JSONObject;
1013
import com.genexus.util.CallResult;
1114
import org.slf4j.Logger;
1215
import org.slf4j.LoggerFactory;
1316

17+
import java.util.ArrayList;
18+
1419
public class SaiaService {
1520
private static final ILogger logger = LogManager.getLogger(SaiaService.class);
1621
private static final String apiKey = (String) SpecificImplementation.Application.getProperty("AI_PROVIDER_API_KEY", "");;
1722
private static final String aiProvider = (String) SpecificImplementation.Application.getProperty("AI_PROVIDER", "");
1823
private static final Logger log = LoggerFactory.getLogger(SaiaService.class);
1924

20-
public static OpenAIResponse call(OpenAIRequest request, HttpClient client, CallResult result) {
21-
return call(request, false, client, result);
25+
public static OpenAIResponse call(GXProcedure proc, OpenAIRequest request, HttpClient client, String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ChatResult chatResult) {
26+
return call(proc, request, false, client, agent, stream, properties, messages, result, chatResult);
2227
}
2328

2429
public static OpenAIResponse call(OpenAIRequest request, boolean isEmbedding, CallResult result) {
25-
return call(request, isEmbedding, new HttpClient(), result);
30+
return call(null, request, isEmbedding, new HttpClient(), null, false, null, null, result, null);
2631
}
2732

28-
public static OpenAIResponse call(OpenAIRequest request, boolean isEmbedding, HttpClient client, CallResult result) {
33+
public static OpenAIResponse call(GXProcedure proc, OpenAIRequest request, boolean isEmbedding, HttpClient client, String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ChatResult chatResult) {
2934
try {
3035
String jsonRequest = new ObjectMapper().writeValueAsString(request);
3136
logger.debug("Agent payload: " + jsonRequest);
@@ -44,25 +49,8 @@ public static OpenAIResponse call(OpenAIRequest request, boolean isEmbedding, Ht
4449
if (client.getStatusCode() == 200) {
4550
String saiaResponse;
4651
if (client.getHeader("Content-Type").contains("text/event-stream")){
47-
saiaResponse = client.readChunk();
48-
int index = saiaResponse.indexOf("data:") + "data:".length();
49-
String chunkJson = saiaResponse.substring(index).trim();
50-
try {
51-
JSONObject jsonResponse = new JSONObject(chunkJson);
52-
OpenAIResponse chunkResponse = new ObjectMapper().readValue(jsonResponse.toString(), OpenAIResponse.class);
53-
OpenAIResponse.Choice choise = chunkResponse.getChoices().get(0);
54-
if (choise.getFinishReason() != null && choise.getFinishReason().equals("tool_calls")){
55-
saiaResponse = chunkJson;
56-
}
57-
else {
58-
client.unreadChunk();
59-
return null;
60-
}
61-
}
62-
catch (Exception e) {
63-
client.unreadChunk();
64-
return null;
65-
}
52+
getChunkedSaiaResponse(proc, client, agent, stream, properties, messages, result, chatResult);
53+
return null;
6654
}
6755
else {
6856
saiaResponse = client.getString();
@@ -88,6 +76,38 @@ public static OpenAIResponse call(OpenAIRequest request, boolean isEmbedding, Ht
8876
return null;
8977
}
9078

79+
private static void getChunkedSaiaResponse(GXProcedure proc, HttpClient client, String agent, boolean stream, GXProperties properties, ArrayList<OpenAIResponse.Message> messages, CallResult result, ChatResult chatResult) {
80+
String saiaChunkResponse = client.readChunk();;
81+
String chunkJson;
82+
while (!client.getEof()) {
83+
logger.debug("Agent response chunk: " + saiaChunkResponse);
84+
if (saiaChunkResponse.isEmpty() || saiaChunkResponse.equals("data: [DONE]")) {
85+
saiaChunkResponse = client.readChunk();
86+
continue;
87+
}
88+
int index = saiaChunkResponse.indexOf("data:") + "data:".length();
89+
chunkJson = saiaChunkResponse.substring(index).trim();
90+
try {
91+
JSONObject jsonResponse = new JSONObject(chunkJson);
92+
OpenAIResponse chunkResponse = new ObjectMapper().readValue(jsonResponse.toString(), OpenAIResponse.class);
93+
if (!chunkResponse.getChoices().isEmpty()) {
94+
OpenAIResponse.Choice choice = chunkResponse.getChoices().get(0);
95+
if (choice.getFinishReason() != null && choice.getFinishReason().equals("tool_calls")) {
96+
messages.add(choice.getMessage());
97+
proc.processNotChunkedResponse(agent, stream, properties, messages, result, chatResult, choice.getMessage().getToolCalls());
98+
;
99+
} else if (choice.getDelta() != null && choice.getDelta().getContent() != null) {
100+
chatResult.addChunk(((OpenAIResponse.StringContent) choice.getDelta().getContent()).getValue());
101+
}
102+
}
103+
saiaChunkResponse = client.readChunk();
104+
}
105+
catch (Exception e) {
106+
logger.warn("Error deserializing the response chunk", e);
107+
saiaChunkResponse = client.readChunk();
108+
}
109+
}
110+
}
91111

92112
private static void addResultMessage(String id, byte type, String description, CallResult result){
93113
if (type == 1)

java/src/test/java/com/genexus/agent/Agent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ else if (AV3Parameter1.equals("chat_stream")) {
8484
messages.add(message);
8585
ChatResult chatResult = chatAgent( "The weatherman", Gxproperties, messages, new CallResult()) ;
8686
while (chatResult.hasMoreData()) {
87-
System.out.print(chatResult.hasMoreData());
87+
System.out.print(chatResult.getMoreData());
8888
}
8989
}
9090
else if (AV3Parameter1.equals("toolcall")) {
@@ -110,7 +110,7 @@ else if (AV3Parameter1.equals("toolcall_stream")) {
110110
messages.add(message);
111111
ChatResult chatResult = chatAgent( "ProductInfo", Gxproperties, messages, new CallResult()) ;
112112
while (chatResult.hasMoreData()) {
113-
System.out.print(chatResult.hasMoreData());
113+
System.out.print(chatResult.getMoreData());
114114
}
115115
}
116116
else {

0 commit comments

Comments
 (0)