gogo2/web/chat-server.js
2024-06-11 22:53:39 +03:00

398 lines
15 KiB
JavaScript

const express = require('express');
const bodyParser = require('body-parser');
const WebSocket = require('ws');
const storage = require('node-persist');
const request = require('request');
const fs = require('fs');
const path = require('path');
const dotenv = require('dotenv');
const ollama = require('ollama');
const axios = require('axios');
// Environment: `.env.development` when NODE_ENV is 'development', otherwise `.env`.
dotenv.config({
  path: process.env.NODE_ENV === 'development' ? '.env.development' : '.env',
});

// HTTP application; request bodies are parsed as JSON.
const app = express();
app.use(bodyParser.json());

// Service endpoints and ports (HTTP defaults to 3000, WebSocket to 8080).
const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000;
const PORT_WS = process.env.SERVER_PORT_WS || 8080;
const TTS_API_URL = process.env.TTS_API_URL;
const LNN_API_URL = process.env.LNN_API_URL;

// Server-wide settings; replaced by persisted values once storage is loaded.
let language = 'en';
let storeRecordings = false;
// Number of transcription requests currently in flight.
let queueCounter = 0;

// In-memory state: sessions keyed by session id, chat rooms keyed by chat id.
const sessions = new Map();
const chats = new Map(); // Store chat rooms
/**
 * Restore persisted state once node-persist is ready.
 *
 * - `language` / `storeRecordings`: loaded into the module-level settings, or
 *   seeded into storage from the current in-memory defaults when never written.
 * - `chats` / `sessions`: re-inserted into the in-memory maps keyed by
 *   `chat.id` / `session.sessionId`.
 *
 * Runs in the background. In-memory state is applied before the defaults are
 * seeded, so a failed write cannot prevent loading. Failures are logged instead
 * of becoming unhandled promise rejections.
 */
(async () => {
  await storage.init();
  const [storedLanguage, storedRecordings, storedChats, storedSessions] = await Promise.all([
    storage.getItem('language'),
    storage.getItem('storeRecordings'),
    storage.getItem('chats'),
    storage.getItem('sessions'),
  ]);

  const seeds = [];

  if (storedLanguage !== undefined) {
    language = storedLanguage;
  } else {
    seeds.push(storage.setItem('language', language));
  }

  if (storedRecordings !== undefined) {
    // GET /settings may have persisted the raw query-string value, and a stored
    // 'false' is a truthy string that would still enable recording.
    // Normalise it to a real boolean.
    storeRecordings = typeof storedRecordings === 'string'
      ? !['false', '0', 'no', 'off', ''].includes(storedRecordings.toLowerCase())
      : Boolean(storedRecordings);
  } else {
    seeds.push(storage.setItem('storeRecordings', storeRecordings));
  }

  // Load existing chats and sessions from storage (ignore malformed values).
  if (Array.isArray(storedChats)) {
    storedChats.forEach((chat) => chats.set(chat.id, chat));
  }
  if (Array.isArray(storedSessions)) {
    storedSessions.forEach((session) => sessions.set(session.sessionId, session));
  }

  await Promise.all(seeds);
})().catch((err) => console.error('Failed to restore persisted state:', err));
// WebSocket Server
const wss = new WebSocket.Server({ port: PORT_WS });

/**
 * Chats the given session participates in, in the shape sent to clients in
 * `chats` messages. Each entry is a shallow copy of the stored chat with its
 * map key exposed as `id`. Chats created at runtime do not carry an `id` field
 * themselves, so this guarantees the client an id it can send back in
 * `enterChat`.
 * @param {string} sessionId
 * @returns {object[]}
 */
function chatsForSession(sessionId) {
  return Array.from(chats.entries())
    .filter(([, chat]) => chat.participants.includes(sessionId))
    .map(([id, chat]) => ({ ...chat, id }));
}

/**
 * Appends `entry` to the array persisted under `key`, creating it if absent.
 * Fire-and-forget: failures are logged.
 * NOTE(review): this is a read-modify-write, so two overlapping appends can
 * still drop one entry.
 * @param {string} key
 * @param {object} entry
 */
function appendToStoredList(key, entry) {
  storage.getItem(key)
    .then((value) => storage.setItem(key, value !== undefined ? [...value, entry] : [entry]))
    .catch((err) => console.error(`Failed to persist ${key}:`, err));
}

