const express = require('express');
const bodyParser = require('body-parser');
const WebSocket = require('ws');
const storage = require('node-persist');
const request = require('request');
const fs = require('fs');
const path = require('path');
const dotenv = require('dotenv');
const ollama = require('ollama');
const axios = require('axios');
const OpenAI = require('openai');
const Groq = require('groq-sdk');

// Load the environment file BEFORE any API client or config constant reads
// process.env; otherwise values that only live in the .env file are still
// undefined when the clients below are constructed.
const envFile = process.env.NODE_ENV === 'development' ? '.env.development' : '.env';
dotenv.config({ path: envFile });

// SECURITY: API keys must come from the environment and never be committed to
// source control. Any key that was previously hard-coded here has to be
// treated as leaked and rotated.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const groq = new Groq({ apiKey: process.env.GROQ_API_KEY });

const app = express();
app.use(bodyParser.json());

const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000;
const PORT_WS = process.env.SERVER_PORT_WS || 8080;
const TTS_API_URL = process.env.TTS_API_URL;
const LNN_API_URL = process.env.LNN_API_URL;
const LLN_MODEL = process.env.LLN_MODEL;

// Server-wide settings; overwritten by persisted values once storage is loaded.
let language = "en";
let storeRecordings = false;
// Number of transcription requests currently in flight.
let queueCounter = 0;

const sessions = new Map(); // sessionId -> session data
const chats = new Map(); // chatId -> chat room

/**
 * Initialises node-persist and restores the persisted settings, chats and
 * sessions into the in-memory state above. Settings that have never been
 * stored are written back with their current default value.
 *
 * @returns {Promise<void>} Resolves once all persisted state has been applied.
 */
async function loadPersistedState() {
    await storage.init();

    const [storedLanguage, storedStoreRecordings, storedChats, storedSessions] = await Promise.all([
        storage.getItem('language'),
        storage.getItem('storeRecordings'),
        storage.getItem('chats'),
        storage.getItem('sessions'),
    ]);

    if (storedLanguage !== undefined) {
        language = storedLanguage;
    } else {
        await storage.setItem('language', language);
    }

    if (storedStoreRecordings !== undefined) {
        storeRecordings = storedStoreRecordings;
    } else {
        await storage.setItem('storeRecordings', storeRecordings);
    }

    // Load existing chats and sessions from storage.
    if (Array.isArray(storedChats)) {
        storedChats.forEach((chat) => chats.set(chat.id, chat));
    }
    if (Array.isArray(storedSessions)) {
        storedSessions.forEach((session) => sessions.set(session.sessionId, session));
    }
}

// A storage failure must not take the server down; it simply starts with defaults.
loadPersistedState().catch((err) => {
    console.error('Failed to load persisted state:', err);
});

// WebSocket Server
const wss = new WebSocket.Server({ port: PORT_WS });
/**
 * Generates a short random identifier used for session and chat ids.
 *
 * NOTE(review): Math.random() is not cryptographically secure, and the session
 * id is accepted as the sole credential by the 'reconnect' message and by
 * POST /settings. Consider switching to crypto.randomUUID().
 *
 * @returns {string} Base-36 random string.
 */
function generateId() {
    return Math.random().toString(36).substring(2);
}

/**
 * Interprets a settings value as a boolean. Query-string values arrive as
 * strings, so "false" / "0" must not be treated as truthy.
 *
 * @param {*} value - Boolean or string flag.
 * @returns {boolean} True for `true`, "true", "1", "yes" or "on" (case-insensitive).
 */
function parseBooleanFlag(value) {
    if (typeof value === 'boolean') return value;
    return ['true', '1', 'yes', 'on'].includes(String(value).trim().toLowerCase());
}

/**
 * Sends a JSON-serialised payload to a socket, skipping sockets that are not open.
 *
 * @param {WebSocket} socket - Target socket.
 * @param {object} payload - Message to serialise.
 */
function sendJson(socket, payload) {
    if (socket && socket.readyState === WebSocket.OPEN) {
        socket.send(JSON.stringify(payload));
    }
}

/**
 * Finds an open client socket that belongs to the given session.
 *
 * @param {string} sessionId - Session id to look for.
 * @returns {WebSocket|undefined} The open socket, or undefined when none exists.
 */
function findOpenSocket(sessionId) {
    return Array.from(wss.clients).find(
        (client) => client.sessionId === sessionId && client.readyState === WebSocket.OPEN
    );
}

