| const { nanoid } = require('nanoid'); |
| const { sendEvent } = require('@librechat/api'); |
| const { logger } = require('@librechat/data-schemas'); |
| const { Tools, StepTypes, FileContext, ErrorTypes } = require('librechat-data-provider'); |
| const { |
| EnvVar, |
| Providers, |
| GraphEvents, |
| getMessageId, |
| ToolEndHandler, |
| handleToolCalls, |
| ChatModelStreamHandler, |
| } = require('@librechat/agents'); |
| const { processFileCitations } = require('~/server/services/Files/Citations'); |
| const { processCodeOutput } = require('~/server/services/Files/Code/process'); |
| const { loadAuthValues } = require('~/server/services/Tools/credentials'); |
| const { saveBase64Image } = require('~/server/services/Files/process'); |
|
|
/**
 * Handler for the chat-model end graph event.
 *
 * Collects token usage from the model output, runs tool-call handling when the
 * calls were not already processed during streaming, re-dispatches buffered
 * content when streaming is disabled, and surfaces model refusals as errors.
 */
class ModelEndHandler {
  /**
   * @param {Array<object>} collectedUsage - Array that usage metadata objects are pushed into.
   * @throws {Error} When `collectedUsage` is not an array.
   */
  constructor(collectedUsage) {
    if (!Array.isArray(collectedUsage)) {
      throw new Error('collectedUsage must be an array');
    }
    this.collectedUsage = collectedUsage;
  }

  /**
   * Raises the deferred error, if any.
   *
   * @param {string | undefined} errorMessage - Message to throw; falsy means "nothing to report".
   * @returns {void}
   * @throws {Error} When `errorMessage` is truthy.
   */
  finalize(errorMessage) {
    if (!errorMessage) {
      return;
    }
    throw new Error(errorMessage);
  }

