Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,19 @@
*/
package io.agentscope.core.studio;

import io.agentscope.core.hook.ActingChunkEvent;
import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PostCallEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.hook.ReasoningChunkEvent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.ToolResultBlock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -91,6 +100,112 @@ public <T extends HookEvent> Mono<T> onEvent(T event) {
return Mono.just(event);
});
}

// Reasoning incremental chunk
if (event instanceof ReasoningChunkEvent) {
ReasoningChunkEvent e = (ReasoningChunkEvent) event;
Msg chunk = e.getIncrementalChunk();
if (chunk != null && studioClient != null) {
Msg tagged = withEventMetadata(chunk, "reasoning", false);
return studioClient
.pushMessage(tagged)
.thenReturn(event)
.onErrorResume(
ex -> {
logger.error("Failed to push reasoning chunk to Studio", ex);
return Mono.just(event);
});
}
return Mono.just(event);
}

// Reasoning final result (after stream completes)
if (event instanceof PostReasoningEvent) {
PostReasoningEvent e = (PostReasoningEvent) event;
Msg finalMsg = e.getReasoningMessage();
if (finalMsg != null && studioClient != null) {
Msg tagged = withEventMetadata(finalMsg, "reasoning", true);
return studioClient
.pushMessage(tagged)
.thenReturn(event)
.onErrorResume(
ex -> {
logger.error("Failed to push final reasoning to Studio", ex);
return Mono.just(event);
});
}
return Mono.just(event);
}

// Acting (tool) incremental chunk
if (event instanceof ActingChunkEvent) {
ActingChunkEvent e = (ActingChunkEvent) event;
ToolResultBlock chunk = e.getChunk();
if (chunk != null && studioClient != null) {
// build a Msg with TOOL role similar to streaming hook
Msg toolMsg =
Msg.builder()
.name("tool")
.role(MsgRole.TOOL)
.content(List.of(chunk))
.build();
Msg tagged = withEventMetadata(toolMsg, "tool_result", false);
return studioClient
.pushMessage(tagged)
.thenReturn(event)
.onErrorResume(
ex -> {
logger.error("Failed to push acting chunk to Studio", ex);
return Mono.just(event);
});
}
return Mono.just(event);
}

// Acting (tool) final result
if (event instanceof PostActingEvent) {
PostActingEvent e = (PostActingEvent) event;
ToolResultBlock result = e.getToolResult();
if (result != null && studioClient != null) {
Msg toolMsg =
Msg.builder()
.name("tool")
.role(MsgRole.TOOL)
.content(List.of(result))
.build();
Msg tagged = withEventMetadata(toolMsg, "tool_result", true);
return studioClient
.pushMessage(tagged)
.thenReturn(event)
.onErrorResume(
ex -> {
logger.error("Failed to push tool result to Studio", ex);
return Mono.just(event);
});
}
return Mono.just(event);
}

return Mono.just(event);
}

/**
* Wrap/augment a Msg with metadata that indicates event type and whether it's the last chunk.
*/
private Msg withEventMetadata(Msg orig, String eventType, boolean isLast) {
Map<String, Object> meta = new HashMap<>();
if (orig.getMetadata() != null) {
meta.putAll(orig.getMetadata());
}
meta.put("studio_event_type", eventType);
meta.put("studio_is_last", isLast);
// Rebuild msg copying fields and injecting metadata; adapt if Msg.Builder has different
// methods
return Msg.builder()
.name(orig.getName())
.role(orig.getRole())
.content(orig.getContent())
.metadata(meta)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.agentscope.core.agent.Agent;
import io.agentscope.core.hook.ActingChunkEvent;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PostCallEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.hook.PreCallEvent;
import io.agentscope.core.hook.ReasoningChunkEvent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.message.ToolResultBlock;
import io.agentscope.core.message.ToolUseBlock;
import io.agentscope.core.tool.Toolkit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -229,4 +238,150 @@ void testMultipleEvents() {
verify(mockStudioClient, times(1)).pushMessage(msg1);
verify(mockStudioClient, times(1)).pushMessage(msg2);
}

@Test
@DisplayName("Should forward reasoning chunk to Studio")
void testReasoningChunkEventForwards() {
// Mock push success
when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty());

// Build incremental and accumulated msg
Msg incremental =
Msg.builder()
.name("Assistant")
.role(MsgRole.ASSISTANT)
.content(TextBlock.builder().text("partial chunk").build())
.build();
Msg accumulated =
Msg.builder()
.name("Assistant")
.role(MsgRole.ASSISTANT)
.content(TextBlock.builder().text("accumulated so far").build())
.build();

ReasoningChunkEvent event =
new ReasoningChunkEvent(mockAgent, "test-model", null, incremental, accumulated);

Mono<HookEvent> result = hook.onEvent(event);

StepVerifier.create(result).expectNext(event).verifyComplete();

verify(mockStudioClient, times(1))
.pushMessage(
argThat(
m -> {
Map<String, Object> md = m.getMetadata();
return md != null
&& "reasoning".equals(md.get("studio_event_type"))
&& Boolean.FALSE.equals(md.get("studio_is_last"));
}));
}

