// gogo2/web/chat-server.js
//
// 480 lines
// 13 KiB
// JavaScript

const crypto = require('crypto')
const fs = require('fs')
const path = require('path')

const axios = require('axios')
const bodyParser = require('body-parser')
const dotenv = require('dotenv')
const express = require('express')
const Groq = require('groq-sdk')
const storage = require('node-persist')
const ollama = require('ollama')
const OpenAI = require('openai')
const request = require('request')
const WebSocket = require('ws')
// Load environment variables from `.env.<NODE_ENV>`; fall back to plain `.env`
// when NODE_ENV is unset so the path never resolves to `.env.undefined`.
dotenv.config({
  path: process.env.NODE_ENV ? `.env.${process.env.NODE_ENV}` : '.env'
})
console.log(`loaded env file: ${process.env.NODE_ENV}`)

// Initialize services
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
const groq = new Groq({ apiKey: process.env.GROQ_API_KEY })

// Express setup
const app = express()
app.use(bodyParser.json())

// Configuration constants
const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000
const PORT_WS = process.env.SERVER_PORT_WS || 8082
// Endpoint the audio is POSTed to for transcription; the language-detection
// URL is derived from it by replacing `/asr` with `/detect-language`.
const TTS_API_URL = process.env.TTS_API_URL
const LNN_API_URL = process.env.LNN_API_URL
const LLN_MODEL = process.env.LLN_MODEL
console.log(`TTS API URL: ${TTS_API_URL}`)

// Mutable server-wide state
let language = 'en' // default language, overridable through /settings
let storeRecordings = false // when truthy, transcribed audio is written to rec/
let queueCounter = 0 // number of transcription requests currently in flight
const sessions = new Map() // sessionId -> session record
const chats = new Map() // Store chat rooms
// Initialize storage and load initial values
/**
 * Initializes the persistent store and restores the saved language,
 * recording flag, chats and sessions into the in-memory state.
 *
 * Persisted chats are keyed by their `id` and sessions by their `sessionId`.
 * Records that lack the key are skipped rather than being registered under
 * `undefined`, where any socket without a session id would pick them up.
 *
 * @returns {Promise<void>} Rejects when the store cannot be initialized or read.
 */
async function initStorage () {
  await storage.init()
  language = (await storage.getItem('language')) || language
  storeRecordings =
    (await storage.getItem('storeRecordings')) || storeRecordings

  // Anything that is not an array is treated as "nothing stored".
  const storedChats = await storage.getItem('chats')
  for (const chat of Array.isArray(storedChats) ? storedChats : []) {
    if (chat?.id != null) chats.set(chat.id, chat)
  }

  const storedSessions = await storage.getItem('sessions')
  for (const session of Array.isArray(storedSessions) ? storedSessions : []) {
    if (session?.sessionId != null) sessions.set(session.sessionId, session)
  }
}
// Restore persisted state. A failure is logged instead of surfacing as an
// unhandled promise rejection.
initStorage().catch(err => console.error('Failed to initialize storage', err))

// WebSocket Server
const wss = new WebSocket.Server({ port: PORT_WS })
wss.on('connection', ws => {
  ws.on('message', message => {
    // handleMessage is async; make sure a rejection never goes unobserved.
    handleMessage(ws, message).catch(err =>
      console.error('Failed to handle message', err)
    )
  })
  // An 'error' event with no listener is thrown by the emitter, so always
  // register one on every socket.
  ws.on('error', err => console.error('WebSocket error', err))
  ws.on('close', () => handleClose(ws))
})
// Handle WebSocket messages
/**
 * Routes one incoming WebSocket message.
 *
 * Payloads that parse as JSON are dispatched on their `type` field; anything
 * that is not valid JSON is treated as audio and handed to `handleAudioData`.
 * Failures in either path are logged, so the returned promise does not reject
 * because of a handler error.
 *
 * @param {object} ws - The socket the message arrived on.
 * @param {*} message - Raw message payload.
 * @returns {Promise<void>}
 */