/**
 * Appends an entry to an array persisted under `key`, creating the array when
 * the key has not been stored yet. Persistence errors are logged, not thrown.
 *
 * @param {string} key - Storage key.
 * @param {object} entry - Entry to append.
 * @returns {Promise<void>}
 */
function appendToStoredList(key, entry) {
    return storage.getItem(key)
        .then((value) => storage.setItem(key, value !== undefined ? [...value, entry] : [entry]))
        .catch((err) => console.error(`Failed to persist ${key}:`, err));
}

/**
 * Returns the chats the given session participates in (full chat objects).
 *
 * @param {string} sessionId - Session id.
 * @returns {object[]} Matching chats.
 */
function listChatsFor(sessionId) {
    return Array.from(chats.values()).filter((chat) => chat.participants.includes(sessionId));
}

/**
 * Assigns a fresh session id to the socket, registers the session in memory
 * and appends it to the persisted session list.
 *
 * @param {WebSocket} ws - Client socket.
 */
function registerNewSession(ws) {
    ws.sessionId = generateId();
    sessions.set(ws.sessionId, { sessionId: ws.sessionId, language: 'en' });
    appendToStoredList('sessions', { sessionId: ws.sessionId, language: 'en' });
}

/**
 * Dispatches a parsed JSON control message by its `type`.
 * May throw on malformed payloads; the caller logs such errors.
 *
 * @param {WebSocket} ws - Socket the message arrived on.
 * @param {object} data - Parsed message.
 */
function handleControlMessage(ws, data) {
    console.log('Received message:', data.type);
    switch (data.type) {
        case 'sessionId':
            registerNewSession(ws);
            break;

        case 'join': {
            const { username, language } = data;
            // A client that joins without requesting an id first would otherwise
            // be registered under the key `undefined`.
            if (!ws.sessionId) registerNewSession(ws);
            sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language });
            sendJson(ws, { type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings });
            sendJson(ws, { type: 'chats', chats: listChatsFor(ws.sessionId) });
            broadcastUserList();
            break;
        }

        case 'startChat': {
            const chatUsers = Array.isArray(data.users) ? data.users : [];
            const chatId = generateId();
            // Deduplicate participants
            const participants = [...new Set([ws.sessionId, ...chatUsers])];
            // Keep the id on the chat object so in-memory chats have the same
            // shape as chats restored from storage.
            chats.set(chatId, { id: chatId, participants, messages: [] });
            appendToStoredList('chats', { id: chatId, participants });

            const userNames = participants.map((sessionId) => {
                const user = sessions.get(sessionId);
                return user ? `${user.username}(${user.sessionId})` : 'Unknown User';
            });
            console.log('Creating chat room. Users:', userNames);

            // Push the refreshed chat list to every connected participant.
            participants.forEach((sessionId) => {
                const participantSocket = findOpenSocket(sessionId);
                if (!participantSocket) return;
                const userChats = Array.from(chats.entries())
                    .filter(([, chat]) => chat.participants.includes(sessionId))
                    .map(([id, chat]) => ({ id, participants: chat.participants }));
                sendJson(participantSocket, { type: 'chats', chats: userChats });
            });
            broadcastUserList();
            break;
        }

        case 'enterChat': {
            const { chatId } = data;
            const enteredChat = chats.get(chatId);
            const currentSession = sessions.get(ws.sessionId);
            if (!currentSession) {
                console.log('Session not found:', ws.sessionId);
                break;
            }
            // Only mark the chat as current once membership is verified, so a
            // non-participant cannot relay messages into someone else's chat.
            if (enteredChat && enteredChat.participants.includes(ws.sessionId)) {
                currentSession.currentChat = chatId;
                sendJson(ws, { type: 'chat', chat: enteredChat });
            }
            break;
        }

        case 'reconnect': {
            const userSession = sessions.get(data.sessionId);
            if (userSession) {
                const previousId = ws.sessionId;
                // Adopt the restored id BEFORE touching the map; otherwise the
                // session is registered under the stale (or undefined) id.
                ws.sessionId = data.sessionId;
                sessions.set(ws.sessionId, userSession);
                if (previousId !== undefined && previousId !== ws.sessionId) {
                    sessions.delete(previousId);
                }
                sendJson(ws, { type: 'chats', chats: listChatsFor(ws.sessionId) });
            } else {
                console.log('Session not found:', data.sessionId);
            }
            broadcastUserList();
            break;
        }

        default:
            console.log('Unknown message type:', data.type);
    }
}