/**
 * Control protocol. Text frames are JSON objects with a `type`:
 *  - sessionId: allocate a session id for this socket (persisted).
 *  - join:      set username/language; reply with the session id and chat list.
 *  - startChat: create a chat with `users` (session ids) plus the sender.
 *  - enterChat: make `chatId` the sender's current chat (members only).
 *  - reconnect: re-bind this socket to an existing `sessionId`.
 * Any frame that is not valid JSON is handed to handleAudioData as audio.
 */
wss.on('connection', (ws) => {
  ws.on('message', (message) => {
    let data;
    try {
      data = JSON.parse(message);
    } catch (e) {
      // Not JSON: deliberately treated as binary audio rather than as an error.
      handleAudioData(ws, message);
      return;
    }

    try {
      console.log('Received message:', data.type);
      switch (data.type) {
        case 'sessionId': {
          ws.sessionId = Math.random().toString(36).substring(2);
          sessions.set(ws.sessionId, { sessionId: ws.sessionId, language: 'en' });
          appendToStoredList('sessions', { sessionId: ws.sessionId, language: 'en' });
          break;
        }
        case 'join': {
          const { username, language } = data;
          // A client that joins without first requesting a session id would
          // otherwise be stored under the key `undefined`.
          if (!ws.sessionId) ws.sessionId = Math.random().toString(36).substring(2);
          sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language });
          ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings }));
          ws.send(JSON.stringify({ type: 'chats', chats: chatsForSession(ws.sessionId) }));
          broadcastUserList();
          break;
        }
        case 'startChat': {
          const { users: chatUsers } = data;
          const chatId = Math.random().toString(36).substring(2);
          // Deduplicate participants (the creator may also be listed in `users`).
          const participants = [...new Set([ws.sessionId, ...chatUsers])];
          chats.set(chatId, { participants, messages: [] });
          appendToStoredList('chats', { id: chatId, participants });

          const userNames = participants.map((sessionId) => {
            const user = sessions.get(sessionId);
            return user ? `${user.username}(${user.sessionId})` : 'Unknown User';
          });
          console.log('Creating chat room. Users:', userNames);

          participants.forEach((sessionId) => {
            const participantSocket = Array.from(wss.clients).find((client) => client.sessionId === sessionId);
            if (participantSocket && participantSocket.readyState === WebSocket.OPEN) {
              participantSocket.send(JSON.stringify({ type: 'chats', chats: chatsForSession(sessionId) }));
            }
          });
          broadcastUserList();
          break;
        }
        case 'enterChat': {
          const { chatId } = data;
          const enteredChat = chats.get(chatId);
          const currentSession = sessions.get(ws.sessionId);
          const isMember = Boolean(enteredChat && enteredChat.participants.includes(ws.sessionId));
          if (currentSession) {
            // Only members may select a chat. Otherwise any client could route
            // its transcripts (see transcribeAudio) into someone else's chat.
            currentSession.currentChat = isMember ? chatId : undefined;
          }
          if (isMember) {
            ws.send(JSON.stringify({ type: 'chat', chat: enteredChat }));
          }
          break;
        }
        case 'reconnect': {
          const userSession = sessions.get(data.sessionId);
          if (userSession) {
            // Drop the throwaway session of this socket before adopting the old id.
            // Copying the old session under it would leave an entry that the
            // close handler never removes.
            if (ws.sessionId && ws.sessionId !== data.sessionId) {
              sessions.delete(ws.sessionId);
            }
            ws.sessionId = data.sessionId;
            ws.send(JSON.stringify({ type: 'chats', chats: chatsForSession(ws.sessionId) }));
          } else {
            console.log('Session not found:', data.sessionId);
          }
          broadcastUserList();
          break;
        }
        default:
          console.log('Unknown message type:', data.type);
      }
    } catch (err) {
      console.error('Failed to handle message', err);
    }
  });

  ws.on('close', () => {
    sessions.delete(ws.sessionId);
    broadcastUserList();
  });
});
/**
 * Dispatches an audio chunk received on a client socket to the ASR service.
 *
 * Builds the multipart form from the socket's session settings (`language`,
 * default 'en'; `task`, default 'transcribe'). If the session language is
 * 'auto' or '', the language is detected first; otherwise the chunk is
 * transcribed directly.
 *
 * @param {object} ws  client socket; `ws.sessionId` selects the session.
 * @param {*} data     raw audio bytes as received from the socket, uploaded as `audio.ogg`.
 */