async function handleMessage (ws, message) {
  let data
  try {
    data = JSON.parse(message)
  } catch {
    // Not JSON: this is an audio frame. Guard it as well, otherwise a failure
    // in the audio pipeline would escape as an unhandled rejection.
    try {
      await handleAudioData(ws, message)
    } catch (err) {
      console.error('Failed to handle audio data', err)
    }
    return
  }

  try {
    switch (data.type) {
      case 'sessionId':
        await handleSessionId(ws)
        break
      case 'join':
        await handleJoin(ws, data)
        break
      case 'startChat':
        await handleStartChat(ws, data)
        break
      case 'enterChat':
        await handleEnterChat(ws, data)
        break
      case 'reconnect':
        await handleReconnect(ws, data)
        break
      default:
        console.log('Unknown message type:', data.type)
    }
  } catch (err) {
    console.error('Failed to handle message', err)
  }
}
/**
 * Cleans up after a socket has closed: forgets the session bound to the
 * socket and pushes the refreshed user list to the connected clients.
 *
 * @param {object} ws - The socket that was closed.
 */
function handleClose (ws) {
  const { sessionId } = ws
  sessions.delete(sessionId)
  broadcastUserList()
}
// Handlers for specific message types
/**
 * Assigns a freshly generated session id to the socket, registers a default
 * session for it and persists the session list.
 *
 * The stored record carries its own `sessionId`, so the user list and a
 * reload from storage can identify the session.
 *
 * @param {object} ws - The socket requesting a session id.
 * @returns {Promise<void>}
 */
async function handleSessionId (ws) {
  const sessionId = generateSessionId()
  ws.sessionId = sessionId
  sessions.set(sessionId, { sessionId, language: 'en' })
  await storage.setItem('sessions', Array.from(sessions.values()))
}
/**
 * Registers the joining user under the socket's session id, then replies with
 * the session details and the chats the user participates in, and finally
 * refreshes everybody's user list.
 *
 * @param {object} ws - The joining socket; its `sessionId` is used as the key.
 * @param {object} data - Join payload.
 * @param {string} data.username - Display name of the user.
 * @param {string} data.language - Language selected by the user.
 * @returns {Promise<void>}
 */
async function handleJoin (ws, { username, language }) {
  const { sessionId } = ws
  sessions.set(sessionId, { username, sessionId, language })

  const acknowledgement = {
    type: 'sessionId',
    sessionId,
    language,
    storeRecordings
  }
  ws.send(JSON.stringify(acknowledgement))

  // Collect every chat that lists this session as a participant.
  const joinedChats = []
  for (const chat of chats.values()) {
    if (chat.participants.includes(sessionId)) joinedChats.push(chat)
  }
  ws.send(JSON.stringify({ type: 'chats', chats: joinedChats }))

  broadcastUserList()
}
/**
 * Creates a chat between the requesting session and the given users,
 * persists the chat list and notifies the participants.
 *
 * The chat record stores its own `id`, so listings built from the chat values
 * and a reload from storage keep the identifier. A missing or non-array
 * `users` yields a chat with the requester only instead of throwing.
 *
 * @param {object} ws - The socket starting the chat.
 * @param {object} [data] - Start-chat payload.
 * @param {string[]} [data.users] - Session ids to invite.
 * @returns {Promise<void>}
 */
async function handleStartChat (ws, { users = [] } = {}) {
  const chatId = generateChatId()
  const invited = Array.isArray(users) ? users : []
  // The Set drops duplicates, e.g. when the requester is also in `users`.
  const participants = [...new Set([ws.sessionId, ...invited])]
  chats.set(chatId, { id: chatId, participants, messages: [] })
  await storage.setItem('chats', Array.from(chats.values()))
  notifyParticipants(participants)
  broadcastUserList()
}
/**
 * Makes a chat the session's current chat and sends the chat to the socket.
 *
 * The session's `currentChat` is only updated after confirming that the chat
 * exists and lists the session as a participant. Transcriptions are relayed
 * into `currentChat`, so it must never point at a chat the user is not in.
 *
 * @param {object} ws - The socket entering the chat.
 * @param {object} data - Enter-chat payload.
 * @param {string} data.chatId - Identifier of the chat to enter.
 * @returns {Promise<void>}
 */
