From ad91e0f3d4f89bee1fb54f8222df509ee23bd67f Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 12 Jun 2024 14:59:53 +0300 Subject: [PATCH] gpt refactoring --- web/chat-server.js | 579 ++++++++++++++++++++------------------------- 1 file changed, 254 insertions(+), 325 deletions(-) diff --git a/web/chat-server.js b/web/chat-server.js index 381799d..350c743 100644 --- a/web/chat-server.js +++ b/web/chat-server.js @@ -8,23 +8,21 @@ const path = require('path'); const dotenv = require('dotenv'); const ollama = require('ollama'); const axios = require('axios'); -// import OpenAI from "openai"; const OpenAI = require('openai'); -const openai = new OpenAI({ apiKey: "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" }); - const Groq = require('groq-sdk'); -//const LLM = require("@themaximalist/llm.js"); //https://www.npmjs.com/package/@themaximalist/llm.js + +// Load environment variables +dotenv.config({ path: `.env${process.env.NODE_ENV === 'development' ? '.development' : ''}` }); + +// Initialize services +const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); const groq = new Groq({ apiKey: process.env.GROQ_API_KEY }); - -if (dotenv) { - const envFile = process.env.NODE_ENV === 'development' ? '.env.development' : '.env'; - dotenv.config({ path: envFile }); -} - +// Express setup const app = express(); app.use(bodyParser.json()); +// Configuration constants const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000; const PORT_WS = process.env.SERVER_PORT_WS || 8080; const TTS_API_URL = process.env.TTS_API_URL; @@ -38,167 +36,160 @@ let queueCounter = 0; const sessions = new Map(); const chats = new Map(); // Store chat rooms -storage.init().then(() => { - storage.getItem('language').then((value) => { - if (value !== undefined) language = value; - else storage.setItem('language', language); - }); - storage.getItem('storeRecordings').then((value) => { - if (value !== undefined) storeRecordings = value; - else storage.setItem('storeRecordings', storeRecordings); - }); +// Initialize storage and load initial values +async function initStorage() { + await storage.init(); + language = await storage.getItem('language') || language; + storeRecordings = await storage.getItem('storeRecordings') || storeRecordings; - // Load existing chats, sessions from storage - storage.getItem('chats').then((value) => { - if (value !== undefined) { - value.forEach(chat => chats.set(chat.id, chat)); - } - }); - storage.getItem('sessions').then((value) => { - if (value !== undefined) { - value.forEach(session => sessions.set(session.sessionId, session)); - } - }); -}); + const storedChats = await storage.getItem('chats') || []; + storedChats.forEach(chat => chats.set(chat.id, chat)); + + const storedSessions = await storage.getItem('sessions') || []; + storedSessions.forEach(session => sessions.set(session.sessionId, session)); +} + +initStorage(); // WebSocket Server const wss = new WebSocket.Server({ port: PORT_WS }); -wss.on('connection', (ws) => { - - - - ws.on('message', (message) => { - let isJson = false; - let data = null; - // Check if message is JSON - try { - data = JSON.parse(message); - isJson = true; - } - catch (e) { - } - - if (isJson) { - // Handle JSON messages - try { - console.log('Received message:', data.type); - - switch (data.type) { - case 'sessionId': - ws.sessionId = Math.random().toString(36).substring(2); - sessions.set(ws.sessionId, { language: 'en' }); - //store new session in storage sessions - storage.getItem('sessions').then((value) => { - if (value !== undefined) { - value.push({ sessionId: ws.sessionId, language: 'en' }); - storage.setItem('sessions', value); - } else { - storage.setItem('sessions', [{ sessionId: ws.sessionId, language: 'en' }]); - } - } - ); - break; - case 'join': - const { username, language } = data; - sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language }); - ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings })); - - const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); - ws.send(JSON.stringify({ type: 'chats', chats: userChats })); - - broadcastUserList(); - break; - case 'startChat': - const { users: chatUsers } = data; - const chatId = Math.random().toString(36).substring(2); - let participants = [ws.sessionId, ...chatUsers]; - // Deduplicate participants - participants = [...new Set(participants)]; - - chats.set(chatId, { participants, messages: [] }); - - //store new chat in storage chats - storage.getItem('chats').then((value) => { - if (value !== undefined) { - value.push({ id: chatId, participants }); - storage.setItem('chats', value); - } else { - storage.setItem('chats', [{ id: chatId, participants }]); - } - }); - - const userNames = participants.map(sessionId => { - const user = sessions.get(sessionId); - return user ? `${user.username}(${user.sessionId})` : 'Unknown User'; - }); - - console.log('Creating chat room. Users:', userNames); - - participants.forEach(sessionId => { - const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); - if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { - const userChats = Array.from(chats.entries()) - .filter(([id, chat]) => chat.participants.includes(sessionId)) - .map(([id, chat]) => ({ id, participants: chat.participants })); - participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats })); - } - }); - broadcastUserList(); - break; - // case 'audio': - // console.log('(queue ' + queueCounter + ') Received ' + (data.audio.length / 1024 / 1024).toFixed(3) + ' MB audio from client.'); - // handleAudioData(ws, data.audiobase64); - // break; - case 'enterChat': - const { chatId: Id } = data; - const enteredChat = chats.get(Id); - const currentSession = sessions.get(ws.sessionId); - currentSession.currentChat = Id; - if (enteredChat && enteredChat.participants.includes(ws.sessionId)) { - ws.send(JSON.stringify({ type: 'chat', chat: enteredChat })); - } - break; - case 'reconnect': - const userSession = sessions.get(data.sessionId); - if (userSession) { - sessions.set(ws.sessionId, userSession); - ws.sessionId = data.sessionId; - const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); - ws.send(JSON.stringify({ type: 'chats', chats: userChats })); - } - else { - console.log('Session not found:', data.sessionId); - } - - broadcastUserList(); - break; - default: - console.log('Unknown message type:', data.type); - } - } catch (err) { - console.error('Failed to parse message', err); - } - - } else { - // Handle binary data - handleAudioData(ws, message); - } - - }); - - ws.on('close', () => { - sessions.delete(ws.sessionId); - broadcastUserList(); - }); +wss.on('connection', ws => { + ws.on('message', async message => handleMessage(ws, message)); + ws.on('close', () => handleClose(ws)); }); -function handleAudioData(ws, data) { +// Handle WebSocket messages +async function handleMessage(ws, message) { + let data; + try { + data = JSON.parse(message); + } catch { + return handleAudioData(ws, message); + } + + try { + switch (data.type) { + case 'sessionId': + await handleSessionId(ws); + break; + case 'join': + await handleJoin(ws, data); + break; + case 'startChat': + await handleStartChat(ws, data); + break; + case 'enterChat': + await handleEnterChat(ws, data); + break; + case 'reconnect': + await handleReconnect(ws, data); + break; + default: + console.log('Unknown message type:', data.type); + } + } catch (err) { + console.error('Failed to handle message', err); + } +} + +function handleClose(ws) { + sessions.delete(ws.sessionId); + broadcastUserList(); +} + +// Handlers for specific message types +async function handleSessionId(ws) { + ws.sessionId = generateSessionId(); + sessions.set(ws.sessionId, { language: 'en' }); + await storage.setItem('sessions', Array.from(sessions.values())); +} + +async function handleJoin(ws, { username, language }) { + sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language }); + ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings })); + + const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); + ws.send(JSON.stringify({ type: 'chats', chats: userChats })); + + broadcastUserList(); +} + +async function handleStartChat(ws, { users }) { + const chatId = generateChatId(); + let participants = [ws.sessionId, ...users]; + participants = [...new Set(participants)]; + + chats.set(chatId, { participants, messages: [] }); + await storage.setItem('chats', Array.from(chats.values())); + + notifyParticipants(participants); + broadcastUserList(); +} + +async function handleEnterChat(ws, { chatId }) { + const enteredChat = chats.get(chatId); + const currentSession = sessions.get(ws.sessionId); + currentSession.currentChat = chatId; + if (enteredChat && enteredChat.participants.includes(ws.sessionId)) { + ws.send(JSON.stringify({ type: 'chat', chat: enteredChat })); + } +} + +async function handleReconnect(ws, { sessionId }) { + const userSession = sessions.get(sessionId); + if (userSession) { + sessions.set(ws.sessionId, userSession); + ws.sessionId = sessionId; + const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); + ws.send(JSON.stringify({ type: 'chats', chats: userChats })); + } else { + console.log('Session not found:', sessionId); + } + broadcastUserList(); +} + +// Utility functions +function generateSessionId() { + return Math.random().toString(36).substring(2); +} + +function generateChatId() { + return Math.random().toString(36).substring(2); +} + +function broadcastUserList() { + const userList = Array.from(sessions.values()).map(user => ({ + username: user.username, + sessionId: user.sessionId, + currentChat: user.currentChat, + language: user.language + })); + + wss.clients.forEach(client => { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify({ type: 'userList', users: userList })); + } + }); +} + +function notifyParticipants(participants) { + participants.forEach(sessionId => { + const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); + if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { + const userChats = Array.from(chats.entries()) + .filter(([id, chat]) => chat.participants.includes(sessionId)) + .map(([id, chat]) => ({ id, participants: chat.participants })); + participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats })); + } + }); +} + +async function handleAudioData(ws, data) { const sessionData = sessions.get(ws.sessionId); - let language = sessionData.language || 'en'; - let task = sessionData.task || 'transcribe'; + let { language, task } = sessionData; const formData = { - task, + task: task || 'transcribe', language, vad_filter: 'true', output: 'json', @@ -208,98 +199,29 @@ function handleAudioData(ws, data) { } }; - if (language === 'auto' || language === '') { - detectLanguage(ws, formData); + if (!language || language === 'auto') { + await detectLanguage(ws, formData); } else { - transcribeAudio(ws, formData, sessionData); + await transcribeAudio(ws, formData, sessionData); } } -function detectLanguage(ws, formData) { - request.post({ url: TTS_API_URL.replace('/asr', '/detect-language'), formData }, (err, httpResponse, body) => { - if (err) return console.error('Language detection failed:', err); - const result = JSON.parse(body); - if (result && result.language_code) { - const language = result.language_code; - const sessionData = sessions.get(ws.sessionId); - sessionData.language = language; - ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: result.detected_language })); - transcribeAudio(ws, formData, sessionData); - } - }); -} - -async function translateText(originalText, originalLanguage, targetLanguage) { - const prompt = "Translate this text from " + originalLanguage + " to " + targetLanguage + ": " + originalText; - - - // const llm = new LLM(); - // llm.system("Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation."); - - // let result = await llm.chat(prompt, { service: "groq", model: "mixtral-8x7b-32768" }); - // return result; - - - return groq.chat.completions - .create({ - messages: [ - { - role: "system", - content: "You are translating voice transcriptions from '" + originalLanguage + "' to '" + targetLanguage + "'. Reply with just the translation. It will be converted to speech using TTS - you can add more context if needed.", - }, - { - role: "user", - content: originalText, - }, - ], - model: "llama3-8b-8192", - }) - .then((chatCompletion) => { - let result = chatCompletion.choices[0]?.message?.content || ""; - console.log(result); - return { response: result }; - }); - - - - - // return queryLLMAxios("translate this text from " + originalLanguage + " to " + targetLanguage + ": " + originalText) - // .then(response => { - // console.log('Translation response:', response); - // return response; - // }); -} -async function queryLLM(prompt) { - const requestData = { - model: LLN_MODEL || 'qwen2', // ollama3 - prompt: prompt, - system: "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.", - //format: "json" - }; - const ola = new ollama.Ollama({ host: LNN_API_URL }) - await ola.generate(requestData) -} - -///obsolete function -async function queryLLMAxios(prompt) { - const requestData = { - model: LLN_MODEL || 'qwen2', - prompt: prompt, - "system": "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.", - "stream": false - }; - +async function detectLanguage(ws, formData) { try { - const response = await axios.post(LNN_API_URL, requestData, { - headers: { - // 'Authorization': `Bearer ${OLLAMA_API_KEY}`, - 'Content-Type': 'application/json' - } + const result = await requestPromise({ + method: 'POST', + url: TTS_API_URL.replace('/asr', '/detect-language'), + formData }); - return response.data; - } catch (error) { - console.error('Error calling Ollama API:', error.response ? error.response.data : error.message); - throw error; + const { language_code } = JSON.parse(result); + if (language_code) { + const sessionData = sessions.get(ws.sessionId); + sessionData.language = language_code; + ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: language_code })); + await transcribeAudio(ws, formData, sessionData); + } + } catch (err) { + console.error('Language detection failed:', err); } } @@ -307,9 +229,13 @@ async function transcribeAudio(ws, formData, sessionData) { const start = new Date().getTime(); queueCounter++; - request.post({ url: TTS_API_URL, formData }, (err, httpResponse, body) => { + try { + if(sessionData.language) { + formData.language = sessionData.language; + } + formData.vad_filter = 'true'; + const body = await requestPromise({ method: 'POST', url: TTS_API_URL, formData }); queueCounter--; - if (err) return console.error('Transcription failed:', err); const duration = new Date().getTime() - start; ws.send(JSON.stringify({ @@ -320,83 +246,77 @@ async function transcribeAudio(ws, formData, sessionData) { text: body })); - if (sessionData.currentChat) { - const chat = chats.get(sessionData.currentChat); - if (chat) { - let msg = { sender: sessionData.username, text: body, translations: [] }; - // init messages array if it doesn't exist - if (!chat.messages) chat.messages = []; - chat.messages.push(msg); + await handleChatTranscription(ws, body, sessionData); - chat.participants.forEach(sessionId => { - if (sessionId !== ws.sessionId) { - let targetLang = sessions.get(sessionId)?.language || 'en'; - //targetLang = "bg"; - if (targetLang !== sessionData.language) { - console.log('Translating message "' + body + '" from ' + sessionData.language + ' to ' + targetLang); - translateText(body, sessionData.language, targetLang) - .then(translation => { - let jsonResp; - if (typeof translation === 'string') { - try { - jsonResp = JSON.parse(translation); - } catch (e) { - console.error('Failed to parse translation response:', e); - ws.send(JSON.stringify({ type: 'error', message: 'Invalid translation response' })); - return; - } - } else { - jsonResp = translation; - } - - const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); - if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { - participantSocket.send(JSON.stringify({ type: 'text', text: sessionData.username + ': ' + jsonResp.response + "\n" })); - - // Generate and send the speech audio - generateSpeech(jsonResp.response) - .then(audioBuffer => { - console.log('Generated audio for translation:', audioBuffer.length); - msg.translations.push({ language: targetLang, text: jsonResp.response, audio: audioBuffer.toString('base64') }); - participantSocket.send(JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') })); - }); - - } - }); - } - else { - const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); - if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { - participantSocket.send(JSON.stringify({ type: 'text', text: sessionData.username + ': ' + body + "\n" })); - participantSocket.send(JSON.stringify({ type: 'audio', audio: formData.toString('base64') })); - } - } - } - }); - - } - } - }); + } catch (err) { + console.error('Transcription failed:', err); + } if (storeRecordings) { const timestamp = Date.now(); - fs.mkdir('rec', { recursive: true }, (err) => { - if (err) throw err; - }); - fs.writeFile(`rec/audio${timestamp}.ogg`, formData.audio_file.value, (err) => { - if (err) console.log(err); - else console.log('Audio data saved to rec/audio' + timestamp + '.ogg'); + fs.mkdir('rec', { recursive: true }, err => { + if (err) console.error(err); + else { + fs.writeFile(`rec/audio${timestamp}.ogg`, formData.audio_file.value, err => { + if (err) console.error(err); + else console.log(`Audio data saved to rec/audio${timestamp}.ogg`); + }); + } }); } } -function broadcastUserList() { - const userList = Array.from(sessions.values()).map(user => ({ username: user.username, sessionId: user.sessionId, currentChat: user.currentChat, language: user.language })); - wss.clients.forEach(client => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify({ type: 'userList', users: userList })); +async function handleChatTranscription(ws, body, sessionData) { + if (sessionData.currentChat) { + const chat = chats.get(sessionData.currentChat); + if (chat) { + let msg = { sender: sessionData.username, text: body, translations: [] }; + chat.messages.push(msg); + + for (let sessionId of chat.participants) { + if (sessionId !== ws.sessionId) { + const targetLang = sessions.get(sessionId)?.language || 'en'; + if (targetLang !== sessionData.language) { + const translation = await translateText(body, sessionData.language, targetLang); + msg.translations.push({ language: targetLang, text: translation }); + + const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); + if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { + participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${translation}` })); + const audioBuffer = await generateSpeech(translation); + participantSocket.send(JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') })); + } + } else { + const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); + if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { + participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${body}` })); + participantSocket.send(JSON.stringify({ type: 'audio', audio: formData.toString('base64') })); + } + } + } + } } + } +} + +async function translateText(originalText, originalLanguage, targetLanguage) { + const prompt = `Translate this text from ${originalLanguage} to ${targetLanguage}: ${originalText}`; + + const response = await groq.chat.completions.create({ + messages: [ + { + role: "system", + content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation.`, + }, + { + role: "user", + content: originalText, + }, + ], + model: "llama3-8b-8192", }); + + return response.choices[0]?.message?.content || ""; } async function generateSpeech(text) { @@ -405,8 +325,7 @@ async function generateSpeech(text) { voice: "alloy", input: text, }); - const buffer = Buffer.from(await mp3.arrayBuffer()); - return buffer; + return Buffer.from(await mp3.arrayBuffer()); } // HTTP Server @@ -427,23 +346,23 @@ app.get('/wsurl', (req, res) => { res.status(200).send(process.env.WS_URL); }); -app.get('/settings', (req, res) => { +app.get('/settings', async (req, res) => { if (req.query.language) { language = req.query.language; - storage.setItem('language', language); + await storage.setItem('language', language); } if (req.query.storeRecordings) { storeRecordings = req.query.storeRecordings; - storage.setItem('storeRecordings', storeRecordings); + await storage.setItem('storeRecordings', storeRecordings); } res.status(200).send({ language, storeRecordings }); }); -app.post('/settings', (req, res) => { +app.post('/settings', async (req, res) => { const { sessionId, language, storeRecordings, task } = req.body; const sessionData = sessions.get(sessionId); if (language) sessionData.language = language; - if (storeRecordings) sessionData.storeRecordings = storeRecordings; + if (storeRecordings !== undefined) sessionData.storeRecordings = storeRecordings; if (task) sessionData.task = task; res.status(200).send('OK'); }); @@ -451,7 +370,7 @@ app.post('/settings', (req, res) => { app.post('/upload', (req, res) => { const timestamp = Date.now(); console.log('Received audio data:', timestamp); - fs.mkdir('rec', { recursive: true }, (err) => { + fs.mkdir('rec', { recursive: true }, err => { if (err) return res.status(500).send('ERROR'); const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`); req.pipe(file); @@ -468,3 +387,13 @@ app.get('/chats', (req, res) => { app.listen(PORT_HTTP, () => { console.log(`Server listening on port ${PORT_HTTP}`); }); + +// Helper to wrap request in a promise +function requestPromise(options) { + return new Promise((resolve, reject) => { + request(options, (error, response, body) => { + if (error) return reject(error); + resolve(body); + }); + }); +}