| const { logger } = require('@librechat/data-schemas'); |
| const { getMultiplier, getCacheMultiplier } = require('./tx'); |
| const { Transaction, Balance } = require('~/db/models'); |
|
|
| const cancelRate = 1.15; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| const updateBalance = async ({ user, incrementValue, setValues }) => { |
| let maxRetries = 10; |
| let delay = 50; |
| let lastError = null; |
|
|
| for (let attempt = 1; attempt <= maxRetries; attempt++) { |
| let currentBalanceDoc; |
| try { |
| |
| currentBalanceDoc = await Balance.findOne({ user }).lean(); |
| const currentCredits = currentBalanceDoc ? currentBalanceDoc.tokenCredits : 0; |
|
|
| |
| const potentialNewCredits = currentCredits + incrementValue; |
| const newCredits = Math.max(0, potentialNewCredits); |
|
|
| |
| const updatePayload = { |
| $set: { |
| tokenCredits: newCredits, |
| ...(setValues || {}), |
| }, |
| }; |
|
|
| |
| let updatedBalance = null; |
| if (currentBalanceDoc) { |
| |
| |
| updatedBalance = await Balance.findOneAndUpdate( |
| { |
| user: user, |
| tokenCredits: currentCredits, |
| }, |
| updatePayload, |
| { |
| new: true, |
| |
| }, |
| ).lean(); |
|
|
| if (updatedBalance) { |
| |
| return updatedBalance; |
| } |
| |
| lastError = new Error(`Concurrency conflict for user ${user} on attempt ${attempt}.`); |
| |
| } else { |
| |
| |
| |
| |
| try { |
| updatedBalance = await Balance.findOneAndUpdate( |
| { |
| user: user, |
| |
| |
| |
| |
| |
| }, |
| updatePayload, |
| { |
| upsert: true, |
| new: true, |
| |
| |
| }, |
| ).lean(); |
|
|
| if (updatedBalance) { |
| |
| return updatedBalance; |
| } |
| |
| lastError = new Error( |
| `Upsert race condition suspected for user ${user} on attempt ${attempt}.`, |
| ); |
| } catch (error) { |
| if (error.code === 11000) { |
| |
| |
| |
| lastError = error; |
| |
| } else { |
| |
| throw error; |
| } |
| } |
| } |
| } catch (error) { |
| |
| logger.error(`[updateBalance] Error during attempt ${attempt} for user ${user}:`, error); |
| lastError = error; |
| |
| } |
|
|
| |
| if (attempt < maxRetries) { |
| const jitter = Math.random() * delay * 0.5; |
| await new Promise((resolve) => setTimeout(resolve, delay + jitter)); |
| delay = Math.min(delay * 2, 2000); |
| } |
| } |
|
|
| |
| logger.error( |
| `[updateBalance] Failed to update balance for user ${user} after ${maxRetries} attempts.`, |
| ); |
| throw ( |
| lastError || |
| new Error( |
| `Failed to update balance for user ${user} after maximum retries due to persistent conflicts.`, |
| ) |
| ); |
| }; |
|
|
| |
| function calculateTokenValue(txn) { |
| if (!txn.valueKey || !txn.tokenType) { |
| txn.tokenValue = txn.rawAmount; |
| } |
| const { valueKey, tokenType, model, endpointTokenConfig } = txn; |
| const multiplier = Math.abs(getMultiplier({ valueKey, tokenType, model, endpointTokenConfig })); |
| txn.rate = multiplier; |
| txn.tokenValue = txn.rawAmount * multiplier; |
| if (txn.context && txn.tokenType === 'completion' && txn.context === 'incomplete') { |
| txn.tokenValue = Math.ceil(txn.tokenValue * cancelRate); |
| txn.rate *= cancelRate; |
| } |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| async function createAutoRefillTransaction(txData) { |
| if (txData.rawAmount != null && isNaN(txData.rawAmount)) { |
| return; |
| } |
| const transaction = new Transaction(txData); |
| transaction.endpointTokenConfig = txData.endpointTokenConfig; |
| calculateTokenValue(transaction); |
| await transaction.save(); |
|
|
| const balanceResponse = await updateBalance({ |
| user: transaction.user, |
| incrementValue: txData.rawAmount, |
| setValues: { lastRefill: new Date() }, |
| }); |
| const result = { |
| rate: transaction.rate, |
| user: transaction.user.toString(), |
| balance: balanceResponse.tokenCredits, |
| }; |
| logger.debug('[Balance.check] Auto-refill performed', result); |
| result.transaction = transaction; |
| return result; |
| } |
|
|
| |
| |
| |
| |
| async function createTransaction(_txData) { |
| const { balance, transactions, ...txData } = _txData; |
| if (txData.rawAmount != null && isNaN(txData.rawAmount)) { |
| return; |
| } |
|
|
| if (transactions?.enabled === false) { |
| return; |
| } |
|
|
| const transaction = new Transaction(txData); |
| transaction.endpointTokenConfig = txData.endpointTokenConfig; |
| calculateTokenValue(transaction); |
|
|
| await transaction.save(); |
| if (!balance?.enabled) { |
| return; |
| } |
|
|
| let incrementValue = transaction.tokenValue; |
| const balanceResponse = await updateBalance({ |
| user: transaction.user, |
| incrementValue, |
| }); |
|
|
| return { |
| rate: transaction.rate, |
| user: transaction.user.toString(), |
| balance: balanceResponse.tokenCredits, |
| [transaction.tokenType]: incrementValue, |
| }; |
| } |
|
|
| |
| |
| |
| |
| async function createStructuredTransaction(_txData) { |
| const { balance, transactions, ...txData } = _txData; |
| if (transactions?.enabled === false) { |
| return; |
| } |
|
|
| const transaction = new Transaction({ |
| ...txData, |
| endpointTokenConfig: txData.endpointTokenConfig, |
| }); |
|
|
| calculateStructuredTokenValue(transaction); |
|
|
| await transaction.save(); |
|
|
| if (!balance?.enabled) { |
| return; |
| } |
|
|
| let incrementValue = transaction.tokenValue; |
|
|
| const balanceResponse = await updateBalance({ |
| user: transaction.user, |
| incrementValue, |
| }); |
|
|
| return { |
| rate: transaction.rate, |
| user: transaction.user.toString(), |
| balance: balanceResponse.tokenCredits, |
| [transaction.tokenType]: incrementValue, |
| }; |
| } |
|
|
| |
| function calculateStructuredTokenValue(txn) { |
| if (!txn.tokenType) { |
| txn.tokenValue = txn.rawAmount; |
| return; |
| } |
|
|
| const { model, endpointTokenConfig } = txn; |
|
|
| if (txn.tokenType === 'prompt') { |
| const inputMultiplier = getMultiplier({ tokenType: 'prompt', model, endpointTokenConfig }); |
| const writeMultiplier = |
| getCacheMultiplier({ cacheType: 'write', model, endpointTokenConfig }) ?? inputMultiplier; |
| const readMultiplier = |
| getCacheMultiplier({ cacheType: 'read', model, endpointTokenConfig }) ?? inputMultiplier; |
|
|
| txn.rateDetail = { |
| input: inputMultiplier, |
| write: writeMultiplier, |
| read: readMultiplier, |
| }; |
|
|
| const totalPromptTokens = |
| Math.abs(txn.inputTokens || 0) + |
| Math.abs(txn.writeTokens || 0) + |
| Math.abs(txn.readTokens || 0); |
|
|
| if (totalPromptTokens > 0) { |
| txn.rate = |
| (Math.abs(inputMultiplier * (txn.inputTokens || 0)) + |
| Math.abs(writeMultiplier * (txn.writeTokens || 0)) + |
| Math.abs(readMultiplier * (txn.readTokens || 0))) / |
| totalPromptTokens; |
| } else { |
| txn.rate = Math.abs(inputMultiplier); |
| } |
|
|
| txn.tokenValue = -( |
| Math.abs(txn.inputTokens || 0) * inputMultiplier + |
| Math.abs(txn.writeTokens || 0) * writeMultiplier + |
| Math.abs(txn.readTokens || 0) * readMultiplier |
| ); |
|
|
| txn.rawAmount = -totalPromptTokens; |
| } else if (txn.tokenType === 'completion') { |
| const multiplier = getMultiplier({ tokenType: txn.tokenType, model, endpointTokenConfig }); |
| txn.rate = Math.abs(multiplier); |
| txn.tokenValue = -Math.abs(txn.rawAmount) * multiplier; |
| txn.rawAmount = -Math.abs(txn.rawAmount); |
| } |
|
|
| if (txn.context && txn.tokenType === 'completion' && txn.context === 'incomplete') { |
| txn.tokenValue = Math.ceil(txn.tokenValue * cancelRate); |
| txn.rate *= cancelRate; |
| if (txn.rateDetail) { |
| txn.rateDetail = Object.fromEntries( |
| Object.entries(txn.rateDetail).map(([k, v]) => [k, v * cancelRate]), |
| ); |
| } |
| } |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| async function getTransactions(filter) { |
| try { |
| return await Transaction.find(filter).lean(); |
| } catch (error) { |
| logger.error('Error querying transactions:', error); |
| throw error; |
| } |
| } |
|
|
| module.exports = { |
| getTransactions, |
| createTransaction, |
| createAutoRefillTransaction, |
| createStructuredTransaction, |
| }; |
|
|