async function handleEnterChat (ws, { chatId }) {
  const enteredChat = chats.get(chatId)
  if (!enteredChat || !enteredChat.participants.includes(ws.sessionId)) {
    console.log('Chat not found or session is not a participant:', chatId)
    return
  }
  const currentSession = sessions.get(ws.sessionId)
  if (currentSession) {
    currentSession.currentChat = chatId
  }
  ws.send(JSON.stringify({ type: 'chat', chat: enteredChat }))
}
/**
 * Re-attaches a socket to an existing session and sends it the chats that
 * session participates in. The user list is broadcast whether or not the
 * session was found.
 *
 * The socket adopts the requested session id. Any placeholder session that
 * was registered for this socket under a different id is dropped, so the
 * user does not show up twice and no entry is left under a stale key.
 *
 * @param {object} ws - The reconnecting socket.
 * @param {object} data - Reconnect payload.
 * @param {string} data.sessionId - Identifier of the session to resume.
 * @returns {Promise<void>}
 */
async function handleReconnect (ws, { sessionId }) {
  const userSession = sessions.get(sessionId)
  if (userSession) {
    if (ws.sessionId !== undefined && ws.sessionId !== sessionId) {
      sessions.delete(ws.sessionId)
    }
    ws.sessionId = sessionId
    const userChats = Array.from(chats.values()).filter(chat =>
      chat.participants.includes(sessionId)
    )
    ws.send(JSON.stringify({ type: 'chats', chats: userChats }))
  } else {
    console.log('Session not found:', sessionId)
  }
  broadcastUserList()
}
// Utility functions
/**
 * Creates a new session identifier.
 *
 * A client can resume a session by presenting its id, so the id is drawn from
 * the cryptographic random source rather than `Math.random()`, which is
 * predictable and can occasionally produce very short strings.
 *
 * @returns {string} 32 lowercase hexadecimal characters.
 */
function generateSessionId () {
  return crypto.randomBytes(16).toString('hex')
}
/**
 * Creates a new chat identifier from the cryptographic random source, which
 * always yields a fixed-length value (unlike `Math.random()`-based ids).
 *
 * @returns {string} 32 lowercase hexadecimal characters.
 */
function generateChatId () {
  return crypto.randomBytes(16).toString('hex')
}
/**
 * Sends the current user list to every open client socket.
 *
 * Each entry exposes only `username`, `sessionId`, `currentChat` and
 * `language`. The payload is serialized once and the same string is sent to
 * all clients.
 */
function broadcastUserList () {
  const users = Array.from(sessions.values(), user => ({
    username: user.username,
    sessionId: user.sessionId,
    currentChat: user.currentChat,
    language: user.language
  }))
  const payload = JSON.stringify({ type: 'userList', users })

  for (const client of wss.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload)
    }
  }
}
/**
 * Sends each listed participant the chats they belong to, as
 * `{ id, participants }` summaries. Participants without a socket, or whose
 * socket is not open, are skipped.
 *
 * @param {string[]} participants - Session ids to notify.
 */
function notifyParticipants (participants) {
  for (const sessionId of participants) {
    // First client socket bound to this session id, if any.
    let socket
    for (const client of wss.clients) {
      if (client.sessionId === sessionId) {
        socket = client
        break
      }
    }
    if (!socket || socket.readyState !== WebSocket.OPEN) continue

    const summaries = []
    for (const [id, chat] of chats) {
      if (chat.participants.includes(sessionId)) {
        summaries.push({ id, participants: chat.participants })
      }
    }
    socket.send(JSON.stringify({ type: 'chats', chats: summaries }))
  }
}
/**
 * Builds the upload form for an audio payload and sends it either to language
 * detection (no language set, or `auto`) or straight to transcription.
 *
 * Audio arriving on a socket with no registered session is logged and
 * dropped instead of throwing.
 *
 * @param {object} ws - The socket the audio arrived on.
 * @param {*} data - Raw audio payload, uploaded as `audio.ogg`.
 * @returns {Promise<void>}
 */
