Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion dotnet/test/E2E/CopilotRequestE2EProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ private static HttpResponseMessage BuildInferenceResponse(string url, string bod

if (u.EndsWith("/messages", StringComparison.Ordinal))
{
return Json(BufferedAnthropicMessageJson);
return wantsStream
? Sse(string.Concat(AnthropicStreamEvents))
: Json(BufferedAnthropicMessageJson);
}

// /chat/completions non-streaming (and any other inference url) — buffered JSON.
Expand Down Expand Up @@ -158,6 +160,20 @@ internal static HttpResponseMessage BuildNonInferenceResponse(string url)
"data: [DONE]\n\n",
];

// Anthropic Messages streaming (SSE) sequence. Emitted when the runtime issues a
// streaming /messages request (stream: true); the buffered JSON below is only valid
// for non-streaming requests, and returning it for a streaming request makes the
// runtime's Anthropic client fail with "stream ended without producing a Message".
private static readonly string[] AnthropicStreamEvents =
[
"event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_stub_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-sonnet-4.5\",\"content\":[],\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":5,\"output_tokens\":1}}}\n\n",
"event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"" + SyntheticText + "\"}}\n\n",
"event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n",
"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":7}}\n\n",
"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
];

private static readonly string BufferedResponseJson =
"{\"id\":\"resp_stub_1\",\"object\":\"response\",\"status\":\"completed\",\"output\":[{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"" + SyntheticText + "\"}]}],\"usage\":{\"input_tokens\":5,\"output_tokens\":7,\"total_tokens\":12}}";

Expand Down
44 changes: 44 additions & 0 deletions go/internal/e2e/copilot_request_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,47 @@ func buildResponsesSSEBody(text, respID string) string {
return sb.String()
}

// buildAnthropicMessageSSEBody returns a complete Anthropic Messages SSE body for a
// streaming /messages response (message_start … message_stop). The buffered JSON
// message is only valid for a non-streaming request; a streaming request expects
// named SSE events or the runtime fails to finalize the message.
func buildAnthropicMessageSSEBody(text string) string {
events := []struct {
name string
data map[string]any
}{
{"message_start", map[string]any{
"type": "message_start",
"message": map[string]any{
"id": "msg_stub_1", "type": "message", "role": "assistant",
"model": "claude-sonnet-4.5", "content": []any{},
"stop_reason": nil, "stop_sequence": nil,
"usage": map[string]any{"input_tokens": 5, "output_tokens": 1},
},
}},
{"content_block_start", map[string]any{
"type": "content_block_start", "index": 0,
"content_block": map[string]any{"type": "text", "text": ""},
}},
{"content_block_delta", map[string]any{
"type": "content_block_delta", "index": 0,
"delta": map[string]any{"type": "text_delta", "text": text},
}},
{"content_block_stop", map[string]any{"type": "content_block_stop", "index": 0}},
{"message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{"stop_reason": "end_turn", "stop_sequence": nil},
"usage": map[string]any{"output_tokens": 7},
}},
{"message_stop", map[string]any{"type": "message_stop"}},
}
var sb strings.Builder
for _, event := range events {
sb.WriteString(sseFrame(event.name, event.data))
}
return sb.String()
}

