diff --git a/dotnet/test/E2E/CopilotRequestE2EProvider.cs b/dotnet/test/E2E/CopilotRequestE2EProvider.cs index e8e483556..bade5c6e5 100644 --- a/dotnet/test/E2E/CopilotRequestE2EProvider.cs +++ b/dotnet/test/E2E/CopilotRequestE2EProvider.cs @@ -96,7 +96,9 @@ private static HttpResponseMessage BuildInferenceResponse(string url, string bod if (u.EndsWith("/messages", StringComparison.Ordinal)) { - return Json(BufferedAnthropicMessageJson); + return wantsStream + ? Sse(string.Concat(AnthropicStreamEvents)) + : Json(BufferedAnthropicMessageJson); } // /chat/completions non-streaming (and any other inference url) — buffered JSON. @@ -158,6 +160,20 @@ internal static HttpResponseMessage BuildNonInferenceResponse(string url) "data: [DONE]\n\n", ]; + // Anthropic Messages streaming (SSE) sequence. Emitted when the runtime issues a + // streaming /messages request (stream: true); the buffered JSON below is only valid + // for non-streaming requests, and returning it for a streaming request makes the + // runtime's Anthropic client fail with "stream ended without producing a Message". + private static readonly string[] AnthropicStreamEvents = + [ + "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_stub_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-sonnet-4.5\",\"content\":[],\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":5,\"output_tokens\":1}}}\n\n", + "event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n", + "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"" + SyntheticText + "\"}}\n\n", + "event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n", + "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":7}}\n\n", + "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n", + ]; + private static readonly string BufferedResponseJson = "{\"id\":\"resp_stub_1\",\"object\":\"response\",\"status\":\"completed\",\"output\":[{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"" + SyntheticText + "\"}]}],\"usage\":{\"input_tokens\":5,\"output_tokens\":7,\"total_tokens\":12}}"; diff --git a/go/internal/e2e/copilot_request_helpers_test.go b/go/internal/e2e/copilot_request_helpers_test.go index 7c6058207..81d14f4d9 100644 --- a/go/internal/e2e/copilot_request_helpers_test.go +++ b/go/internal/e2e/copilot_request_helpers_test.go @@ -128,6 +128,47 @@ func buildResponsesSSEBody(text, respID string) string { return sb.String() } +// buildAnthropicMessageSSEBody returns a complete Anthropic Messages SSE body for a +// streaming /messages response (message_start … message_stop). The buffered JSON +// message is only valid for a non-streaming request; a streaming request expects +// named SSE events or the runtime fails to finalize the message. +func buildAnthropicMessageSSEBody(text string) string { + events := []struct { + name string + data map[string]any + }{ + {"message_start", map[string]any{ + "type": "message_start", + "message": map[string]any{ + "id": "msg_stub_1", "type": "message", "role": "assistant", + "model": "claude-sonnet-4.5", "content": []any{}, + "stop_reason": nil, "stop_sequence": nil, + "usage": map[string]any{"input_tokens": 5, "output_tokens": 1}, + }, + }}, + {"content_block_start", map[string]any{ + "type": "content_block_start", "index": 0, + "content_block": map[string]any{"type": "text", "text": ""}, + }}, + {"content_block_delta", map[string]any{ + "type": "content_block_delta", "index": 0, + "delta": map[string]any{"type": "text_delta", "text": text}, + }}, + {"content_block_stop", map[string]any{"type": "content_block_stop", "index": 0}}, + {"message_delta", map[string]any{ + "type": "message_delta", + "delta": map[string]any{"stop_reason": "end_turn", "stop_sequence": nil}, + "usage": map[string]any{"output_tokens": 7}, + }}, + {"message_stop", map[string]any{"type": "message_stop"}}, + } + var sb strings.Builder + for _, event := range events { + sb.WriteString(sseFrame(event.name, event.data)) + } + return sb.String() +} + // buildInferenceResponse synthesizes a well-formed inference HTTP response. func buildInferenceResponse(url string, bodyText string) *http.Response { wantsStream := isStreamingRequest(bodyText) @@ -167,6 +208,9 @@ func buildInferenceResponse(url string, bodyText string) *http.Response { } if strings.HasSuffix(u, "/messages") { + if wantsStream { + return buildSSEResponse(buildAnthropicMessageSSEBody(syntheticResponseText)) + } raw, _ := json.Marshal(map[string]any{ "id": "msg_stub_1", "type": "message", diff --git a/java/src/test/java/com/github/copilot/CopilotRequestTestSupport.java b/java/src/test/java/com/github/copilot/CopilotRequestTestSupport.java index 734772458..7a2e7f2f0 100644 --- a/java/src/test/java/com/github/copilot/CopilotRequestTestSupport.java +++ b/java/src/test/java/com/github/copilot/CopilotRequestTestSupport.java @@ -122,6 +122,58 @@ static String sseBody(String text, String respId) { return sb.toString(); } + /** + * Builds a complete Anthropic Messages SSE body (message_start … message_stop) + * for a streaming {@code /messages} response. The buffered JSON message is only + * valid for a non-streaming request; a streaming request expects named SSE + * events or the runtime fails to finalize the message. + */ + static String anthropicMessageSseBody(String text) { + Map startMessage = new LinkedHashMap<>(); + startMessage.put("id", "msg_stub_1"); + startMessage.put("type", "message"); + startMessage.put("role", "assistant"); + startMessage.put("model", "claude-sonnet-4.5"); + startMessage.put("content", List.of()); + startMessage.put("stop_reason", null); + startMessage.put("stop_sequence", null); + startMessage.put("usage", Map.of("input_tokens", 5, "output_tokens", 1)); + Map messageStart = new LinkedHashMap<>(); + messageStart.put("type", "message_start"); + messageStart.put("message", startMessage); + + Map contentBlockStart = new LinkedHashMap<>(); + contentBlockStart.put("type", "content_block_start"); + contentBlockStart.put("index", 0); + contentBlockStart.put("content_block", Map.of("type", "text", "text", "")); + + Map contentBlockDelta = new LinkedHashMap<>(); + contentBlockDelta.put("type", "content_block_delta"); + contentBlockDelta.put("index", 0); + contentBlockDelta.put("delta", Map.of("type", "text_delta", "text", text)); + + Map contentBlockStop = new LinkedHashMap<>(); + contentBlockStop.put("type", "content_block_stop"); + contentBlockStop.put("index", 0); + + Map messageDeltaDelta = new LinkedHashMap<>(); + messageDeltaDelta.put("stop_reason", "end_turn"); + messageDeltaDelta.put("stop_sequence", null); + Map messageDelta = new LinkedHashMap<>(); + messageDelta.put("type", "message_delta"); + messageDelta.put("delta", messageDeltaDelta); + messageDelta.put("usage", Map.of("output_tokens", 7)); + + StringBuilder sb = new StringBuilder(); + sb.append(sse("message_start", messageStart)); + sb.append(sse("content_block_start", contentBlockStart)); + sb.append(sse("content_block_delta", contentBlockDelta)); + sb.append(sse("content_block_stop", contentBlockStop)); + sb.append(sse("message_delta", messageDelta)); + sb.append(sse("message_stop", Map.of("type", "message_stop"))); + return sb.toString(); + } + // --- Synthetic response builders for the CopilotRequestHandler send override // --- @@ -191,6 +243,9 @@ static HttpResponse buildInferenceResponse(String url, String bodyT } if (u.endsWith("/messages")) { + if (stream) { + return sseResponse(anthropicMessageSseBody(text)); + } Map body = new LinkedHashMap<>(); body.put("id", "msg_stub_1"); body.put("type", "message"); diff --git a/nodejs/test/e2e/session_config.e2e.test.ts b/nodejs/test/e2e/session_config.e2e.test.ts index 70ee6546e..98b1a0bfa 100644 --- a/nodejs/test/e2e/session_config.e2e.test.ts +++ b/nodejs/test/e2e/session_config.e2e.test.ts @@ -308,6 +308,59 @@ describe("Session Configuration", async () => { }); } + function sse(body: string): Response { + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + } + + function anthropicMessageStreamBody(text: string): string { + const events: Array<[string, unknown]> = [ + [ + "message_start", + { + type: "message_start", + message: { + id: "msg_stub_1", + type: "message", + role: "assistant", + model: "claude-sonnet-4.5", + content: [], + stop_reason: null, + stop_sequence: null, + usage: { input_tokens: 5, output_tokens: 1 }, + }, + }, + ], + [ + "content_block_start", + { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }, + ], + [ + "content_block_delta", + { type: "content_block_delta", index: 0, delta: { type: "text_delta", text } }, + ], + ["content_block_stop", { type: "content_block_stop", index: 0 }], + [ + "message_delta", + { + type: "message_delta", + delta: { stop_reason: "end_turn", stop_sequence: null }, + usage: { output_tokens: 7 }, + }, + ], + ["message_stop", { type: "message_stop" }], + ]; + return events + .map(([event, data]) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`) + .join(""); + } + function buildNonInferenceResponse(url: string): Response { const u = url.toLowerCase(); if (u.endsWith("/models")) { @@ -342,9 +395,13 @@ describe("Session Configuration", async () => { return json({}); } - function buildInferenceResponse(url: string, _body: string): Response { + function buildInferenceResponse(url: string, body: string): Response { const u = url.toLowerCase(); + const wantsStream = /"stream"\s*:\s*true/.test(body); if (u.endsWith("/messages")) { + if (wantsStream) { + return sse(anthropicMessageStreamBody("OK from the synthetic stream.")); + } return json({ id: "msg_stub_1", type: "message", diff --git a/python/e2e/_copilot_request_helpers.py b/python/e2e/_copilot_request_helpers.py index 6a3d3d1de..2d91bc9bc 100644 --- a/python/e2e/_copilot_request_helpers.py +++ b/python/e2e/_copilot_request_helpers.py @@ -224,6 +224,57 @@ def build_inference_response(request: httpx.Request, text: str = SYNTHETIC_TEXT) ) if url.endswith("/messages"): + if wants_stream: + events = [ + ( + "message_start", + { + "type": "message_start", + "message": { + "id": "msg_stub_1", + "type": "message", + "role": "assistant", + "model": "claude-sonnet-4.5", + "content": [], + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": 5, "output_tokens": 1}, + }, + }, + ), + ( + "content_block_start", + { + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""}, + }, + ), + ( + "content_block_delta", + { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": text}, + }, + ), + ("content_block_stop", {"type": "content_block_stop", "index": 0}), + ( + "message_delta", + { + "type": "message_delta", + "delta": {"stop_reason": "end_turn", "stop_sequence": None}, + "usage": {"output_tokens": 7}, + }, + ), + ("message_stop", {"type": "message_stop"}), + ] + stream_body = "".join(sse(event, data) for event, data in events) + return httpx.Response( + 200, + headers={"content-type": "text/event-stream"}, + content=stream_body.encode(), + ) return httpx.Response( 200, headers={"content-type": "application/json"}, diff --git a/rust/tests/e2e/session_config.rs b/rust/tests/e2e/session_config.rs index d4948ba17..0a7a5eb11 100644 --- a/rust/tests/e2e/session_config.rs +++ b/rust/tests/e2e/session_config.rs @@ -300,7 +300,7 @@ impl CopilotRequestHandler for RecordingHandler { body: request.body.clone(), }); if is_inference_url(&request.url) { - return Ok(synth_inference_response(&request.url)); + return Ok(synth_inference_response(&request.url, &request.body)); } Ok(synth_non_inference_response(&request.url)) } @@ -329,6 +329,78 @@ fn http_response(status: u16, headers: HeaderMap, body: Value) -> CopilotHttpRes CopilotHttpResponse::new(status, None, headers, Box::pin(stream)) } +fn sse_response(body: String) -> CopilotHttpResponse { + let mut headers = HeaderMap::new(); + headers.insert( + "content-type", + HeaderValue::from_static("text/event-stream"), + ); + let stream = futures_util::stream::once(async move { + Ok::(Bytes::from(body.into_bytes())) + }); + CopilotHttpResponse::new(200, None, headers, Box::pin(stream)) +} + +fn wants_stream(body: &[u8]) -> bool { + String::from_utf8_lossy(body) + .replace(char::is_whitespace, "") + .contains("\"stream\":true") +} + +fn anthropic_message_stream_body(text: &str) -> String { + let events = [ + ( + "message_start", + json!({ + "type": "message_start", + "message": { + "id": "msg_stub_1", + "type": "message", + "role": "assistant", + "model": "claude-sonnet-4.5", + "content": [], + "stop_reason": null, + "stop_sequence": null, + "usage": { "input_tokens": 5, "output_tokens": 1 }, + }, + }), + ), + ( + "content_block_start", + json!({ + "type": "content_block_start", + "index": 0, + "content_block": { "type": "text", "text": "" }, + }), + ), + ( + "content_block_delta", + json!({ + "type": "content_block_delta", + "index": 0, + "delta": { "type": "text_delta", "text": text }, + }), + ), + ( + "content_block_stop", + json!({ "type": "content_block_stop", "index": 0 }), + ), + ( + "message_delta", + json!({ + "type": "message_delta", + "delta": { "stop_reason": "end_turn", "stop_sequence": null }, + "usage": { "output_tokens": 7 }, + }), + ), + ("message_stop", json!({ "type": "message_stop" })), + ]; + events + .iter() + .map(|(name, data)| format!("event: {name}\ndata: {data}\n\n")) + .collect() +} + fn synth_non_inference_response(url: &str) -> CopilotHttpResponse { let lower = url.to_lowercase(); if lower.ends_with("/models") { @@ -369,9 +441,12 @@ fn synth_non_inference_response(url: &str) -> CopilotHttpResponse { http_response(200, json_headers(), json!({})) } -fn synth_inference_response(url: &str) -> CopilotHttpResponse { +fn synth_inference_response(url: &str, body: &[u8]) -> CopilotHttpResponse { let lower = url.to_lowercase(); if lower.ends_with("/messages") { + if wants_stream(body) { + return sse_response(anthropic_message_stream_body(SYNTHETIC_TEXT)); + } return http_response( 200, json_headers(),