async function handleAudioData (ws, data) {
  const sessionData = sessions.get(ws.sessionId)
  if (!sessionData) {
    console.error('Audio received for unknown session:', ws.sessionId)
    return
  }

  const { language, task } = sessionData
  const formData = {
    task: task || 'transcribe',
    language,
    vad_filter: 'true',
    output: 'json',
    audio_file: {
      value: data,
      options: { filename: 'audio.ogg', contentType: 'audio/ogg' }
    }
  }

  if (!language || language === 'auto') {
    await detectLanguage(ws, formData)
  } else {
    await transcribeAudio(ws, formData, sessionData)
  }
}
async function detectLanguage (ws, formData) {
try {
const result = await requestPromise({
method: 'POST',
url: TTS_API_URL.replace('/asr', '/detect-language'),
formData
})
const { language_code } = JSON.parse(result)
if (language_code) {
const sessionData = sessions.get(ws.sessionId)
sessionData.language = language_code
ws.send(
JSON.stringify({
type: 'languageDetected',
languageDetected: language_code
})
)
await transcribeAudio(ws, formData, sessionData)
}
} catch (err) {
console.error('Language detection failed:', err)
}
}
/**
 * Uploads the audio for transcription, reports the result to the sender and
 * passes the text on to the chat relay. When recordings are enabled the audio
 * is also written to `rec/audio<timestamp>.ogg`.
 *
 * `queueCounter` counts requests in flight. It is decremented in a `finally`
 * so a failed request cannot leave it inflated.
 *
 * @param {object} ws - The socket the audio arrived on.
 * @param {object} formData - Upload form; `language` and `vad_filter` are
 *   overwritten on it before sending.
 * @param {object} sessionData - Session of the sender.
 * @returns {Promise<void>} Does not reject for transcription or relay
 *   failures; those are logged.
 */
async function transcribeAudio (ws, formData, sessionData) {
  const start = Date.now()
  try {
    if (sessionData.language) {
      formData.language = sessionData.language
    }
    formData.vad_filter = 'true'

    let body
    queueCounter++
    try {
      body = await requestPromise({
        method: 'POST',
        url: TTS_API_URL,
        formData
      })
    } finally {
      queueCounter--
    }

    const duration = Date.now() - start
    ws.send(
      JSON.stringify({
        type: 'text',
        queueCounter,
        duration,
        language: sessionData.language,
        text: body
      })
    )
    // The raw audio is handed along as an extra argument for relays that
    // forward it; a handler taking three parameters simply ignores it.
    await handleChatTranscription(
      ws,
      body,
      sessionData,
      formData.audio_file?.value
    )
  } catch (err) {
    console.error('Transcription failed:', err)
  }

  if (storeRecordings) {
    const timestamp = Date.now()
    const target = `rec/audio${timestamp}.ogg`
    try {
      await fs.promises.mkdir('rec', { recursive: true })
      await fs.promises.writeFile(target, formData.audio_file.value)
      console.log(`Audio data saved to ${target}`)
    } catch (err) {
      console.error(err)
    }
  }
}
/**
 * Records a transcription in the sender's current chat and relays it to the
 * other participants.
 *
 * Participants whose language matches the sender's receive the original text
 * and, when `audioData` is supplied, the original audio as base64.
 * Participants with a different language receive a translation plus
 * synthesized speech of that translation. Each target language is translated
 * and synthesized at most once per message, and a failure while serving one
 * participant is logged without stopping delivery to the others.
 *
 * @param {object} ws - The sender's socket; its session is skipped.
 * @param {string} body - Transcribed text.
 * @param {object} sessionData - Sender session (`username`, `language`,
 *   `currentChat` are read).
 * @param {Buffer|ArrayBuffer|Uint8Array} [audioData] - Original audio to
 *   forward to same-language listeners. Optional; no audio is forwarded when
 *   it is omitted.
 * @returns {Promise<void>}
 */
