const express = require('express'); const bodyParser = require('body-parser'); const WebSocket = require('ws'); const storage = require('node-persist'); const request = require('request'); const fs = require('fs'); const path = require('path'); const dotenv = require('dotenv'); const ollama = require('ollama'); const axios = require('axios'); const OpenAI = require('openai'); const Groq = require('groq-sdk'); const { PrismaClient } = require('@prisma/client'); const prisma = new PrismaClient(); // Load environment variables dotenv.config({ path: `.env${ process.env.NODE_ENV === 'development' ? '.development' : '.' + process.env.NODE_ENV }` }) console.log(`loaded env file: ${process.env.NODE_ENV}`) // Initialize services const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }) const groq = new Groq({ apiKey: process.env.GROQ_API_KEY }) // Express setup const app = express() app.use(bodyParser.json()) // Configuration constants const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000 const PORT_WS = process.env.SERVER_PORT_WS || 8082 const TTS_API_URL = process.env.TTS_API_URL const LNN_API_URL = process.env.LNN_API_URL const LLN_MODEL = process.env.LLN_MODEL console.log(`TTS API URL: ${TTS_API_URL}`) let language = 'en' let storeRecordings = false let queueCounter = 0 const sessions = new Map() const chats = new Map() // Store chat rooms // User registration app.post('/register', async (req, res) => { const { username, password } = req.body; try { const user = await prisma.user.create({ data: { username, password: hashPassword(password) }, }); res.status(201).json({ message: 'User registered successfully', userId: user.id }); } catch (error) { res.status(400).json({ error: 'Username already exists' }); } }); // User login app.post('/login', async (req, res) => { const { username, password } = req.body; const user = await prisma.user.findUnique({ where: { username } }); if (user && verifyPassword(password, user.password)) { const session = await prisma.session.create({ data: { userId: user.id, lastLogin: new Date() }, }); res.json({ sessionId: session.id, userId: user.id }); } else { res.status(401).json({ error: 'Invalid credentials' }); } }); // // Initialize storage and load initial values // async function initStorage () { // await storage.init() // language = (await storage.getItem('language')) || language // storeRecordings = // (await storage.getItem('storeRecordings')) || storeRecordings // const storedChats = (await storage.getItem('chats')) || [] // storedChats.forEach(chat => chats.set(chat.id, chat)) // const storedSessions = (await storage.getItem('sessions')) || [] // storedSessions.forEach(session => sessions.set(session.sessionId, session)) // } // initStorage() // WebSocket Server const wss = new WebSocket.Server({ port: PORT_WS }) wss.on('connection', ws => { ws.on('message', async message => handleMessage(ws, message)) ws.on('close', () => handleClose(ws)) }) // Handle WebSocket messages async function handleMessage (ws, message) { let data try { data = JSON.parse(message) } catch { return handleAudioData(ws, message) } try { switch (data.type) { case 'sessionId': await handleSessionId(ws) break case 'join': await handleJoin(ws, data) break case 'startChat': await handleStartChat(ws, data) break case 'enterChat': await handleEnterChat(ws, data) break case 'reconnect': await handleReconnect(ws, data) break default: console.log('Unknown message type:', data.type) } } catch (err) { console.error('Failed to handle message', err) } } function handleClose (ws) { sessions.delete(ws.sessionId) broadcastUserList() } // Handlers for specific message types async function handleSessionId (ws) { ws.sessionId = generateSessionId() sessions.set(ws.sessionId, { language: 'en' }) await storage.setItem('sessions', Array.from(sessions.values())) } // Modified handleJoin function async function handleJoin(ws, { username, language, sessionId }) { const session = await prisma.session.update({ where: { id: sessionId }, data: { language }, include: { user: true }, }); ws.sessionId = sessionId; ws.userId = session.userId; ws.send(JSON.stringify({ type: 'sessionId', sessionId, language, storeRecordings })); const userChats = await prisma.chat.findMany({ where: { participants: { some: { userId: session.userId } } }, include: { participants: true }, }); ws.send(JSON.stringify({ type: 'chats', chats: userChats })); broadcastUserList(); } async function handleStartChat(ws, { users }) { const chatId = generateChatId(); let participants = [ws.userId, ...users]; participants = [...new Set(participants)]; const chat = await prisma.chat.create({ data: { id: chatId, participants: { connect: participants.map(userId => ({ id: userId })), }, }, include: { participants: true }, }); notifyParticipants(participants); broadcastUserList(); } async function handleEnterChat(ws, { chatId }) { const enteredChat = await prisma.chat.findUnique({ where: { id: chatId }, include: { participants: true, messages: true }, }); if (enteredChat && enteredChat.participants.some(p => p.id === ws.userId)) { await prisma.session.update({ where: { id: ws.sessionId }, data: { currentChatId: chatId }, }); ws.send(JSON.stringify({ type: 'chat', chat: enteredChat })); } } async function handleReconnect (ws, { sessionId }) { const userSession = sessions.get(sessionId) if (userSession) { sessions.set(ws.sessionId, userSession) ws.sessionId = sessionId const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId) ) ws.send(JSON.stringify({ type: 'chats', chats: userChats })) } else { console.log('Session not found:', sessionId) } broadcastUserList() } // Utility functions function generateSessionId () { return Math.random().toString(36).substring(2) } function generateChatId () { return Math.random().toString(36).substring(2) } async function broadcastUserList() { const users = await prisma.session.findMany({ include: { user: true }, }); const userList = users.map(session => ({ username: session.user.username, sessionId: session.id, currentChat: session.currentChatId, language: session.language, })); wss.clients.forEach(client => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify({ type: 'userList', users: userList })); } }); } function notifyParticipants(participants) { participants.forEach(sessionId => { const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { const userChats = Array.from(chats.entries()) .filter(([id, chat]) => chat.participants.includes(sessionId)) .map(([id, chat]) => ({ id, participants: chat.participants })); participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats })); } }); } async function handleAudioData (ws, data) { const sessionData = sessions.get(ws.sessionId) let { language, task } = sessionData const formData = { task: task || 'transcribe', language, vad_filter: 'true', output: 'json', audio_file: { value: data, options: { filename: 'audio.ogg', contentType: 'audio/ogg' } } } if (!language || language === 'auto') { await detectLanguage(ws, formData) } else { await transcribeAudio(ws, formData, sessionData) } } async function detectLanguage (ws, formData) { try { const result = await requestPromise({ method: 'POST', url: TTS_API_URL.replace('/asr', '/detect-language'), formData }) const { language_code } = JSON.parse(result) if (language_code) { const sessionData = sessions.get(ws.sessionId) sessionData.language = language_code ws.send( JSON.stringify({ type: 'languageDetected', languageDetected: language_code }) ) await transcribeAudio(ws, formData, sessionData) } } catch (err) { console.error('Language detection failed:', err) } } async function transcribeAudio (ws, formData, sessionData) { const start = new Date().getTime() queueCounter++ try { if (sessionData.language) { formData.language = sessionData.language } formData.vad_filter = 'true' const body = await requestPromise({ method: 'POST', url: TTS_API_URL, formData }) queueCounter-- const duration = new Date().getTime() - start ws.send( JSON.stringify({ type: 'text', queueCounter, duration, language: sessionData.language, text: body }) ) await handleChatTranscription(ws, body, sessionData) } catch (err) { console.error('Transcription failed:', err) } if (storeRecordings) { const timestamp = Date.now() fs.mkdir('rec', { recursive: true }, err => { if (err) console.error(err) else { fs.writeFile( `rec/audio${timestamp}.ogg`, formData.audio_file.value, err => { if (err) console.error(err) else console.log(`Audio data saved to rec/audio${timestamp}.ogg`) } ) } }) } } async function handleChatTranscription (ws, body, sessionData) { if (sessionData.currentChat) { const chat = chats.get(sessionData.currentChat) if (chat) { let msg = { sender: sessionData.username, text: body, translations: [] } chat.messages.push(msg) for (let sessionId of chat.participants) { if (sessionId !== ws.sessionId) { const targetLang = sessions.get(sessionId)?.language || 'en' if (targetLang !== sessionData.language) { const translation = await translateText( body, sessionData.language, targetLang ) msg.translations.push({ language: targetLang, text: translation }) const participantSocket = Array.from(wss.clients).find( client => client.sessionId === sessionId ) if ( participantSocket && participantSocket.readyState === WebSocket.OPEN ) { participantSocket.send( JSON.stringify({ type: 'text', text: `${sessionData.username}: ${translation}` }) ) const audioBuffer = await generateSpeech(translation) participantSocket.send( JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') }) ) } } else { const participantSocket = Array.from(wss.clients).find( client => client.sessionId === sessionId ) if ( participantSocket && participantSocket.readyState === WebSocket.OPEN ) { participantSocket.send( JSON.stringify({ type: 'text', text: `${sessionData.username}: ${body}` }) ) participantSocket.send( JSON.stringify({ type: 'audio', audio: formData.toString('base64') }) ) } } } } } } } async function translateText (originalText, originalLanguage, targetLanguage) { const prompt = `Translate this text from ${originalLanguage} to ${targetLanguage}: ${originalText}` const response = await groq.chat.completions.create({ messages: [ { role: 'system', content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation.` }, { role: 'user', content: originalText } ], model: 'llama3-8b-8192' }) return response.choices[0]?.message?.content || '' } async function generateSpeech (text) { const mp3 = await openai.audio.speech.create({ model: 'tts-1', voice: 'alloy', input: text }) return Buffer.from(await mp3.arrayBuffer()) } // HTTP Server app.get('/', (req, res) => { res.sendFile(path.join(__dirname, 'chat-client.html')) }) app.get('/audio.js', (req, res) => { res.sendFile(path.join(__dirname, 'audio.js')) }) app.post('/log', (req, res) => { console.log(`[LOG ${new Date().toISOString()}] ${req.body.message}`) res.status(200).send('OK') }) app.get('/wsurl', (req, res) => { // if(process.env.PUBLIC_HOSTNAME){ // process.env.WS_URL = `wss://${process.env.PUBLIC_HOSTNAME}` // } console.log('Request for WS URL resolved with:', process.env.WS_URL) res.status(200).send(process.env.WS_URL) }) app.get('/settings', async (req, res) => { if (req.query.language) { language = req.query.language await storage.setItem('language', language) } if (req.query.storeRecordings) { storeRecordings = req.query.storeRecordings await storage.setItem('storeRecordings', storeRecordings) } res.status(200).send({ language, storeRecordings }) }) app.post('/settings', async (req, res) => { const { sessionId, language, storeRecordings, task } = req.body const sessionData = sessions.get(sessionId) if (language) sessionData.language = language if (storeRecordings !== undefined) sessionData.storeRecordings = storeRecordings if (task) sessionData.task = task res.status(200).send('OK') }) app.post('/upload', (req, res) => { const timestamp = Date.now() console.log('Received audio data:', timestamp) fs.mkdir('rec', { recursive: true }, err => { if (err) return res.status(500).send('ERROR') const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`) req.pipe(file) file.on('finish', () => res.status(200).send('OK')) }) }) app.get('/chats', (req, res) => { const { username } = req.query const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(username) ) res.status(200).send({ chats: userChats }) }) app.listen(PORT_HTTP, () => { console.log(`Server listening on port ${PORT_HTTP}`) console.log(process.env.TTS_BACKEND_URL) }) // Helper to wrap request in a promise function requestPromise (options) { return new Promise((resolve, reject) => { request(options, (error, response, body) => { if (error) return reject(error) resolve(body) }) }) }