| const { klona } = require('klona'); |
| const { sleep } = require('@librechat/agents'); |
| const { sendEvent } = require('@librechat/api'); |
| const { logger } = require('@librechat/data-schemas'); |
| const { |
| StepTypes, |
| RunStatus, |
| StepStatus, |
| ContentTypes, |
| ToolCallTypes, |
| imageGenTools, |
| EModelEndpoint, |
| defaultOrderQuery, |
| } = require('librechat-data-provider'); |
| const { retrieveAndProcessFile } = require('~/server/services/Files/process'); |
| const { processRequiredActions } = require('~/server/services/ToolService'); |
| const { RunManager, waitForRun } = require('~/server/services/Runs'); |
| const { processMessages } = require('~/server/services/Threads'); |
| const { createOnProgress } = require('~/server/utils'); |
| const { TextStream } = require('~/app/clients'); |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Initializes response-streaming state on the OpenAI client object for a single
 * assistant reply. Sets three properties on `openai`:
 * - `responseMessage`: skeleton assistant message whose `content` array is
 *   filled in by ordered index as content parts arrive,
 * - `responseText`: running concatenation of all TEXT part values,
 * - `addContentData`: callback that records a content part and, for non-text
 *   types, forwards it to the client via SSE (`sendEvent` on `openai.res`).
 *
 * Note: this function is synchronous — the original was declared `async`
 * without ever awaiting, which only wrapped `undefined` in a Promise.
 * Callers that `await` it continue to work unchanged.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client, mutated in place; must carry `res` (Express response) for SSE.
 * @param {string} params.conversationId - Conversation the response belongs to.
 * @param {string} params.userMessageId - The user message this response answers (becomes `parentMessageId`).
 * @param {string} params.messageId - Id assigned to the assistant response message.
 * @param {string} params.thread_id - OpenAI thread id, echoed in streamed content events.
 */