async function handleChatTranscription (ws, body, sessionData, audioData) {
  if (!sessionData.currentChat) return
  const chat = chats.get(sessionData.currentChat)
  if (!chat) return

  const msg = { sender: sessionData.username, text: body, translations: [] }
  chat.messages.push(msg)

  // Looks up an open socket bound to the given session id.
  const findOpenSocket = sessionId => {
    for (const client of wss.clients) {
      if (
        client.sessionId === sessionId &&
        client.readyState === WebSocket.OPEN
      ) {
        return client
      }
    }
    return undefined
  }

  const translationByLanguage = new Map() // target language -> translated text
  const speechByLanguage = new Map() // target language -> base64 audio

  for (const sessionId of chat.participants) {
    if (sessionId === ws.sessionId) continue
    try {
      const targetLang = sessions.get(sessionId)?.language || 'en'

      if (targetLang === sessionData.language) {
        const socket = findOpenSocket(sessionId)
        if (!socket) continue
        socket.send(
          JSON.stringify({
            type: 'text',
            text: `${sessionData.username}: ${body}`
          })
        )
        if (audioData) {
          const original = Buffer.isBuffer(audioData)
            ? audioData
            : Buffer.from(audioData)
          socket.send(
            JSON.stringify({
              type: 'audio',
              audio: original.toString('base64')
            })
          )
        }
        continue
      }

      if (!translationByLanguage.has(targetLang)) {
        const translated = await translateText(
          body,
          sessionData.language,
          targetLang
        )
        translationByLanguage.set(targetLang, translated)
        msg.translations.push({ language: targetLang, text: translated })
      }
      const translation = translationByLanguage.get(targetLang)

      const socket = findOpenSocket(sessionId)
      if (!socket) continue
      socket.send(
        JSON.stringify({
          type: 'text',
          text: `${sessionData.username}: ${translation}`
        })
      )

      if (!speechByLanguage.has(targetLang)) {
        const audioBuffer = await generateSpeech(translation)
        speechByLanguage.set(targetLang, audioBuffer.toString('base64'))
      }
      // The socket may have closed while speech was being generated.
      const audioSocket = findOpenSocket(sessionId)
      if (audioSocket) {
        audioSocket.send(
          JSON.stringify({
            type: 'audio',
            audio: speechByLanguage.get(targetLang)
          })
        )
      }
    } catch (err) {
      console.error('Failed to relay message to participant', sessionId, err)
    }
  }
}
/**
 * Translates a transcription through the Groq chat-completions API.
 *
 * The system message names the language pair and asks for the bare
 * translation; the text itself is sent as the user message.
 *
 * @param {string} originalText - Text to translate.
 * @param {string} originalLanguage - Language of `originalText`.
 * @param {string} targetLanguage - Language to translate into.
 * @returns {Promise<string>} The translation, or `''` when the response
 *   carries no message content.
 */