// buildInferenceResponse synthesizes a well-formed inference HTTP response.
func buildInferenceResponse(url string, bodyText string) *http.Response {
wantsStream := isStreamingRequest(bodyText)
Expand Down Expand Up @@ -167,6 +208,9 @@ func buildInferenceResponse(url string, bodyText string) *http.Response {
}

if strings.HasSuffix(u, "/messages") {
if wantsStream {
return buildSSEResponse(buildAnthropicMessageSSEBody(syntheticResponseText))
}
raw, _ := json.Marshal(map[string]any{
"id": "msg_stub_1",
"type": "message",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,58 @@ static String sseBody(String text, String respId) {
return sb.toString();
}

/**
* Builds a complete Anthropic Messages SSE body (message_start … message_stop)
* for a streaming {@code /messages} response. The buffered JSON message is only
* valid for a non-streaming request; a streaming request expects named SSE
* events or the runtime fails to finalize the message.
*/
static String anthropicMessageSseBody(String text) {
Map<String, Object> startMessage = new LinkedHashMap<>();
startMessage.put("id", "msg_stub_1");
startMessage.put("type", "message");
startMessage.put("role", "assistant");
startMessage.put("model", "claude-sonnet-4.5");
startMessage.put("content", List.of());
startMessage.put("stop_reason", null);
startMessage.put("stop_sequence", null);
startMessage.put("usage", Map.of("input_tokens", 5, "output_tokens", 1));
Map<String, Object> messageStart = new LinkedHashMap<>();
messageStart.put("type", "message_start");
messageStart.put("message", startMessage);

Map<String, Object> contentBlockStart = new LinkedHashMap<>();
contentBlockStart.put("type", "content_block_start");
contentBlockStart.put("index", 0);
contentBlockStart.put("content_block", Map.of("type", "text", "text", ""));

Map<String, Object> contentBlockDelta = new LinkedHashMap<>();
contentBlockDelta.put("type", "content_block_delta");
contentBlockDelta.put("index", 0);
contentBlockDelta.put("delta", Map.of("type", "text_delta", "text", text));

Map<String, Object> contentBlockStop = new LinkedHashMap<>();
contentBlockStop.put("type", "content_block_stop");
contentBlockStop.put("index", 0);

Map<String, Object> messageDeltaDelta = new LinkedHashMap<>();
messageDeltaDelta.put("stop_reason", "end_turn");
messageDeltaDelta.put("stop_sequence", null);
Map<String, Object> messageDelta = new LinkedHashMap<>();
messageDelta.put("type", "message_delta");
messageDelta.put("delta", messageDeltaDelta);
messageDelta.put("usage", Map.of("output_tokens", 7));

StringBuilder sb = new StringBuilder();
sb.append(sse("message_start", messageStart));
sb.append(sse("content_block_start", contentBlockStart));
sb.append(sse("content_block_delta", contentBlockDelta));
sb.append(sse("content_block_stop", contentBlockStop));
sb.append(sse("message_delta", messageDelta));
sb.append(sse("message_stop", Map.of("type", "message_stop")));
return sb.toString();
}

// --- Synthetic response builders for the CopilotRequestHandler send override
// ---

Expand Down Expand Up @@ -191,6 +243,9 @@ static HttpResponse<InputStream> buildInferenceResponse(String url, String bodyT
}

if (u.endsWith("/messages")) {
if (stream) {
return sseResponse(anthropicMessageSseBody(text));
}
Map<String, Object> body = new LinkedHashMap<>();
body.put("id", "msg_stub_1");
body.put("type", "message");
Expand Down
59 changes: 58 additions & 1 deletion nodejs/test/e2e/session_config.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,59 @@ describe("Session Configuration", async () => {
});
}

function sse(body: string): Response {
return new Response(body, {
status: 200,
headers: { "content-type": "text/event-stream" },
});
}

function anthropicMessageStreamBody(text: string): string {
const events: Array<[string, unknown]> = [
[
"message_start",
{
type: "message_start",
message: {
id: "msg_stub_1",
type: "message",
role: "assistant",
model: "claude-sonnet-4.5",
content: [],
stop_reason: null,
stop_sequence: null,
usage: { input_tokens: 5, output_tokens: 1 },
},
},
],
[
"content_block_start",
{
type: "content_block_start",
index: 0,
content_block: { type: "text", text: "" },
},
],
[
"content_block_delta",
{ type: "content_block_delta", index: 0, delta: { type: "text_delta", text } },
],
["content_block_stop", { type: "content_block_stop", index: 0 }],
[
"message_delta",
{
type: "message_delta",
delta: { stop_reason: "end_turn", stop_sequence: null },
usage: { output_tokens: 7 },
},
],
["message_stop", { type: "message_stop" }],
];
return events
.map(([event, data]) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
.join("");
}

function buildNonInferenceResponse(url: string): Response {
const u = url.toLowerCase();
if (u.endsWith("/models")) {
Expand Down Expand Up @@ -342,9 +395,13 @@ describe("Session Configuration", async () => {
return json({});
}

function buildInferenceResponse(url: string, _body: string): Response {
function buildInferenceResponse(url: string, body: string): Response {
const u = url.toLowerCase();
const wantsStream = /"stream"\s*:\s*true/.test(body);
if (u.endsWith("/messages")) {
if (wantsStream) {
return sse(anthropicMessageStreamBody("OK from the synthetic stream."));
}
return json({
id: "msg_stub_1",
type: "message",
Expand Down
51 changes: 51 additions & 0 deletions python/e2e/_copilot_request_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,57 @@ def build_inference_response(request: httpx.Request, text: str = SYNTHETIC_TEXT)
)

if url.endswith("/messages"):
if wants_stream:
events = [
(
"message_start",
{
"type": "message_start",
"message": {
"id": "msg_stub_1",
"type": "message",
"role": "assistant",
"model": "claude-sonnet-4.5",
"content": [],
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": 5, "output_tokens": 1},
},
},
),
(
"content_block_start",
{
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""},
},
),
(
"content_block_delta",
{
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": text},
},
),
("content_block_stop", {"type": "content_block_stop", "index": 0}),
(
"message_delta",
{
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"output_tokens": 7},
},
),
("message_stop", {"type": "message_stop"}),
]
stream_body = "".join(sse(event, data) for event, data in events)
return httpx.Response(
200,
headers={"content-type": "text/event-stream"},
content=stream_body.encode(),
)
return httpx.Response(
200,
headers={"content-type": "application/json"},
Expand Down
79 changes: 77 additions & 2 deletions rust/tests/e2e/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl CopilotRequestHandler for RecordingHandler {
body: request.body.clone(),
});
if is_inference_url(&request.url) {
return Ok(synth_inference_response(&request.url));
return Ok(synth_inference_response(&request.url, &request.body));
}
Ok(synth_non_inference_response(&request.url))
}
Expand Down Expand Up @@ -329,6 +329,78 @@ fn http_response(status: u16, headers: HeaderMap, body: Value) -> CopilotHttpRes
CopilotHttpResponse::new(status, None, headers, Box::pin(stream))
}

fn sse_response(body: String) -> CopilotHttpResponse {
let mut headers = HeaderMap::new();
headers.insert(
"content-type",
HeaderValue::from_static("text/event-stream"),
);
let stream = futures_util::stream::once(async move {
Ok::<Bytes, CopilotRequestError>(Bytes::from(body.into_bytes()))
});
CopilotHttpResponse::new(200, None, headers, Box::pin(stream))
}

fn wants_stream(body: &[u8]) -> bool {
String::from_utf8_lossy(body)
.replace(char::is_whitespace, "")
.contains("\"stream\":true")
}

fn anthropic_message_stream_body(text: &str) -> String {
let events = [
(
"message_start",
json!({
"type": "message_start",
"message": {
"id": "msg_stub_1",
"type": "message",
"role": "assistant",
"model": "claude-sonnet-4.5",
"content": [],
"stop_reason": null,
"stop_sequence": null,
"usage": { "input_tokens": 5, "output_tokens": 1 },
},
}),
),
(
"content_block_start",
json!({
"type": "content_block_start",
"index": 0,
"content_block": { "type": "text", "text": "" },
}),
),
(
"content_block_delta",
json!({
"type": "content_block_delta",
"index": 0,
"delta": { "type": "text_delta", "text": text },
}),
),
(
"content_block_stop",
json!({ "type": "content_block_stop", "index": 0 }),
),
(
"message_delta",
json!({
"type": "message_delta",
"delta": { "stop_reason": "end_turn", "stop_sequence": null },
"usage": { "output_tokens": 7 },
}),
),
("message_stop", json!({ "type": "message_stop" })),
];
events
.iter()
.map(|(name, data)| format!("event: {name}\ndata: {data}\n\n"))
.collect()
}

fn synth_non_inference_response(url: &str) -> CopilotHttpResponse {
let lower = url.to_lowercase();
if lower.ends_with("/models") {
Expand Down Expand Up @@ -369,9 +441,12 @@ fn synth_non_inference_response(url: &str) -> CopilotHttpResponse {
http_response(200, json_headers(), json!({}))
}

fn synth_inference_response(url: &str) -> CopilotHttpResponse {
fn synth_inference_response(url: &str, body: &[u8]) -> CopilotHttpResponse {
let lower = url.to_lowercase();
if lower.ends_with("/messages") {
if wants_stream(body) {
return sse_response(anthropic_message_stream_body(SYNTHETIC_TEXT));
}
return http_response(
200,
json_headers(),
Expand Down
Loading