function handleAudioData(ws, data) {
  const sessionData = sessions.get(ws.sessionId);
  if (!sessionData) {
    // Audio can arrive before the client has a session. Without this guard the
    // TypeError escapes the socket's 'message' listener.
    console.warn('Ignoring audio from socket without a session:', ws.sessionId);
    return;
  }

  // `??` rather than `||`: an empty-string language must reach the
  // auto-detect branch below instead of silently becoming 'en'.
  const language = sessionData.language ?? 'en';
  const task = sessionData.task || 'transcribe';
  const formData = {
    task,
    language,
    vad_filter: 'true',
    output: 'json',
    audio_file: {
      value: data,
      options: { filename: 'audio.ogg', contentType: 'audio/ogg' },
    },
  };

  if (language === 'auto' || language === '') {
    detectLanguage(ws, formData);
  } else {
    transcribeAudio(ws, formData, sessionData);
  }
}
/**
 * Asks the ASR service which language the audio in `formData` is in.
 * On success it records the language on the socket's session, sends a
 * `languageDetected` message, and then transcribes the same audio in the
 * detected language.
 *
 * The detection endpoint is TTS_API_URL with the first `/asr` in its *path*
 * replaced by `/detect-language`.
 *
 * @param {object} ws        client socket; `ws.sessionId` selects the session.
 * @param {object} formData  multipart fields built by handleAudioData.
 */
function detectLanguage(ws, formData) {
  // Rewrite only the path. A plain string replace also matches hosts such as
  // "http://asr:9000/asr".
  const detectUrl = new URL(TTS_API_URL);
  detectUrl.pathname = detectUrl.pathname.replace('/asr', '/detect-language');

  request.post({ url: detectUrl.toString(), formData }, (err, httpResponse, body) => {
    if (err) {
      console.error('Language detection failed:', err);
      return;
    }

    let result;
    try {
      result = JSON.parse(body);
    } catch (parseErr) {
      // A throw here would be uncaught: we are inside the request callback.
      console.error('Language detection failed:', parseErr);
      return;
    }
    if (!result || !result.language_code) return;

    const sessionData = sessions.get(ws.sessionId);
    if (!sessionData) return; // the session was removed (socket closed) meanwhile

    const language = result.language_code;
    sessionData.language = language;
    ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: result.detected_language }));
    // Transcribe in the detected language instead of forwarding 'auto'.
    transcribeAudio(ws, { ...formData, language }, sessionData);
  });
}
/**
 * Asks the LLM to translate `originalText` and returns its raw response
 * (whatever queryLLMAxios resolves to). The translated text is in its
 * `response` field.
 *
 * A translation-oriented system prompt is passed explicitly. Without it,
 * queryLLMAxios falls back to its own default system prompt.
 *
 * @param {string} originalText
 * @param {string} originalLanguage  language code of the source text.
 * @param {string} targetLanguage    language code to translate into.
 * @returns {Promise<*>} the LLM response payload.
 */
async function translateText(originalText, originalLanguage, targetLanguage) {
  const systemPrompt = 'you provide translations to the text transcribed from audio. '
    + 'The text is in a language you understand, and you can provide translations to any language you know.';
  const response = await queryLLMAxios(
    `translate this text from ${originalLanguage} to ${targetLanguage}: ${originalText}`,
    systemPrompt,
  );
  console.log('Translation response:', response);
  return response;
}
/**
 * Generates a completion for `prompt` through the ollama client library
 * (host LNN_API_URL, model 'qwen2'), using a translation-oriented system prompt.
 *
 * Unused in the visible source; translateText goes through queryLLMAxios.
 *
 * @param {string} prompt
 * @returns {Promise<*>} the result of the client's `generate` call. The
 *   original awaited it and discarded it, always resolving to undefined.
 */
async function queryLLM(prompt) {
  const requestData = {
    model: 'qwen2', // alternative noted in the original: ollama3
    prompt,
    system: 'you provide translations to the text transcribed from audio. The text is in a language you understand, and you can provide translations to any language you know.',
  };
  const client = new ollama.Ollama({ host: LNN_API_URL });
  return client.generate(requestData);
}
/**
 * Sends a non-streaming generate request to the Ollama HTTP API at
 * `${LNN_API_URL}/api/generate` (model 'qwen2') and returns the response body.
 *
 * This is the LLM entry point that translateText uses.
 *
 * @param {string} prompt
 * @param {string} [system='talk like a pirate'] system prompt. Callers should
 *   pass their own. NOTE(review): the default looks like a placeholder left in
 *   from testing; it is kept only so existing one-argument calls behave as before.
 * @returns {Promise<*>} `response.data` from axios (a parsed object when the
 *   server answers with JSON, since axios parses JSON by default).
 * @throws the axios error, after logging it.
 */