async function translateText (originalText, originalLanguage, targetLanguage) {
  const response = await groq.chat.completions.create({
    messages: [
      {
        role: 'system',
        content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation.`
      },
      {
        role: 'user',
        content: originalText
      }
    ],
    model: 'llama3-8b-8192'
  })
  return response.choices[0]?.message?.content || ''
}
/**
 * Synthesizes speech for the given text with the OpenAI speech API
 * (model `tts-1`, voice `alloy`).
 *
 * @param {string} text - Text to speak.
 * @returns {Promise<Buffer>} The audio bytes returned by the API.
 */
async function generateSpeech (text) {
  const speechRequest = { model: 'tts-1', voice: 'alloy', input: text }
  const response = await openai.audio.speech.create(speechRequest)
  const bytes = await response.arrayBuffer()
  return Buffer.from(bytes)
}
// HTTP Server

// Chat client page.
app.get('/', (req, res) => {
  const clientPage = path.join(__dirname, 'chat-client.html')
  res.sendFile(clientPage)
})

// Script served alongside the client page.
app.get('/audio.js', (req, res) => {
  const audioScript = path.join(__dirname, 'audio.js')
  res.sendFile(audioScript)
})

// Writes the posted `message` to the server log with a timestamp.
app.post('/log', (req, res) => {
  const stamp = new Date().toISOString()
  console.log(`[LOG ${stamp}] ${req.body.message}`)
  res.status(200).send('OK')
})

// Returns the WebSocket URL from the WS_URL environment variable as-is.
// Deriving it from PUBLIC_HOSTNAME (`wss://<PUBLIC_HOSTNAME>`) is currently
// disabled.
app.get('/wsurl', (req, res) => {
  const wsUrl = process.env.WS_URL
  console.log('Request for WS URL resolved with:', wsUrl)
  res.status(200).send(wsUrl)
})
// Reads the server-wide settings and, when query parameters are present,
// updates and persists them first.
//
// `storeRecordings` arrives as a query string, so it is converted to a real
// boolean. Storing the raw string would make `'false'` truthy and switch
// recording on. Storage failures answer with 500.
app.get('/settings', async (req, res) => {
  try {
    if (req.query.language) {
      language = req.query.language
      await storage.setItem('language', language)
    }
    if (req.query.storeRecordings) {
      const flag = String(req.query.storeRecordings).toLowerCase()
      storeRecordings = flag === 'true' || flag === '1'
      await storage.setItem('storeRecordings', storeRecordings)
    }
    res.status(200).send({ language, storeRecordings })
  } catch (err) {
    console.error('Failed to update settings', err)
    res.status(500).send('ERROR')
  }
})

// Updates per-session settings (`language`, `storeRecordings`, `task`) for the
// session named in the JSON body. An unknown session id answers with 404
// instead of throwing inside the async handler.
app.post('/settings', async (req, res) => {
  const { sessionId, language, storeRecordings, task } = req.body || {}
  const sessionData = sessions.get(sessionId)
  if (!sessionData) {
    return res.status(404).send('Session not found')
  }
  if (language) sessionData.language = language
  if (storeRecordings !== undefined) {
    sessionData.storeRecordings = storeRecordings
  }
  if (task) sessionData.task = task
  res.status(200).send('OK')
})
// Streams the request body into `rec/audio_slice_<timestamp>.ogg`.
// Answers 'OK' once the file has been written. A failure to create the
// directory or to write the file answers with 500 rather than leaving the
// request pending.
app.post('/upload', (req, res) => {
  const timestamp = Date.now()
  console.log('Received audio data:', timestamp)
  fs.mkdir('rec', { recursive: true }, err => {
    if (err) return res.status(500).send('ERROR')
    const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`)
    file.on('error', writeErr => {
      console.error(writeErr)
      if (!res.headersSent) res.status(500).send('ERROR')
    })
    file.on('finish', () => res.status(200).send('OK'))
    req.pipe(file)
  })
})
// Lists the chats whose participant list contains the `username` query value.
// NOTE(review): the WebSocket handlers fill `participants` with session ids,
// so this filter only matches when the caller passes a session id here.
// Confirm what clients actually send.
app.get('/chats', (req, res) => {
  const { username } = req.query
  const matching = []
  for (const chat of chats.values()) {
    if (chat.participants.includes(username)) matching.push(chat)
  }
  res.status(200).send({ chats: matching })
})

// Start the HTTP server.
app.listen(PORT_HTTP, () => {
  console.log(`Server listening on port ${PORT_HTTP}`)
  console.log(process.env.TTS_BACKEND_URL)
})
// Helper to wrap request in a promise
/**
 * Runs `request(options)` and exposes the outcome as a promise.
 *
 * Responses with an HTTP status of 400 or above are rejected, so an error
 * page from the backend is never handed to callers as if it were a result.
 *
 * @param {object} options - Options passed straight to `request`.
 * @returns {Promise<*>} Resolves with the response body.
 * @throws {Error} Rejects with the transport error, or with an Error carrying
 *   `statusCode` and `body` for HTTP error responses.
 */
function requestPromise (options) {
  return new Promise((resolve, reject) => {
    request(options, (error, response, body) => {
      if (error) return reject(error)

      const status = response?.statusCode
      if (typeof status === 'number' && status >= 400) {
        const failure = new Error(
          `Request to ${options?.url} failed with HTTP status ${status}`
        )
        failure.statusCode = status
        failure.body = body
        return reject(failure)
      }

      resolve(body)
    })
  })
}