function createOnTextProgress({ openai, conversationId, userMessageId, messageId, thread_id }) {
  openai.responseMessage = {
    conversationId,
    parentMessageId: userMessageId,
    role: 'assistant',
    messageId,
    content: [],
  };

  openai.responseText = '';

  /**
   * Records a content part at its ordered index on the response message.
   * TEXT parts are only aggregated locally (streaming of text happens
   * elsewhere); every other type is also pushed to the client immediately.
   *
   * @param {{ type: string, index: number }} data - Content part; `data[type]` holds the payload.
   */
  openai.addContentData = (data) => {
    const { type, index } = data;
    // Assign by index (not push) so parts keep their run order even if
    // they complete out of order.
    openai.responseMessage.content[index] = { type, [type]: data[type] };

    if (type === ContentTypes.TEXT) {
      openai.responseText += data[type].value;
      return;
    }

    const contentData = {
      index,
      type,
      [type]: data[type],
      messageId,
      thread_id,
      conversationId,
    };

    logger.debug('Content data:', contentData);
    sendEvent(openai.res, contentData);
  };
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Waits for a run to settle and returns its outcome.
 *
 * - COMPLETED: resolves with the new messages this run produced.
 * - REQUIRES_ACTION: resolves with the pending tool actions
 *   ({ tool, toolInput, toolCallId, run_id, thread_id }) to be executed.
 * - anything else: throws with the full serialized run for diagnosis.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client.
 * @param {string} params.run_id - Run to poll.
 * @param {string} params.thread_id - Thread the run belongs to.
 * @returns {Promise<Array>} New messages or required tool actions.
 * @throws {Error} When the run settles in an unexpected status.
 */
async function getResponse({ openai, run_id, thread_id }) {
  const run = await waitForRun({ openai, run_id, thread_id, pollIntervalMs: 2000 });

  if (run.status === RunStatus.COMPLETED) {
    const response = await openai.beta.threads.messages.list(thread_id, defaultOrderQuery);
    // Only messages created by this run; the thread may contain older ones.
    return response.data.filter((message) => message.run_id === run_id);
  }

  if (run.status === RunStatus.REQUIRES_ACTION) {
    const toolCalls = run.required_action?.submit_tool_outputs.tool_calls ?? [];
    return toolCalls.map((toolCall) => ({
      tool: toolCall.function.name,
      toolInput: JSON.parse(toolCall.function.arguments),
      toolCallId: toolCall.id,
      run_id,
      thread_id,
    }));
  }

  const runInfo = JSON.stringify(run, null, 2);
  throw new Error(`Unexpected run status ${run.status}.\nFull run info:\n\n${runInfo}`);
}
|
|
| |
| |
| |
| |
| |
/**
 * Deduplicates run steps by `id`, keeping for each id the entry with the
 * latest "effective" timestamp (the max of created/expired/cancelled/failed/
 * completed times — polling can yield the same step in several states).
 *
 * Improvements over the previous version: a dead, empty
 * `if (latestStep.last_error) {}` branch was removed, and the internal
 * `effectiveTimestamp` bookkeeping field is now stripped via rest
 * destructuring instead of `delete` (avoids mutating/deopting the objects).
 *
 * @param {Object[]} [steps=[]] - Possibly-duplicated run steps; falsy entries are skipped.
 * @returns {Object[]} One step per id, internal bookkeeping field removed.
 *   Arrays of length <= 1 are returned as-is (no copies made).
 */
function filterSteps(steps = []) {
  // Nothing to deduplicate.
  if (steps.length <= 1) {
    return steps;
  }

  const stepMap = new Map();

  for (const step of steps) {
    if (!step) {
      continue;
    }

    // A step always has created_at; the terminal timestamps are only set
    // once reached, so missing ones fall back to 0 and never win the max.
    const effectiveTimestamp = Math.max(
      step.created_at,
      step.expired_at || 0,
      step.cancelled_at || 0,
      step.failed_at || 0,
      step.completed_at || 0,
    );

    const existing = stepMap.get(step.id);
    if (!existing || effectiveTimestamp > existing.effectiveTimestamp) {
      stepMap.set(step.id, { ...step, effectiveTimestamp });
    }
  }

  // Drop the internal bookkeeping field before returning.
  return Array.from(stepMap.values()).map(({ effectiveTimestamp, ...step }) => step);
}
|
|
| |
| |
| |
| |
| |
| |
|
|
/**
 * Reports whether a tool call differs from its previously seen snapshot,
 * using a deep structural comparison via JSON serialization.
 *
 * @param {Object} previousCall - Last observed version of the tool call.
 * @param {Object} currentCall - Newly polled version of the tool call.
 * @returns {boolean} `true` when the serialized forms differ.
 */
function hasToolCallChanged(previousCall, currentCall) {
  const previousSnapshot = JSON.stringify(previousCall);
  const currentSnapshot = JSON.stringify(currentCall);
  return previousSnapshot !== currentSnapshot;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds the `in_progress` step handler used while polling a run, and resets
 * the streaming bookkeeping state kept on the OpenAI client:
 * - `index`: next ordered content index for the response message,
 * - `mappedOrder`: tool-call id / step id -> assigned content index,
 * - `seenToolCalls`: tool-call id -> last observed snapshot (for change detection),
 * - `processedFileIds`: image file_ids already downloaded and emitted,
 * - `completeToolCallSteps`: ids of tool-call steps that reached a final status,
 * - `seenCompletedMessages`: message ids already streamed to the client.
 *
 * @param {Object} openai - OpenAI client; mutated to carry the state above.
 * @param {string} thread_id - Thread the run belongs to.
 * @param {Object[]} messages - Shared accumulator; completed messages are pushed here.
 * @returns {Function} Async `in_progress({ step })` handler.
 */
function createInProgressHandler(openai, thread_id, messages) {
  openai.index = 0;
  openai.mappedOrder = new Map();
  openai.seenToolCalls = new Map();
  openai.processedFileIds = new Set();
  openai.completeToolCallSteps = new Set();
  openai.seenCompletedMessages = new Set();

  /**
   * Handles one polled run step: streams tool-call progress updates and
   * completed message text to the client, assigning each item a stable
   * content index on first sight.
   *
   * @param {{ step: Object }} param0 - The run step (tool_calls or message_creation).
   */
  async function in_progress({ step }) {
    if (step.type === StepTypes.TOOL_CALLS) {
      const { tool_calls } = step.step_details;

      for (const _toolCall of tool_calls) {
        const toolCall = _toolCall;
        const previousCall = openai.seenToolCalls.get(toolCall.id);

        // Skip unchanged snapshots so repeated polling doesn't re-emit.
        if (previousCall && !hasToolCallChanged(previousCall, toolCall)) {
          continue;
        }

        // First sighting: assign and remember a stable content index.
        let toolCallIndex = openai.mappedOrder.get(toolCall.id);
        if (toolCallIndex === undefined) {
          toolCallIndex = openai.index;
          openai.mappedOrder.set(toolCall.id, openai.index);
          openai.index++;
        }

        if (step.status === StepStatus.IN_PROGRESS) {
          // Synthetic progress: advance by 0.2 per update, capped at 0.95
          // until the step actually completes; start at 0.01.
          toolCall.progress =
            previousCall && previousCall.progress
              ? Math.min(previousCall.progress + 0.2, 0.95)
              : 0.01;
        } else {
          toolCall.progress = 1;
          openai.completeToolCallSteps.add(step.id);
        }

        if (
          toolCall.type === ToolCallTypes.CODE_INTERPRETER &&
          step.status === StepStatus.COMPLETED
        ) {
          // Completed code-interpreter calls may carry image outputs that
          // need to be fetched, processed, and emitted as image_file parts.
          const { outputs } = toolCall[toolCall.type];

          for (const output of outputs) {
            if (output.type !== 'image') {
              continue;
            }

            // Each generated file is downloaded and emitted at most once.
            if (openai.processedFileIds.has(output.image?.file_id)) {
              continue;
            }

            const { file_id } = output.image;
            const file = await retrieveAndProcessFile({
              openai,
              client: openai,
              file_id,
              basename: `${file_id}.png`,
            });

            const prelimImage = file;

            // Guard: every field of the processed file record must be truthy
            // before we emit it as an image content part.
            const prelimImageKeys = Object.keys(prelimImage);
            const validImageFile = prelimImageKeys.every((key) => prelimImage[key]);

            if (!validImageFile) {
              continue;
            }

            // Images get their own fresh content index, separate from the
            // tool call's index.
            const image_file = {
              [ContentTypes.IMAGE_FILE]: prelimImage,
              type: ContentTypes.IMAGE_FILE,
              index: openai.index,
            };
            openai.addContentData(image_file);
            openai.processedFileIds.add(file_id);
            openai.index++;
          }
        } else if (
          toolCall.type === ToolCallTypes.FUNCTION &&
          step.status === StepStatus.COMPLETED &&
          imageGenTools.has(toolCall[toolCall.type].name)
        ) {
          // Completed image-generation function calls are recorded but not
          // re-emitted as tool_call content (their output is handled as an
          // image elsewhere — presumably by processRequiredActions; confirm).
          openai.seenToolCalls.set(toolCall.id, toolCall);
          continue;
        }

        // Emit the (possibly progress-updated) tool call at its stable index.
        openai.addContentData({
          [ContentTypes.TOOL_CALL]: toolCall,
          index: toolCallIndex,
          type: ContentTypes.TOOL_CALL,
        });

        // Remember this snapshot for future change detection.
        openai.seenToolCalls.set(toolCall.id, toolCall);
      }
    } else if (step.type === StepTypes.MESSAGE_CREATION && step.status === StepStatus.COMPLETED) {
      const { message_id } = step.step_details.message_creation;
      // Each completed message is fetched and streamed exactly once.
      if (openai.seenCompletedMessages.has(message_id)) {
        return;
      }

      openai.seenCompletedMessages.add(message_id);

      const message = await openai.beta.threads.messages.retrieve(message_id, { thread_id });
      if (!message?.content?.length) {
        return;
      }
      messages.push(message);

      // Message-creation steps are keyed by step.id in the index map.
      let messageIndex = openai.mappedOrder.get(step.id);
      if (messageIndex === undefined) {
        messageIndex = openai.index;
        openai.mappedOrder.set(step.id, openai.index);
        openai.index++;
      }

      // Resolve annotations/citations into final text, record it locally.
      const result = await processMessages({ openai, client: openai, messages: [message] });
      openai.addContentData({
        [ContentTypes.TEXT]: { value: result.text },
        type: ContentTypes.TEXT,
        index: messageIndex,
      });

      const { onProgress: progressCallback } = createOnProgress({
      });

      const onProgress = progressCallback({
        res: openai.res,
        index: messageIndex,
        messageId: openai.responseMessage.messageId,
        conversationId: openai.responseMessage.conversationId,
        type: ContentTypes.TEXT,
        thread_id,
      });

      // Brief pause before streaming — presumably to let prior SSE events
      // flush to the client first; confirm intent.
      await sleep(500);

      // Re-stream the already-complete text chunk-by-chunk (9ms delay) so
      // the client sees a typing effect.
      const stream = new TextStream(result.text, { delay: 9 });
      await stream.processTextStream(onProgress);
    }
  }

  return in_progress;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Drives an assistant run to completion: polls the run, streams step progress
 * via the `in_progress` handler, and — when the run requires tool actions —
 * executes them, submits the outputs, and recurses on the resulting run,
 * accumulating steps and messages across iterations.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client; carries req/res and streaming state.
 * @param {string} params.run_id - Run to poll.
 * @param {string} params.thread_id - Thread the run belongs to.
 * @param {Object[]} [params.accumulatedSteps=[]] - Steps gathered by earlier recursions.
 * @param {Object[]} [params.accumulatedMessages=[]] - Messages gathered by earlier recursions.
 * @param {Function} [params.in_progress] - Existing step handler; created on first call
 *   and reused on recursion so dedup/index state persists across runs.
 * @returns {Promise<Object>} { run, steps, messages, finalMessage, text } once
 *   the run finishes without requiring further actions.
 */
async function runAssistant({
  openai,
  run_id,
  thread_id,
  accumulatedSteps = [],
  accumulatedMessages = [],
  in_progress: inProgress,
}) {
  const appConfig = openai.req.config;
  let steps = accumulatedSteps;
  let messages = accumulatedMessages;
  // Reuse the caller's handler on recursion; otherwise initialize fresh
  // streaming state on the client.
  const in_progress = inProgress ?? createInProgressHandler(openai, thread_id, messages);
  openai.in_progress = in_progress;

  const runManager = new RunManager({
    in_progress,
    // Invoked when the run reaches a terminal/actionable state; flushes all
    // pending step work and reconciles the accumulated step list.
    final: async ({ step, runStatus, stepsByStatus }) => {
      logger.debug(`[runAssistant] Final step for ${run_id} with status ${runStatus}`, step);

      const promises = [];

      // Gather every outstanding step promise regardless of status bucket.
      for (const [_status, stepsPromises] of Object.entries(stepsByStatus)) {
        promises.push(...stepsPromises);
      }

      const resolved = await Promise.all(promises);
      const finalSteps = filterSteps(steps.concat(resolved));

      // If the run ended on a message, make sure any tool-call steps that
      // never reached a final status are emitted one last time so the
      // client doesn't show them stuck in progress.
      if (step.type === StepTypes.MESSAGE_CREATION) {
        const incompleteToolCallSteps = finalSteps.filter(
          (s) => s && s.type === StepTypes.TOOL_CALLS && !openai.completeToolCallSteps.has(s.id),
        );
        for (const incompleteToolCallStep of incompleteToolCallSteps) {
          await in_progress({ step: incompleteToolCallStep });
        }
      }
      // Process the final step itself.
      await in_progress({ step });

      resolved.push(step);

      // Deep-clone so later mutation of emitted steps can't corrupt the
      // accumulated record carried into the next recursion.
      steps = klona(finalSteps);
    },
  });

  // Poll interval/timeout come from the per-endpoint config, if provided.
  const { endpoint = EModelEndpoint.azureAssistants } = openai.req.body;
  const assistantsEndpointConfig = appConfig.endpoints?.[endpoint] ?? {};
  const { pollIntervalMs, timeoutMs } = assistantsEndpointConfig;

  const run = await waitForRun({
    openai,
    run_id,
    thread_id,
    runManager,
    pollIntervalMs,
    timeout: timeoutMs,
  });

  // No pending tool actions: the run is done — return the accumulated
  // results. Note: .sort() mutates `messages` in place.
  if (!run.required_action) {
    const sortedMessages = messages.sort((a, b) => a.created_at - b.created_at);
    return {
      run,
      steps,
      messages: sortedMessages,
      finalMessage: openai.responseMessage,
      text: openai.responseText,
    };
  }

  // Translate each required tool call into an executable action descriptor.
  const { submit_tool_outputs } = run.required_action;
  const actions = submit_tool_outputs.tool_calls.map((item) => {
    const functionCall = item.function;
    const args = JSON.parse(functionCall.arguments);
    return {
      tool: functionCall.name,
      toolInput: args,
      toolCallId: item.id,
      run_id,
      thread_id,
    };
  });

  // Execute the tools, submit their outputs, and continue with the run the
  // submission produces.
  const tool_outputs = await processRequiredActions(openai, actions);
  const toolRun = await openai.beta.threads.runs.submitToolOutputs(run.id, {
    thread_id: run.thread_id,
    tool_outputs,
  });

  // Recurse with the same handler and accumulators until no action remains.
  return await runAssistant({
    openai,
    run_id: toolRun.id,
    thread_id,
    accumulatedSteps: steps,
    accumulatedMessages: messages,
    in_progress,
  });
}
|
|
// Public API: run/thread helpers for the OpenAI/Azure Assistants endpoints.
module.exports = {
  getResponse,
  runAssistant,
  createOnTextProgress,
};
|
|