wss.on('connection', (ws) => {
    ws.on('message', (message) => {
        let data;
        try {
            data = JSON.parse(message);
        } catch (e) {
            // Not JSON: the payload is treated as binary audio data.
            try {
                handleAudioData(ws, message);
            } catch (err) {
                console.error('Failed to handle audio data', err);
            }
            return;
        }

        try {
            handleControlMessage(ws, data);
        } catch (err) {
            console.error('Failed to handle message', err);
        }
    });

    ws.on('close', () => {
        sessions.delete(ws.sessionId);
        broadcastUserList();
    });
});

/**
 * Builds the multipart form for an audio payload and forwards it either to
 * language detection (session language 'auto') or straight to transcription.
 *
 * @param {WebSocket} ws - Socket the audio arrived on.
 * @param {Buffer} data - Raw audio payload; sent as 'audio.ogg' / 'audio/ogg'.
 */
function handleAudioData(ws, data) {
    const sessionData = sessions.get(ws.sessionId);
    if (!sessionData) {
        // Audio received before the client established a session.
        console.error('Ignoring audio data: no session for socket', ws.sessionId);
        return;
    }

    // An empty or missing language falls back to 'en'.
    const language = sessionData.language || 'en';
    const task = sessionData.task || 'transcribe';
    const formData = {
        task,
        language,
        vad_filter: 'true',
        output: 'json',
        audio_file: {
            value: data,
            options: {
                filename: 'audio.ogg',
                contentType: 'audio/ogg'
            }
        }
    };

    if (language === 'auto') {
        detectLanguage(ws, formData);
    } else {
        transcribeAudio(ws, formData, sessionData)
            .catch((err) => console.error('Transcription failed:', err));
    }
}

/**
 * Posts the audio to the detect-language endpoint (derived from TTS_API_URL by
 * replacing '/asr'), stores the detected language code on the session, notifies
 * the client and then transcribes the audio with the detected language.
 *
 * @param {WebSocket} ws - Socket of the speaking client.
 * @param {object} formData - Multipart form built by handleAudioData.
 */
function detectLanguage(ws, formData) {
    if (!TTS_API_URL) {
        console.error('Language detection failed:', new Error('TTS_API_URL is not configured'));
        return;
    }

    request.post({ url: TTS_API_URL.replace('/asr', '/detect-language'), formData }, (err, httpResponse, body) => {
        if (err) return console.error('Language detection failed:', err);

        let result;
        try {
            result = JSON.parse(body);
        } catch (parseErr) {
            // A non-JSON error page from the service must not crash the process.
            return console.error('Language detection failed:', parseErr);
        }
        if (!result || !result.language_code) return;

        const sessionData = sessions.get(ws.sessionId);
        if (!sessionData) return; // client disconnected in the meantime

        const language = result.language_code;
        sessionData.language = language;
        sendJson(ws, { type: 'languageDetected', languageDetected: result.detected_language });
        // Transcribe with the detected code rather than re-sending 'auto'.
        transcribeAudio(ws, { ...formData, language }, sessionData)
            .catch((transcribeErr) => console.error('Transcription failed:', transcribeErr));
    });
}

/**
 * Translates a transcription through the Groq chat-completions API.
 *
 * @param {string} originalText - Text to translate.
 * @param {string} originalLanguage - Source language.
 * @param {string} targetLanguage - Target language.
 * @returns {Promise<{response: string}>} Translation ('' when the model returns no content).
 */