  /**
   * Processes a model end event.
   *
   * @param {string} event - Event name (only used in the warning message).
   * @param {object} data - Event payload; `data.output` is read for refusal info,
   *   tool calls, usage metadata and content.
   * @param {object} metadata - Run metadata (`user_id`, `run_id`, `thread_id`, `ls_model_name` are read).
   * @param {object} graph - Graph instance providing the agent context and dispatch methods.
   * @returns {Promise<void>}
   * @throws {Error} With a JSON-encoded `{ type, info }` message when the model
   *   output reports `stop_reason === 'refusal'`.
   */
  async handle(event, data, metadata, graph) {
    if (!graph || !metadata) {
      console.warn(`Graph or metadata not found in ${event} event`);
      return;
    }

    /** Set when the model refused; thrown once all other processing is done. */
    let errorMessage;
    try {
      const agentContext = graph.getAgentContext(metadata);
      const isGoogle = agentContext.provider === Providers.GOOGLE;
      const streamingDisabled = !!agentContext.clientOptions?.disableStreaming;

      if (data?.output?.additional_kwargs?.stop_reason === 'refusal') {
        const info = { ...data.output.additional_kwargs };
        errorMessage = JSON.stringify({
          type: ErrorTypes.REFUSAL,
          info,
        });
        logger.debug(`[ModelEndHandler] Model refused to respond`, {
          ...info,
          userId: metadata.user_id,
          messageId: metadata.run_id,
          conversationId: metadata.thread_id,
        });
      }

      // A tool call counts as unprocessed when its id is absent from graph.toolCallStepIds.
      const toolCalls = data?.output?.tool_calls;
      let hasUnprocessedToolCalls = false;
      if (Array.isArray(toolCalls) && toolCalls.length > 0 && graph?.toolCallStepIds?.has) {
        try {
          hasUnprocessedToolCalls = toolCalls.some(
            (tc) => tc?.id && !graph.toolCallStepIds.has(tc.id),
          );
        } catch {
          hasUnprocessedToolCalls = false;
        }
      }
      if (isGoogle || streamingDisabled || hasUnprocessedToolCalls) {
        await handleToolCalls(toolCalls, metadata, graph);
      }

      const usage = data?.output?.usage_metadata;
      if (usage) {
        const modelName = metadata?.ls_model_name || agentContext.clientOptions?.model;
        if (modelName) {
          // Note: this tags the usage object owned by the event payload in place.
          usage.model = modelName;
        }
        this.collectedUsage.push(usage);

        // With streaming disabled no deltas were emitted, so dispatch the full content now.
        if (streamingDisabled && data.output.content) {
          const stepKey = graph.getStepKey(metadata);
          const message_id = getMessageId(stepKey, graph) ?? '';
          if (message_id) {
            await graph.dispatchRunStep(stepKey, {
              type: StepTypes.MESSAGE_CREATION,
              message_creation: {
                message_id,
              },
            });
          }
          const stepId = graph.getStepIdByKey(stepKey);
          const content = data.output.content;
          if (typeof content === 'string') {
            await graph.dispatchMessageDelta(stepId, {
              content: [
                {
                  type: 'text',
                  text: content,
                },
              ],
            });
          } else if (content.every((c) => c.type?.startsWith('text'))) {
            await graph.dispatchMessageDelta(stepId, {
              content,
            });
          }
        }
      }
    } catch (error) {
      logger.error('Error handling model end event:', error);
    }

    // Single exit point: a refusal is reported on every path (including after
    // content dispatch), and its intentional throw is never logged as a handler failure.
    return this.finalize(errorMessage);
  }
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Tells whether a graph node name ends with the id of the last agent.
 *
 * @param {string | undefined} last_agent_id - Id of the last agent.
 * @param {string | undefined} langgraph_node - Name of the node that emitted the event.
 * @returns {boolean} `false` when either argument is falsy; otherwise the
 *   result of `langgraph_node.endsWith(last_agent_id)`.
 */
function checkIfLastAgent(last_agent_id, langgraph_node) {
  const hasBothValues = Boolean(last_agent_id) && Boolean(langgraph_node);
  return hasBothValues ? langgraph_node.endsWith(last_agent_id) : false;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds the default graph event handlers.
 *
 * Each run-step / delta handler forwards the event to the client with
 * `sendEvent(res, …)` when it is visible, and always passes it to
 * `aggregateContent`. An event is visible when it comes from the last agent
 * (per `checkIfLastAgent`) or when `metadata.hide_sequential_outputs` is falsy.
 *
 * @param {object} options
 * @param {object} options.res - Server response passed to `sendEvent`. Required.
 * @param {Function} options.aggregateContent - Called with `{ event, data }` for every handled event. Required.
 * @param {Function} [options.toolEndCallback] - Passed to `ToolEndHandler`.
 * @param {Array<object>} options.collectedUsage - Passed to `ModelEndHandler`.
 * @returns {Record<string, { handle: Function }>} Handlers keyed by `GraphEvents` value.
 * @throws {Error} When `res` or `aggregateContent` is missing.
 */
function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedUsage }) {
  if (!res || !aggregateContent) {
    throw new Error(
      `[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
    );
  }

  /** True when output from the emitting agent should reach the client. */
  const isVisible = (metadata) =>
    checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) ||
    !metadata?.hide_sequential_outputs;

  /**
   * Creates a handler that forwards visible events and aggregates all of them.
   * `alwaysForward(data)` marks payloads that are sent regardless of visibility.
   */
  const createForwardingHandler = (alwaysForward = () => false) => ({
    handle: (event, data, metadata) => {
      if (alwaysForward(data) || isVisible(metadata)) {
        sendEvent(res, { event, data });
      }
      aggregateContent({ event, data });
    },
  });

  return {
    [GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(collectedUsage),
    [GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback, logger),
    [GraphEvents.CHAT_MODEL_STREAM]: new ChatModelStreamHandler(),
    [GraphEvents.ON_RUN_STEP]: {
      handle: (event, data, metadata) => {
        // `?.` on stepDetails: a payload without it must not crash the handler.
        const isToolCall = data?.stepDetails?.type === StepTypes.TOOL_CALLS;
        if (isToolCall || isVisible(metadata)) {
          sendEvent(res, { event, data });
        } else {
          // Hidden, non-tool-call step: send a status update instead of the step itself.
          const agentName = metadata?.name ?? 'Agent';
          sendEvent(res, {
            event: 'on_agent_update',
            data: {
              runId: metadata?.run_id,
              message: `${agentName} is thinking...`,
            },
          });
        }
        aggregateContent({ event, data });
      },
    },
    [GraphEvents.ON_RUN_STEP_DELTA]: createForwardingHandler(
      (data) => data?.delta?.type === StepTypes.TOOL_CALLS,
    ),
    [GraphEvents.ON_RUN_STEP_COMPLETED]: createForwardingHandler((data) => data?.result != null),
    [GraphEvents.ON_MESSAGE_DELTA]: createForwardingHandler(),
    [GraphEvents.ON_REASONING_DELTA]: createForwardingHandler(),
  };
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Creates the callback invoked when a tool finishes, which turns tool
 * artifacts into attachments.
 *
 * Every attachment is produced by an async task whose promise is pushed onto
 * `artifactPromises`; a failing task is logged and resolves to `null`. When
 * response headers have already been sent, the attachment is also written to
 * `res` as an `attachment` server-sent event.
 *
 * @param {object} params
 * @param {object} params.req - Request; `req.user` and `req.config` are read.
 * @param {object} params.res - Response; `res.headersSent` and `res.write` are used.
 * @param {Array<Promise<object | null>>} params.artifactPromises - Receives one promise per attachment task.
 * @returns {(data: object, metadata: object) => Promise<void>} Tool end callback.
 */
function createToolEndCallback({ req, res, artifactPromises }) {
  /** Writes the attachment as an SSE event if the stream is open, then returns it. */
  const streamAttachment = (attachment) => {
    if (res.headersSent) {
      res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
    }
    return attachment;
  };

  /** Starts `task` and tracks its promise; failures are logged and become `null`. */
  const track = (task, errorLabel) => {
    artifactPromises.push(
      task().catch((error) => {
        logger.error(errorLabel, error);
        return null;
      }),
    );
  };

  return async (data, metadata) => {
    const output = data?.output;
    const artifact = output?.artifact;
    if (!artifact) {
      return;
    }

    if (artifact[Tools.file_search]) {
      track(async () => {
        const attachment = await processFileCitations({
          user: req.user,
          metadata,
          appConfig: req.config,
          toolArtifact: artifact,
          toolCallId: output.tool_call_id,
        });
        return attachment ? streamAttachment(attachment) : null;
      }, 'Error processing file citations:');
    }

    if (artifact[Tools.ui_resources]) {
      track(
        async () =>
          streamAttachment({
            type: Tools.ui_resources,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.ui_resources]: artifact[Tools.ui_resources].data,
          }),
        'Error processing artifact content:',
      );
    }

    if (artifact[Tools.web_search]) {
      track(
        async () =>
          streamAttachment({
            type: Tools.web_search,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.web_search]: { ...artifact[Tools.web_search] },
          }),
        'Error processing artifact content:',
      );
    }

    if (artifact.content) {
      const content = artifact.content;
      for (let i = 0; i < content.length; i++) {
        const part = content[i];
        if (part?.type !== 'image_url') {
          continue;
        }
        track(async () => {
          // Destructured inside the task so a malformed part is logged and
          // skipped instead of rejecting the whole callback.
          const { url } = part.image_url;
          const filename = `${output.name}_${output.tool_call_id}_img_${nanoid()}`;
          const file_id = artifact.file_ids?.[i];
          const file = await saveBase64Image(url, {
            req,
            file_id,
            filename,
            endpoint: metadata.provider,
            context: FileContext.image_generation,
          });
          // Guard before Object.assign, which throws on a null/undefined target.
          if (!file) {
            return null;
          }
          const fileMetadata = Object.assign(file, {
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
          });
          return streamAttachment(fileMetadata);
        }, 'Error processing artifact content:');
      }
      // Artifacts carrying `content` are never treated as code-execution output.
      return;
    }

    if (output.name !== Tools.execute_code || !artifact.files) {
      return;
    }

    for (const file of artifact.files) {
      track(async () => {
        // Destructured inside the task so one malformed entry cannot abort the loop.
        const { id, name } = file;
        const result = await loadAuthValues({
          userId: req.user.id,
          authFields: [EnvVar.CODE_API_KEY],
        });
        const fileMetadata = await processCodeOutput({
          req,
          id,
          name,
          apiKey: result[EnvVar.CODE_API_KEY],
          messageId: metadata.run_id,
          toolCallId: output.tool_call_id,
          conversationId: metadata.thread_id,
          session_id: artifact.session_id,
        });
        if (!fileMetadata) {
          return null;
        }
        return streamAttachment(fileMetadata);
      }, 'Error processing code output:');
    }
  };
}
|
|
| module.exports = { |
| getDefaultHandlers, |
| createToolEndCallback, |
| }; |
|
|