Spaces:
Running
Running
| /** | |
| * SSE Streamer for Cloud Code | |
| * | |
| * Streams SSE events in real-time, converting Google format to Anthropic format. | |
| * Handles thinking blocks, text blocks, and tool use blocks. | |
| */ | |
| import crypto from 'crypto'; | |
| import { MIN_SIGNATURE_LENGTH, getModelFamily } from '../constants.js'; | |
| import { EmptyResponseError } from '../errors.js'; | |
| import { cacheSignature, cacheThinkingSignature } from '../format/signature-cache.js'; | |
| import { logger } from '../utils/logger.js'; | |
| /** | |
| * Stream SSE response and yield Anthropic-format events | |
| * | |
| * @param {Response} response - The HTTP response with SSE body | |
| * @param {string} originalModel - The original model name | |
| * @yields {Object} Anthropic-format SSE events | |
| */ | |
| export async function* streamSSEResponse(response, originalModel) { | |
| const messageId = `msg_${crypto.randomBytes(16).toString('hex')}`; | |
| let hasEmittedStart = false; | |
| let blockIndex = 0; | |
| let currentBlockType = null; | |
| let currentThinkingSignature = ''; | |
| let inputTokens = 0; | |
| let outputTokens = 0; | |
| let cacheReadTokens = 0; | |
| let stopReason = null; | |
| const reader = response.body.getReader(); | |
| const decoder = new TextDecoder(); | |
| let buffer = ''; | |
| while (true) { | |
| const { done, value } = await reader.read(); | |
| if (done) break; | |
| buffer += decoder.decode(value, { stream: true }); | |
| const lines = buffer.split('\n'); | |
| buffer = lines.pop() || ''; | |
| for (const line of lines) { | |
| if (!line.startsWith('data:')) continue; | |
| const jsonText = line.slice(5).trim(); | |
| if (!jsonText) continue; | |
| try { | |
| const data = JSON.parse(jsonText); | |
| const innerResponse = data.response || data; | |
| // Extract usage metadata (including cache tokens) | |
| const usage = innerResponse.usageMetadata; | |
| if (usage) { | |
| inputTokens = usage.promptTokenCount || inputTokens; | |
| outputTokens = usage.candidatesTokenCount || outputTokens; | |
| cacheReadTokens = usage.cachedContentTokenCount || cacheReadTokens; | |
| } | |
| const candidates = innerResponse.candidates || []; | |
| const firstCandidate = candidates[0] || {}; | |
| const content = firstCandidate.content || {}; | |
| const parts = content.parts || []; | |
| // Emit message_start on first data | |
| // Note: input_tokens = promptTokenCount - cachedContentTokenCount (Antigravity includes cached in total) | |
| if (!hasEmittedStart && parts.length > 0) { | |
| hasEmittedStart = true; | |
| yield { | |
| type: 'message_start', | |
| message: { | |
| id: messageId, | |
| type: 'message', | |
| role: 'assistant', | |
| content: [], | |
| model: originalModel, | |
| stop_reason: null, | |
| stop_sequence: null, | |
| usage: { | |
| input_tokens: inputTokens - cacheReadTokens, | |
| output_tokens: 0, | |
| cache_read_input_tokens: cacheReadTokens, | |
| cache_creation_input_tokens: 0 | |
| } | |
| } | |
| }; | |
| } | |
| // Process each part | |
| for (const part of parts) { | |
| if (part.thought === true) { | |
| // Handle thinking block | |
| const text = part.text || ''; | |
| const signature = part.thoughtSignature || ''; | |
| if (currentBlockType !== 'thinking') { | |
| if (currentBlockType !== null) { | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| blockIndex++; | |
| } | |
| currentBlockType = 'thinking'; | |
| currentThinkingSignature = ''; | |
| yield { | |
| type: 'content_block_start', | |
| index: blockIndex, | |
| content_block: { type: 'thinking', thinking: '' } | |
| }; | |
| } | |
| if (signature && signature.length >= MIN_SIGNATURE_LENGTH) { | |
| currentThinkingSignature = signature; | |
| // Cache thinking signature with model family for cross-model compatibility | |
| const modelFamily = getModelFamily(originalModel); | |
| cacheThinkingSignature(signature, modelFamily); | |
| } | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'thinking_delta', thinking: text } | |
| }; | |
| } else if (part.text !== undefined) { | |
| // Skip empty text parts (but preserve whitespace-only chunks for proper spacing) | |
| if (part.text === '') { | |
| continue; | |
| } | |
| // Handle regular text | |
| if (currentBlockType !== 'text') { | |
| if (currentBlockType === 'thinking' && currentThinkingSignature) { | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'signature_delta', signature: currentThinkingSignature } | |
| }; | |
| currentThinkingSignature = ''; | |
| } | |
| if (currentBlockType !== null) { | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| blockIndex++; | |
| } | |
| currentBlockType = 'text'; | |
| yield { | |
| type: 'content_block_start', | |
| index: blockIndex, | |
| content_block: { type: 'text', text: '' } | |
| }; | |
| } | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'text_delta', text: part.text } | |
| }; | |
| } else if (part.functionCall) { | |
| // Handle tool use | |
| // For Gemini 3+, capture thoughtSignature from the functionCall part | |
| // The signature is a sibling to functionCall, not inside it | |
| const functionCallSignature = part.thoughtSignature || ''; | |
| if (currentBlockType === 'thinking' && currentThinkingSignature) { | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'signature_delta', signature: currentThinkingSignature } | |
| }; | |
| currentThinkingSignature = ''; | |
| } | |
| if (currentBlockType !== null) { | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| blockIndex++; | |
| } | |
| currentBlockType = 'tool_use'; | |
| stopReason = 'tool_use'; | |
| const toolId = part.functionCall.id || `toolu_${crypto.randomBytes(12).toString('hex')}`; | |
| // For Gemini, include the thoughtSignature in the tool_use block | |
| // so it can be sent back in subsequent requests | |
| const toolUseBlock = { | |
| type: 'tool_use', | |
| id: toolId, | |
| name: part.functionCall.name, | |
| input: {} | |
| }; | |
| // Store the signature in the tool_use block for later retrieval | |
| if (functionCallSignature && functionCallSignature.length >= MIN_SIGNATURE_LENGTH) { | |
| toolUseBlock.thoughtSignature = functionCallSignature; | |
| // Cache for future requests (Claude Code may strip this field) | |
| cacheSignature(toolId, functionCallSignature); | |
| } | |
| yield { | |
| type: 'content_block_start', | |
| index: blockIndex, | |
| content_block: toolUseBlock | |
| }; | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { | |
| type: 'input_json_delta', | |
| partial_json: JSON.stringify(part.functionCall.args || {}) | |
| } | |
| }; | |
| } else if (part.inlineData) { | |
| // Handle image content from Google format | |
| if (currentBlockType === 'thinking' && currentThinkingSignature) { | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'signature_delta', signature: currentThinkingSignature } | |
| }; | |
| currentThinkingSignature = ''; | |
| } | |
| if (currentBlockType !== null) { | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| blockIndex++; | |
| } | |
| currentBlockType = 'image'; | |
| // Emit image block as a complete block | |
| yield { | |
| type: 'content_block_start', | |
| index: blockIndex, | |
| content_block: { | |
| type: 'image', | |
| source: { | |
| type: 'base64', | |
| media_type: part.inlineData.mimeType, | |
| data: part.inlineData.data | |
| } | |
| } | |
| }; | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| blockIndex++; | |
| currentBlockType = null; | |
| } | |
| } | |
| // Check finish reason (only if not already set by tool_use) | |
| if (firstCandidate.finishReason && !stopReason) { | |
| if (firstCandidate.finishReason === 'MAX_TOKENS') { | |
| stopReason = 'max_tokens'; | |
| } else if (firstCandidate.finishReason === 'STOP') { | |
| stopReason = 'end_turn'; | |
| } | |
| } | |
| } catch (parseError) { | |
| logger.warn('[CloudCode] SSE parse error:', parseError.message); | |
| } | |
| } | |
| } | |
| // Handle no content received - throw error to trigger retry in streaming-handler | |
| if (!hasEmittedStart) { | |
| logger.warn('[CloudCode] No content parts received, throwing for retry'); | |
| throw new EmptyResponseError('No content parts received from API'); | |
| } else { | |
| // Close any open block | |
| if (currentBlockType !== null) { | |
| if (currentBlockType === 'thinking' && currentThinkingSignature) { | |
| yield { | |
| type: 'content_block_delta', | |
| index: blockIndex, | |
| delta: { type: 'signature_delta', signature: currentThinkingSignature } | |
| }; | |
| } | |
| yield { type: 'content_block_stop', index: blockIndex }; | |
| } | |
| } | |
| // Emit message_delta and message_stop | |
| yield { | |
| type: 'message_delta', | |
| delta: { stop_reason: stopReason || 'end_turn', stop_sequence: null }, | |
| usage: { | |
| output_tokens: outputTokens, | |
| cache_read_input_tokens: cacheReadTokens, | |
| cache_creation_input_tokens: 0 | |
| } | |
| }; | |
| yield { type: 'message_stop' }; | |
| } | |