const express = require('express'); const bodyParser = require('body-parser'); const WebSocket = require('ws'); const storage = require('node-persist'); const request = require('request'); const fs = require('fs'); const path = require('path'); const dotenv = require('dotenv'); const ollama = require('ollama'); const axios = require('axios'); const OpenAI = require('openai'); const Groq = require('groq-sdk'); // Load environment variables dotenv.config({ path: `.env${process.env.NODE_ENV === 'development' ? '.development' : ''}` }); // Initialize services const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); const groq = new Groq({ apiKey: process.env.GROQ_API_KEY }); // Express setup const app = express(); app.use(bodyParser.json()); // Configuration constants const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000; const PORT_WS = process.env.SERVER_PORT_WS || 8080; const TTS_API_URL = process.env.TTS_API_URL; const LNN_API_URL = process.env.LNN_API_URL; const LLN_MODEL = process.env.LLN_MODEL; let language = "en"; let storeRecordings = false; let queueCounter = 0; const sessions = new Map(); const chats = new Map(); // Store chat rooms // Initialize storage and load initial values async function initStorage() { await storage.init(); language = await storage.getItem('language') || language; storeRecordings = await storage.getItem('storeRecordings') || storeRecordings; const storedChats = await storage.getItem('chats') || []; storedChats.forEach(chat => chats.set(chat.id, chat)); const storedSessions = await storage.getItem('sessions') || []; storedSessions.forEach(session => sessions.set(session.sessionId, session)); } initStorage(); // WebSocket Server const wss = new WebSocket.Server({ port: PORT_WS }); wss.on('connection', ws => { ws.on('message', async message => handleMessage(ws, message)); ws.on('close', () => handleClose(ws)); }); // Handle WebSocket messages async function handleMessage(ws, message) { let data; try { data = JSON.parse(message); } catch { return handleAudioData(ws, message); } try { switch (data.type) { case 'sessionId': await handleSessionId(ws); break; case 'join': await handleJoin(ws, data); break; case 'startChat': await handleStartChat(ws, data); break; case 'enterChat': await handleEnterChat(ws, data); break; case 'reconnect': await handleReconnect(ws, data); break; default: console.log('Unknown message type:', data.type); } } catch (err) { console.error('Failed to handle message', err); } } function handleClose(ws) { sessions.delete(ws.sessionId); broadcastUserList(); } // Handlers for specific message types async function handleSessionId(ws) { ws.sessionId = generateSessionId(); sessions.set(ws.sessionId, { language: 'en' }); await storage.setItem('sessions', Array.from(sessions.values())); } async function handleJoin(ws, { username, language }) { sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language }); ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings })); const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); ws.send(JSON.stringify({ type: 'chats', chats: userChats })); broadcastUserList(); } async function handleStartChat(ws, { users }) { const chatId = generateChatId(); let participants = [ws.sessionId, ...users]; participants = [...new Set(participants)]; chats.set(chatId, { participants, messages: [] }); await storage.setItem('chats', Array.from(chats.values())); notifyParticipants(participants); broadcastUserList(); } async function handleEnterChat(ws, { chatId }) { const enteredChat = chats.get(chatId); const currentSession = sessions.get(ws.sessionId); currentSession.currentChat = chatId; if (enteredChat && enteredChat.participants.includes(ws.sessionId)) { ws.send(JSON.stringify({ type: 'chat', chat: enteredChat })); } } async function handleReconnect(ws, { sessionId }) { const userSession = sessions.get(sessionId); if (userSession) { sessions.set(ws.sessionId, userSession); ws.sessionId = sessionId; const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId)); ws.send(JSON.stringify({ type: 'chats', chats: userChats })); } else { console.log('Session not found:', sessionId); } broadcastUserList(); } // Utility functions function generateSessionId() { return Math.random().toString(36).substring(2); } function generateChatId() { return Math.random().toString(36).substring(2); } function broadcastUserList() { const userList = Array.from(sessions.values()).map(user => ({ username: user.username, sessionId: user.sessionId, currentChat: user.currentChat, language: user.language })); wss.clients.forEach(client => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify({ type: 'userList', users: userList })); } }); } function notifyParticipants(participants) { participants.forEach(sessionId => { const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { const userChats = Array.from(chats.entries()) .filter(([id, chat]) => chat.participants.includes(sessionId)) .map(([id, chat]) => ({ id, participants: chat.participants })); participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats })); } }); } async function handleAudioData(ws, data) { const sessionData = sessions.get(ws.sessionId); let { language, task } = sessionData; const formData = { task: task || 'transcribe', language, vad_filter: 'true', output: 'json', audio_file: { value: data, options: { filename: 'audio.ogg', contentType: 'audio/ogg' } } }; if (!language || language === 'auto') { await detectLanguage(ws, formData); } else { await transcribeAudio(ws, formData, sessionData); } } async function detectLanguage(ws, formData) { try { const result = await requestPromise({ method: 'POST', url: TTS_API_URL.replace('/asr', '/detect-language'), formData }); const { language_code } = JSON.parse(result); if (language_code) { const sessionData = sessions.get(ws.sessionId); sessionData.language = language_code; ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: language_code })); await transcribeAudio(ws, formData, sessionData); } } catch (err) { console.error('Language detection failed:', err); } } async function transcribeAudio(ws, formData, sessionData) { const start = new Date().getTime(); queueCounter++; try { if(sessionData.language) { formData.language = sessionData.language; } formData.vad_filter = 'true'; const body = await requestPromise({ method: 'POST', url: TTS_API_URL, formData }); queueCounter--; const duration = new Date().getTime() - start; ws.send(JSON.stringify({ type: 'text', queueCounter, duration, language: sessionData.language, text: body })); await handleChatTranscription(ws, body, sessionData); } catch (err) { console.error('Transcription failed:', err); } if (storeRecordings) { const timestamp = Date.now(); fs.mkdir('rec', { recursive: true }, err => { if (err) console.error(err); else { fs.writeFile(`rec/audio${timestamp}.ogg`, formData.audio_file.value, err => { if (err) console.error(err); else console.log(`Audio data saved to rec/audio${timestamp}.ogg`); }); } }); } } async function handleChatTranscription(ws, body, sessionData) { if (sessionData.currentChat) { const chat = chats.get(sessionData.currentChat); if (chat) { let msg = { sender: sessionData.username, text: body, translations: [] }; chat.messages.push(msg); for (let sessionId of chat.participants) { if (sessionId !== ws.sessionId) { const targetLang = sessions.get(sessionId)?.language || 'en'; if (targetLang !== sessionData.language) { const translation = await translateText(body, sessionData.language, targetLang); msg.translations.push({ language: targetLang, text: translation }); const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${translation}` })); const audioBuffer = await generateSpeech(translation); participantSocket.send(JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') })); } } else { const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${body}` })); participantSocket.send(JSON.stringify({ type: 'audio', audio: formData.toString('base64') })); } } } } } } } async function translateText(originalText, originalLanguage, targetLanguage) { const prompt = `Translate this text from ${originalLanguage} to ${targetLanguage}: ${originalText}`; const response = await groq.chat.completions.create({ messages: [ { role: "system", content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation.`, }, { role: "user", content: originalText, }, ], model: "llama3-8b-8192", }); return response.choices[0]?.message?.content || ""; } async function generateSpeech(text) { const mp3 = await openai.audio.speech.create({ model: "tts-1", voice: "alloy", input: text, }); return Buffer.from(await mp3.arrayBuffer()); } // HTTP Server app.get('/', (req, res) => { res.sendFile(path.join(__dirname, 'chat-client.html')); }); app.get('/audio.js', (req, res) => { res.sendFile(path.join(__dirname, 'audio.js')); }); app.post('/log', (req, res) => { console.log(`[LOG ${new Date().toISOString()}] ${req.body.message}`); res.status(200).send('OK'); }); app.get('/wsurl', (req, res) => { res.status(200).send(process.env.WS_URL); }); app.get('/settings', async (req, res) => { if (req.query.language) { language = req.query.language; await storage.setItem('language', language); } if (req.query.storeRecordings) { storeRecordings = req.query.storeRecordings; await storage.setItem('storeRecordings', storeRecordings); } res.status(200).send({ language, storeRecordings }); }); app.post('/settings', async (req, res) => { const { sessionId, language, storeRecordings, task } = req.body; const sessionData = sessions.get(sessionId); if (language) sessionData.language = language; if (storeRecordings !== undefined) sessionData.storeRecordings = storeRecordings; if (task) sessionData.task = task; res.status(200).send('OK'); }); app.post('/upload', (req, res) => { const timestamp = Date.now(); console.log('Received audio data:', timestamp); fs.mkdir('rec', { recursive: true }, err => { if (err) return res.status(500).send('ERROR'); const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`); req.pipe(file); file.on('finish', () => res.status(200).send('OK')); }); }); app.get('/chats', (req, res) => { const { username } = req.query; const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(username)); res.status(200).send({ chats: userChats }); }); app.listen(PORT_HTTP, () => { console.log(`Server listening on port ${PORT_HTTP}`); }); // Helper to wrap request in a promise function requestPromise(options) { return new Promise((resolve, reject) => { request(options, (error, response, body) => { if (error) return reject(error); resolve(body); }); }); }