async function queryLLMAxios(prompt, system = 'talk like a pirate') {
  const requestData = {
    model: 'qwen2',
    prompt,
    system,
    stream: false,
  };
  try {
    const response = await axios.post(`${LNN_API_URL}/api/generate`, requestData, {
      headers: { 'Content-Type': 'application/json' },
    });
    return response.data;
  } catch (error) {
    console.error('Error calling Ollama API:', error.response ? error.response.data : error.message);
    throw error;
  }
}
/**
 * Returns the first open client socket bound to `sessionId`, or undefined.
 * @param {string} sessionId
 * @returns {object|undefined}
 */
function findOpenClient(sessionId) {
  for (const client of wss.clients) {
    if (client.sessionId === sessionId && client.readyState === WebSocket.OPEN) return client;
  }
  return undefined;
}

/**
 * Records a transcript in the sender's current chat (if any) and forwards it
 * to every other participant. A recipient whose session language (default
 * 'en') equals the sender's gets the text verbatim; everyone else gets a
 * translation via translateText, which is also appended to the stored
 * message's `translations`.
 * @param {object} ws           sender socket (its own sessionId is skipped).
 * @param {object} sessionData  sender session (`username`, `language`, `currentChat`).
 * @param {string} body         transcript as returned by the ASR service.
 */
function relayTranscriptToChat(ws, sessionData, body) {
  if (!sessionData.currentChat) return;
  const chat = chats.get(sessionData.currentChat);
  if (!chat) return;

  const msg = { sender: sessionData.username, text: body, translations: [] };
  // Chats restored from storage are saved without a messages array.
  if (!chat.messages) chat.messages = [];
  chat.messages.push(msg);

  for (const sessionId of chat.participants) {
    if (sessionId === ws.sessionId) continue;
    const targetLang = sessions.get(sessionId)?.language || 'en';

    if (targetLang === sessionData.language) {
      findOpenClient(sessionId)?.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${body}\n` }));
      continue;
    }

    console.log(`Translating message "${body}" from ${sessionData.language} to ${targetLang}`);
    translateText(body, sessionData.language, targetLang)
      .then((translation) => {
        // axios returns an already-parsed object for JSON responses.
        // JSON.parse on that object throws, so only raw strings are parsed.
        const jsonResp = typeof translation === 'string' ? JSON.parse(translation) : translation;
        msg.translations.push({ language: targetLang, text: jsonResp.response });
        findOpenClient(sessionId)?.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${jsonResp.response}\n` }));
      })
      .catch((err) => console.error('Translation failed:', err));
  }
}

/**
 * Saves raw audio as rec/audio<timestamp>.ogg. The write starts only after
 * the directory has been created, so the two cannot race. A mkdir failure is
 * logged rather than thrown from an async callback.
 * @param {*} audio  audio bytes to write.
 */
function saveRecording(audio) {
  const timestamp = Date.now();
  fs.mkdir('rec', { recursive: true }, (mkdirErr) => {
    if (mkdirErr) {
      console.error('Failed to create rec directory:', mkdirErr);
      return;
    }
    fs.writeFile(`rec/audio${timestamp}.ogg`, audio, (err) => {
      if (err) console.log(err);
      else console.log(`Audio data saved to rec/audio${timestamp}.ogg`);
    });
  });
}

/**
 * Posts the audio form to the ASR service (TTS_API_URL) and sends the raw
 * response body back to the sender as a `text` message. The message also
 * carries the in-flight queue size and the round-trip duration in ms. The
 * transcript is then relayed to the sender's current chat. When the global
 * `storeRecordings` flag is set, the audio is also saved to disk.
 * @param {object} ws           sender socket.
 * @param {object} formData     multipart fields built by handleAudioData.
 * @param {object} sessionData  the sender's session.
 */
function transcribeAudio(ws, formData, sessionData) {
  const start = Date.now();
  queueCounter++;
  request.post({ url: TTS_API_URL, formData }, (err, httpResponse, body) => {
    queueCounter--;
    if (err) {
      console.error('Transcription failed:', err);
      return;
    }
    const duration = Date.now() - start;
    ws.send(JSON.stringify({
      type: 'text',
      queueCounter,
      duration,
      language: sessionData.language,
      text: body,
    }));
    relayTranscriptToChat(ws, sessionData, body);
  });
  if (storeRecordings) saveRecording(formData.audio_file.value);
}
/**
 * Pushes the current roster to every open client as a `userList` message.
 * Each user entry exposes only username, sessionId, currentChat and language.
 */