@Test
@DisplayName("Should forward final reasoning result to Studio")
void testPostReasoningEventForwards() {
// Mock push success
when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty());

Msg finalMsg =
Msg.builder()
.name("Assistant")
.role(MsgRole.ASSISTANT)
.content(TextBlock.builder().text("final reasoning").build())
.build();

PostReasoningEvent event = new PostReasoningEvent(mockAgent, "test-model", null, finalMsg);

Mono<HookEvent> result = hook.onEvent(event);

StepVerifier.create(result).expectNext(event).verifyComplete();

verify(mockStudioClient, times(1))
.pushMessage(
argThat(
m -> {
Map<String, Object> md = m.getMetadata();
return md != null
&& "reasoning".equals(md.get("studio_event_type"))
&& Boolean.TRUE.equals(md.get("studio_is_last"));
}));
}

@Test
@DisplayName("Should forward acting (tool) chunk to Studio")
void testActingChunkEventForwards() {
// Mock push success
when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty());

Toolkit toolkit = new Toolkit();
ToolUseBlock toolUse =
ToolUseBlock.builder().id("call-1").name("test_tool").input(Map.of()).build();
ToolResultBlock chunk = ToolResultBlock.text("progress update");

ActingChunkEvent event = new ActingChunkEvent(mockAgent, toolkit, toolUse, chunk);

Mono<HookEvent> result = hook.onEvent(event);

StepVerifier.create(result).expectNext(event).verifyComplete();

verify(mockStudioClient, times(1))
.pushMessage(
argThat(
m -> {
// Expect a TOOL role message with metadata and a
// ToolResultBlock in content
Map<String, Object> md = m.getMetadata();
boolean hasMeta =
md != null
&& "tool_result"
.equals(md.get("studio_event_type"))
&& Boolean.FALSE.equals(
md.get("studio_is_last"));
boolean isToolRole = m.getRole() == MsgRole.TOOL;
boolean hasToolResult =
!m.getContentBlocks(
io.agentscope.core.message
.ToolResultBlock.class)
.isEmpty();
return hasMeta && isToolRole && hasToolResult;
}));
}

@Test
@DisplayName("Should forward final acting (tool) result to Studio")
void testPostActingEventForwards() {
// Mock push success
when(mockStudioClient.pushMessage(any(Msg.class))).thenReturn(Mono.empty());

Toolkit toolkit = new Toolkit();
ToolUseBlock toolUse =
ToolUseBlock.builder().id("call-1").name("test_tool").input(Map.of()).build();
ToolResultBlock resultBlock = ToolResultBlock.text("final result");

PostActingEvent event = new PostActingEvent(mockAgent, toolkit, toolUse, resultBlock);

Mono<HookEvent> result = hook.onEvent(event);

StepVerifier.create(result).expectNext(event).verifyComplete();

verify(mockStudioClient, times(1))
.pushMessage(
argThat(
m -> {
Map<String, Object> md = m.getMetadata();
boolean hasMeta =
md != null
&& "tool_result"
.equals(md.get("studio_event_type"))
&& Boolean.TRUE.equals(
md.get("studio_is_last"));
boolean isToolRole = m.getRole() == MsgRole.TOOL;
boolean hasToolResult =
!m.getContentBlocks(
io.agentscope.core.message
.ToolResultBlock.class)
.isEmpty();
return hasMeta && isToolRole && hasToolResult;
}));
}
}
Loading