async function translateText(originalText, originalLanguage, targetLanguage) {
    const chatCompletion = await groq.chat.completions.create({
        messages: [
            {
                role: "system",
                content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation. It will be converted to speech using TTS - you can add more context if needed.`,
            },
            {
                role: "user",
                content: originalText,
            },
        ],
        model: "llama3-8b-8192",
    });

    const result = chatCompletion.choices[0]?.message?.content || "";
    console.log(result);
    return { response: result };
}

/**
 * Sends a prompt to the Ollama host configured in LNN_API_URL.
 *
 * @param {string} prompt - Prompt text.
 * @returns {Promise<*>} Whatever `generate` resolves with.
 */
async function queryLLM(prompt) {
    const requestData = {
        model: LLN_MODEL || 'qwen2',
        prompt,
        system: "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.",
    };
    const ola = new ollama.Ollama({ host: LNN_API_URL });
    // Return the result; previously it was awaited and then discarded.
    return ola.generate(requestData);
}

/**
 * Obsolete: queries the LLM endpoint through a plain HTTP POST.
 *
 * @deprecated Kept for reference only.
 * @param {string} prompt - Prompt text.
 * @returns {Promise<*>} Response body of the POST request.
 * @throws Rethrows the axios error after logging it.
 */
async function queryLLMAxios(prompt) {
    const requestData = {
        model: LLN_MODEL || 'qwen2',
        prompt,
        "system": "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.",
        "stream": false
    };

    try {
        const response = await axios.post(LNN_API_URL, requestData, {
            headers: {
                'Content-Type': 'application/json'
            }
        });
        return response.data;
    } catch (error) {
        console.error('Error calling Ollama API:', error.response ? error.response.data : error.message);
        throw error;
    }
}

/**
 * Translates a message for one participant, sends the translated text, then
 * generates speech for it, records the translation on the message and sends
 * the audio.
 *
 * @param {object} msg - Chat message being relayed (its `translations` array is appended to).
 * @param {object} sessionData - Sender's session.
 * @param {string} sessionId - Recipient's session id.
 * @param {string} targetLang - Recipient's language.
 * @returns {Promise<void>}
 */
async function deliverTranslation(msg, sessionData, sessionId, targetLang) {
    const translation = await translateText(msg.text, sessionData.language, targetLang);
    const translatedText = translation.response;

    const participantSocket = findOpenSocket(sessionId);
    if (!participantSocket) return;

    sendJson(participantSocket, { type: 'text', text: `${sessionData.username}: ${translatedText}\n` });

    // Generate and send the speech audio
    const audioBuffer = await generateSpeech(translatedText);
    console.log('Generated audio for translation:', audioBuffer.length);
    const audio = audioBuffer.toString('base64');
    msg.translations.push({ language: targetLang, text: translatedText, audio });
    sendJson(participantSocket, { type: 'audio', audio });
}

/**
 * Records a transcription in the sender's current chat and relays it to every
 * other participant: translated (plus synthesised speech) when the languages
 * differ, otherwise the original text and original audio.
 *
 * @param {WebSocket} ws - Sender's socket.
 * @param {object} formData - Form that carried the original audio.
 * @param {object} sessionData - Sender's session.
 * @param {string} text - Transcription response body.
 */
function relayToChat(ws, formData, sessionData, text) {
    if (!sessionData.currentChat) return;
    const chat = chats.get(sessionData.currentChat);
    if (!chat) return;

    const msg = { sender: sessionData.username, text, translations: [] };
    // Chats restored from storage carry no messages array.
    if (!chat.messages) chat.messages = [];
    chat.messages.push(msg);

    let originalAudio; // base64 of the sender's audio, encoded on first use

    chat.participants.forEach((sessionId) => {
        if (sessionId === ws.sessionId) return;

        const targetLang = sessions.get(sessionId)?.language || 'en';
        if (targetLang !== sessionData.language) {
            console.log('Translating message "' + text + '" from ' + sessionData.language + ' to ' + targetLang);
            deliverTranslation(msg, sessionData, sessionId, targetLang).catch((err) => {
                console.error('Failed to deliver translation:', err);
                sendJson(ws, { type: 'error', message: 'Invalid translation response' });
            });
            return;
        }

        const participantSocket = findOpenSocket(sessionId);
        if (!participantSocket) return;
        if (originalAudio === undefined) {
            // formData is a plain object; the audio bytes live in audio_file.value.
            originalAudio = Buffer.from(formData.audio_file.value).toString('base64');
        }
        sendJson(participantSocket, { type: 'text', text: `${sessionData.username}: ${text}\n` });
        sendJson(participantSocket, { type: 'audio', audio: originalAudio });
    });
}

/**
 * Writes an audio payload to rec/audio<timestamp>.ogg, creating the directory
 * first. Failures are logged and never thrown.
 *
 * @param {Buffer} audio - Audio bytes to store.
 */
function saveRecording(audio) {
    const timestamp = Date.now();
    fs.mkdir('rec', { recursive: true }, (mkdirErr) => {
        if (mkdirErr) return console.log(mkdirErr);
        // Write only after the directory exists.
        fs.writeFile(`rec/audio${timestamp}.ogg`, audio, (err) => {
            if (err) console.log(err);
            else console.log('Audio data saved to rec/audio' + timestamp + '.ogg');
        });
    });
}

/**
 * Posts the audio form to TTS_API_URL, returns the transcription to the sender
 * and relays it to the sender's current chat. Optionally stores the recording.
 * The request is fire-and-forget: the returned promise resolves once the
 * request has been issued, not when the transcription arrives.
 *
 * @param {WebSocket} ws - Sender's socket.
 * @param {object} formData - Multipart form built by handleAudioData.
 * @param {object} sessionData - Sender's session.
 * @returns {Promise<void>}
 */
async function transcribeAudio(ws, formData, sessionData) {
    const start = Date.now();
    queueCounter++;

    request.post({ url: TTS_API_URL, formData }, (err, httpResponse, body) => {
        queueCounter--;
        if (err) return console.error('Transcription failed:', err);

        const duration = Date.now() - start;
        try {
            sendJson(ws, { type: 'text', queueCounter, duration, language: sessionData.language, text: body });
            relayToChat(ws, formData, sessionData, body);
        } catch (relayErr) {
            // An exception inside this callback would otherwise be uncaught.
            console.error('Failed to relay transcription:', relayErr);
        }
    });

    if (parseBooleanFlag(storeRecordings)) {
        saveRecording(formData.audio_file.value);
    }
}

/**
 * Sends the list of known sessions to every open client.
 */
function broadcastUserList() {
    const users = Array.from(sessions.values()).map(({ username, sessionId, currentChat, language }) => ({
        username,
        sessionId,
        currentChat,
        language
    }));
    const payload = JSON.stringify({ type: 'userList', users });

    wss.clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            client.send(payload);
        }
    });
}

/**
 * Synthesises speech for the given text with the OpenAI "tts-1" model.
 *
 * @param {string} text - Text to speak.
 * @returns {Promise<Buffer>} Audio bytes returned by the speech endpoint.
 */
async function generateSpeech(text) {
    const mp3 = await openai.audio.speech.create({
        model: "tts-1",
        voice: "alloy",
        input: text,
    });
    return Buffer.from(await mp3.arrayBuffer());
}

// HTTP Server
app.get('/', (req, res) => {
    res.sendFile(path.join(__dirname, 'chat-client.html'));
});

app.get('/audio.js', (req, res) => {
    res.sendFile(path.join(__dirname, 'audio.js'));
});

app.post('/log', (req, res) => {
    console.log(`[LOG ${new Date().toISOString()}] ${req.body?.message}`);
    res.status(200).send('OK');
});

app.get('/wsurl', (req, res) => {
    res.status(200).send(process.env.WS_URL);
});

// Reads and optionally updates the server-wide settings.
app.get('/settings', (req, res) => {
    if (req.query.language) {
        language = req.query.language;
        storage.setItem('language', language)
            .catch((err) => console.error('Failed to persist language:', err));
    }
    if (req.query.storeRecordings !== undefined && req.query.storeRecordings !== '') {
        // Query values are strings: "false" must actually disable recording.
        storeRecordings = parseBooleanFlag(req.query.storeRecordings);
        storage.setItem('storeRecordings', storeRecordings)
            .catch((err) => console.error('Failed to persist storeRecordings:', err));
    }
    res.status(200).send({ language, storeRecordings });
});

// Updates the settings of a single session.
app.post('/settings', (req, res) => {
    const { sessionId, language, storeRecordings, task } = req.body || {};
    const sessionData = sessions.get(sessionId);
    if (!sessionData) {
        return res.status(404).send('Session not found');
    }
    if (language) sessionData.language = language;
    // Compare against undefined so the flag can also be switched off.
    if (storeRecordings !== undefined) sessionData.storeRecordings = storeRecordings;
    if (task) sessionData.task = task;
    res.status(200).send('OK');
});

// Streams the request body into rec/audio_slice_<timestamp>.ogg.
app.post('/upload', (req, res) => {
    const timestamp = Date.now();
    console.log('Received audio data:', timestamp);
    fs.mkdir('rec', { recursive: true }, (err) => {
        if (err) return res.status(500).send('ERROR');
        const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`);
        file.on('error', (writeErr) => {
            console.error('Failed to store upload:', writeErr);
            if (!res.headersSent) res.status(500).send('ERROR');
        });
        file.on('finish', () => res.status(200).send('OK'));
        req.pipe(file);
    });
});

// NOTE(review): chat participants are stored as session ids elsewhere in this
// file, while this route matches them against a `username` query parameter.
// Confirm what clients actually pass here.
app.get('/chats', (req, res) => {
    const { username } = req.query;
    const userChats = Array.from(chats.values()).filter((chat) => chat.participants.includes(username));
    res.status(200).send({ chats: userChats });
});

app.listen(PORT_HTTP, () => {
    console.log(`Server listening on port ${PORT_HTTP}`);
});