function broadcastUserList() {
  const users = [];
  for (const { username, sessionId, currentChat, language } of sessions.values()) {
    users.push({ username, sessionId, currentChat, language });
  }
  // Same payload for every recipient, so serialize it once.
  const payload = JSON.stringify({ type: 'userList', users });
  for (const client of wss.clients) {
    if (client.readyState !== WebSocket.OPEN) continue;
    client.send(payload);
  }
}
// HTTP Server
// Static client files, served from this script's directory.
for (const [route, fileName] of [['/', 'chat-client.html'], ['/audio.js', 'audio.js']]) {
  app.get(route, (req, res) => {
    res.sendFile(path.join(__dirname, fileName));
  });
}
// Lets clients write a line into the server log, prefixed with an ISO timestamp.
app.post('/log', (req, res) => {
  const stamp = new Date().toISOString();
  const { message } = req.body;
  console.log(`[LOG ${stamp}] ${message}`);
  res.status(200).send('OK');
});

// Tells clients which WebSocket URL to connect to (the WS_URL env variable).
app.get('/wsurl', (req, res) => {
  const wsUrl = process.env.WS_URL;
  res.status(200).send(wsUrl);
});
// Server-wide settings. Query parameters (?language=…&storeRecordings=…)
// update and persist the globals; the current values are always returned.
app.get('/settings', (req, res) => {
  const { language: requestedLanguage, storeRecordings: requestedRecording } = req.query;

  if (requestedLanguage) {
    language = requestedLanguage;
    storage.setItem('language', language)
      .catch((err) => console.error('Failed to persist language:', err));
  }

  if (requestedRecording) {
    // Query values are strings. Storing them raw made "false" truthy, which
    // enabled recording. Convert to a real boolean.
    storeRecordings = !['false', '0', 'no', 'off'].includes(String(requestedRecording).toLowerCase());
    storage.setItem('storeRecordings', storeRecordings)
      .catch((err) => console.error('Failed to persist storeRecordings:', err));
  }

  res.status(200).send({ language, storeRecordings });
});
// Per-session settings, posted as JSON: { sessionId, language?, storeRecordings?, task? }.
// `language` and `task` are read by handleAudioData.
// NOTE(review): sessionData.storeRecordings is not read anywhere in the visible
// source; recording is controlled by the global flag.
app.post('/settings', (req, res) => {
  const { sessionId, language, storeRecordings, task } = req.body;
  const sessionData = sessions.get(sessionId);
  if (!sessionData) {
    // Without this guard an unknown session id threw a TypeError on the first
    // field assignment below.
    res.status(404).send('Unknown session');
    return;
  }
  if (language) sessionData.language = language;
  if (storeRecordings) sessionData.storeRecordings = storeRecordings;
  if (task) sessionData.task = task;
  res.status(200).send('OK');
});
// Streams the raw request body to rec/audio_slice_<timestamp>.ogg.
app.post('/upload', (req, res) => {
  const timestamp = Date.now();
  console.log('Received audio data:', timestamp);
  fs.mkdir('rec', { recursive: true }, (err) => {
    if (err) {
      res.status(500).send('ERROR');
      return;
    }
    const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`);
    // A stream 'error' with no listener is thrown by Node, and a failed
    // request would otherwise leave the file stream (and response) open.
    file.on('error', (writeErr) => {
      console.error('Failed to store upload:', writeErr);
      if (!res.headersSent) res.status(500).send('ERROR');
    });
    req.on('error', (reqErr) => {
      console.error('Upload stream failed:', reqErr);
      file.destroy();
    });
    file.on('finish', () => res.status(200).send('OK'));
    req.pipe(file);
  });
});
// Lists the chats of a user. Chat participants are session ids, so `username`
// matches a chat when it equals a participant id (the original behaviour) or
// equals the username of that participant's session.
app.get('/chats', (req, res) => {
  const { username } = req.query;
  if (!username) {
    res.status(200).send({ chats: [] });
    return;
  }
  const isUser = (participantId) =>
    participantId === username || sessions.get(participantId)?.username === username;
  const userChats = Array.from(chats.values()).filter((chat) => chat.participants.some(isUser));
  res.status(200).send({ chats: userChats });
});
// Start serving HTTP; the WebSocket server listens separately on PORT_WS.
const announceListening = () => console.log(`Server listening on port ${PORT_HTTP}`);
app.listen(PORT_HTTP, announceListening);