gpt refactoring

This commit is contained in:
Dobromir Popov 2024-06-12 14:59:53 +03:00
parent 4627f16284
commit ad91e0f3d4

View File

@ -8,23 +8,21 @@ const path = require('path');
const dotenv = require('dotenv'); const dotenv = require('dotenv');
const ollama = require('ollama'); const ollama = require('ollama');
const axios = require('axios'); const axios = require('axios');
// import OpenAI from "openai";
const OpenAI = require('openai'); const OpenAI = require('openai');
const openai = new OpenAI({ apiKey: "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" });
const Groq = require('groq-sdk'); const Groq = require('groq-sdk');
//const LLM = require("@themaximalist/llm.js"); //https://www.npmjs.com/package/@themaximalist/llm.js
// Load environment variables
dotenv.config({ path: `.env${process.env.NODE_ENV === 'development' ? '.development' : ''}` });
// Initialize services
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const groq = new Groq({ apiKey: process.env.GROQ_API_KEY }); const groq = new Groq({ apiKey: process.env.GROQ_API_KEY });
// Express setup
if (dotenv) {
const envFile = process.env.NODE_ENV === 'development' ? '.env.development' : '.env';
dotenv.config({ path: envFile });
}
const app = express(); const app = express();
app.use(bodyParser.json()); app.use(bodyParser.json());
// Configuration constants
const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000; const PORT_HTTP = process.env.SERVER_PORT_HTTP || 3000;
const PORT_WS = process.env.SERVER_PORT_WS || 8080; const PORT_WS = process.env.SERVER_PORT_WS || 8080;
const TTS_API_URL = process.env.TTS_API_URL; const TTS_API_URL = process.env.TTS_API_URL;
@ -38,68 +36,75 @@ let queueCounter = 0;
const sessions = new Map(); const sessions = new Map();
const chats = new Map(); // Store chat rooms const chats = new Map(); // Store chat rooms
storage.init().then(() => { // Initialize storage and load initial values
storage.getItem('language').then((value) => { async function initStorage() {
if (value !== undefined) language = value; await storage.init();
else storage.setItem('language', language); language = await storage.getItem('language') || language;
}); storeRecordings = await storage.getItem('storeRecordings') || storeRecordings;
storage.getItem('storeRecordings').then((value) => {
if (value !== undefined) storeRecordings = value;
else storage.setItem('storeRecordings', storeRecordings);
});
// Load existing chats, sessions from storage const storedChats = await storage.getItem('chats') || [];
storage.getItem('chats').then((value) => { storedChats.forEach(chat => chats.set(chat.id, chat));
if (value !== undefined) {
value.forEach(chat => chats.set(chat.id, chat)); const storedSessions = await storage.getItem('sessions') || [];
storedSessions.forEach(session => sessions.set(session.sessionId, session));
} }
});
storage.getItem('sessions').then((value) => { initStorage();
if (value !== undefined) {
value.forEach(session => sessions.set(session.sessionId, session));
}
});
});
// WebSocket Server // WebSocket Server
const wss = new WebSocket.Server({ port: PORT_WS }); const wss = new WebSocket.Server({ port: PORT_WS });
wss.on('connection', (ws) => { wss.on('connection', ws => {
ws.on('message', async message => handleMessage(ws, message));
ws.on('close', () => handleClose(ws));
});
// Handle WebSocket messages
async function handleMessage(ws, message) {
ws.on('message', (message) => { let data;
let isJson = false;
let data = null;
// Check if message is JSON
try { try {
data = JSON.parse(message); data = JSON.parse(message);
isJson = true; } catch {
} return handleAudioData(ws, message);
catch (e) {
} }
if (isJson) {
// Handle JSON messages
try { try {
console.log('Received message:', data.type);
switch (data.type) { switch (data.type) {
case 'sessionId': case 'sessionId':
ws.sessionId = Math.random().toString(36).substring(2); await handleSessionId(ws);
sessions.set(ws.sessionId, { language: 'en' });
//store new session in storage sessions
storage.getItem('sessions').then((value) => {
if (value !== undefined) {
value.push({ sessionId: ws.sessionId, language: 'en' });
storage.setItem('sessions', value);
} else {
storage.setItem('sessions', [{ sessionId: ws.sessionId, language: 'en' }]);
}
}
);
break; break;
case 'join': case 'join':
const { username, language } = data; await handleJoin(ws, data);
break;
case 'startChat':
await handleStartChat(ws, data);
break;
case 'enterChat':
await handleEnterChat(ws, data);
break;
case 'reconnect':
await handleReconnect(ws, data);
break;
default:
console.log('Unknown message type:', data.type);
}
} catch (err) {
console.error('Failed to handle message', err);
}
}
function handleClose(ws) {
sessions.delete(ws.sessionId);
broadcastUserList();
}
// Handlers for specific message types
async function handleSessionId(ws) {
ws.sessionId = generateSessionId();
sessions.set(ws.sessionId, { language: 'en' });
await storage.setItem('sessions', Array.from(sessions.values()));
}
async function handleJoin(ws, { username, language }) {
sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language }); sessions.set(ws.sessionId, { username, sessionId: ws.sessionId, language });
ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings })); ws.send(JSON.stringify({ type: 'sessionId', sessionId: ws.sessionId, language, storeRecordings }));
@ -107,33 +112,67 @@ wss.on('connection', (ws) => {
ws.send(JSON.stringify({ type: 'chats', chats: userChats })); ws.send(JSON.stringify({ type: 'chats', chats: userChats }));
broadcastUserList(); broadcastUserList();
break; }
case 'startChat':
const { users: chatUsers } = data; async function handleStartChat(ws, { users }) {
const chatId = Math.random().toString(36).substring(2); const chatId = generateChatId();
let participants = [ws.sessionId, ...chatUsers]; let participants = [ws.sessionId, ...users];
// Deduplicate participants
participants = [...new Set(participants)]; participants = [...new Set(participants)];
chats.set(chatId, { participants, messages: [] }); chats.set(chatId, { participants, messages: [] });
await storage.setItem('chats', Array.from(chats.values()));
//store new chat in storage chats notifyParticipants(participants);
storage.getItem('chats').then((value) => { broadcastUserList();
if (value !== undefined) { }
value.push({ id: chatId, participants });
storage.setItem('chats', value); async function handleEnterChat(ws, { chatId }) {
const enteredChat = chats.get(chatId);
const currentSession = sessions.get(ws.sessionId);
currentSession.currentChat = chatId;
if (enteredChat && enteredChat.participants.includes(ws.sessionId)) {
ws.send(JSON.stringify({ type: 'chat', chat: enteredChat }));
}
}
async function handleReconnect(ws, { sessionId }) {
const userSession = sessions.get(sessionId);
if (userSession) {
sessions.set(ws.sessionId, userSession);
ws.sessionId = sessionId;
const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId));
ws.send(JSON.stringify({ type: 'chats', chats: userChats }));
} else { } else {
storage.setItem('chats', [{ id: chatId, participants }]); console.log('Session not found:', sessionId);
}
broadcastUserList();
}
// Utility functions
function generateSessionId() {
return Math.random().toString(36).substring(2);
}
function generateChatId() {
return Math.random().toString(36).substring(2);
}
function broadcastUserList() {
const userList = Array.from(sessions.values()).map(user => ({
username: user.username,
sessionId: user.sessionId,
currentChat: user.currentChat,
language: user.language
}));
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ type: 'userList', users: userList }));
} }
}); });
}
const userNames = participants.map(sessionId => { function notifyParticipants(participants) {
const user = sessions.get(sessionId);
return user ? `${user.username}(${user.sessionId})` : 'Unknown User';
});
console.log('Creating chat room. Users:', userNames);
participants.forEach(sessionId => { participants.forEach(sessionId => {
const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId);
if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { if (participantSocket && participantSocket.readyState === WebSocket.OPEN) {
@ -143,62 +182,14 @@ wss.on('connection', (ws) => {
participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats })); participantSocket.send(JSON.stringify({ type: 'chats', chats: userChats }));
} }
}); });
broadcastUserList();
break;
// case 'audio':
// console.log('(queue ' + queueCounter + ') Received ' + (data.audio.length / 1024 / 1024).toFixed(3) + ' MB audio from client.');
// handleAudioData(ws, data.audiobase64);
// break;
case 'enterChat':
const { chatId: Id } = data;
const enteredChat = chats.get(Id);
const currentSession = sessions.get(ws.sessionId);
currentSession.currentChat = Id;
if (enteredChat && enteredChat.participants.includes(ws.sessionId)) {
ws.send(JSON.stringify({ type: 'chat', chat: enteredChat }));
}
break;
case 'reconnect':
const userSession = sessions.get(data.sessionId);
if (userSession) {
sessions.set(ws.sessionId, userSession);
ws.sessionId = data.sessionId;
const userChats = Array.from(chats.values()).filter(chat => chat.participants.includes(ws.sessionId));
ws.send(JSON.stringify({ type: 'chats', chats: userChats }));
}
else {
console.log('Session not found:', data.sessionId);
} }
broadcastUserList(); async function handleAudioData(ws, data) {
break;
default:
console.log('Unknown message type:', data.type);
}
} catch (err) {
console.error('Failed to parse message', err);
}
} else {
// Handle binary data
handleAudioData(ws, message);
}
});
ws.on('close', () => {
sessions.delete(ws.sessionId);
broadcastUserList();
});
});
function handleAudioData(ws, data) {
const sessionData = sessions.get(ws.sessionId); const sessionData = sessions.get(ws.sessionId);
let language = sessionData.language || 'en'; let { language, task } = sessionData;
let task = sessionData.task || 'transcribe';
const formData = { const formData = {
task, task: task || 'transcribe',
language, language,
vad_filter: 'true', vad_filter: 'true',
output: 'json', output: 'json',
@ -208,98 +199,29 @@ function handleAudioData(ws, data) {
} }
}; };
if (language === 'auto' || language === '') { if (!language || language === 'auto') {
detectLanguage(ws, formData); await detectLanguage(ws, formData);
} else { } else {
transcribeAudio(ws, formData, sessionData); await transcribeAudio(ws, formData, sessionData);
} }
} }
function detectLanguage(ws, formData) { async function detectLanguage(ws, formData) {
request.post({ url: TTS_API_URL.replace('/asr', '/detect-language'), formData }, (err, httpResponse, body) => {
if (err) return console.error('Language detection failed:', err);
const result = JSON.parse(body);
if (result && result.language_code) {
const language = result.language_code;
const sessionData = sessions.get(ws.sessionId);
sessionData.language = language;
ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: result.detected_language }));
transcribeAudio(ws, formData, sessionData);
}
});
}
async function translateText(originalText, originalLanguage, targetLanguage) {
const prompt = "Translate this text from " + originalLanguage + " to " + targetLanguage + ": " + originalText;
// const llm = new LLM();
// llm.system("Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.");
// let result = await llm.chat(prompt, { service: "groq", model: "mixtral-8x7b-32768" });
// return result;
return groq.chat.completions
.create({
messages: [
{
role: "system",
content: "You are translating voice transcriptions from '" + originalLanguage + "' to '" + targetLanguage + "'. Reply with just the translation. It will be converted to speech using TTS - you can add more context if needed.",
},
{
role: "user",
content: originalText,
},
],
model: "llama3-8b-8192",
})
.then((chatCompletion) => {
let result = chatCompletion.choices[0]?.message?.content || "";
console.log(result);
return { response: result };
});
// return queryLLMAxios("translate this text from " + originalLanguage + " to " + targetLanguage + ": " + originalText)
// .then(response => {
// console.log('Translation response:', response);
// return response;
// });
}
async function queryLLM(prompt) {
const requestData = {
model: LLN_MODEL || 'qwen2', // ollama3
prompt: prompt,
system: "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.",
//format: "json"
};
const ola = new ollama.Ollama({ host: LNN_API_URL })
await ola.generate(requestData)
}
///obsolete function
async function queryLLMAxios(prompt) {
const requestData = {
model: LLN_MODEL || 'qwen2',
prompt: prompt,
"system": "Translate voice transcriptions. some words may be omonymous, so please provide the most likely translation.",
"stream": false
};
try { try {
const response = await axios.post(LNN_API_URL, requestData, { const result = await requestPromise({
headers: { method: 'POST',
// 'Authorization': `Bearer ${OLLAMA_API_KEY}`, url: TTS_API_URL.replace('/asr', '/detect-language'),
'Content-Type': 'application/json' formData
}
}); });
return response.data; const { language_code } = JSON.parse(result);
} catch (error) { if (language_code) {
console.error('Error calling Ollama API:', error.response ? error.response.data : error.message); const sessionData = sessions.get(ws.sessionId);
throw error; sessionData.language = language_code;
ws.send(JSON.stringify({ type: 'languageDetected', languageDetected: language_code }));
await transcribeAudio(ws, formData, sessionData);
}
} catch (err) {
console.error('Language detection failed:', err);
} }
} }
@ -307,9 +229,13 @@ async function transcribeAudio(ws, formData, sessionData) {
const start = new Date().getTime(); const start = new Date().getTime();
queueCounter++; queueCounter++;
request.post({ url: TTS_API_URL, formData }, (err, httpResponse, body) => { try {
if(sessionData.language) {
formData.language = sessionData.language;
}
formData.vad_filter = 'true';
const body = await requestPromise({ method: 'POST', url: TTS_API_URL, formData });
queueCounter--; queueCounter--;
if (err) return console.error('Transcription failed:', err);
const duration = new Date().getTime() - start; const duration = new Date().getTime() - start;
ws.send(JSON.stringify({ ws.send(JSON.stringify({
@ -320,83 +246,77 @@ async function transcribeAudio(ws, formData, sessionData) {
text: body text: body
})); }));
await handleChatTranscription(ws, body, sessionData);
} catch (err) {
console.error('Transcription failed:', err);
}
if (storeRecordings) {
const timestamp = Date.now();
fs.mkdir('rec', { recursive: true }, err => {
if (err) console.error(err);
else {
fs.writeFile(`rec/audio${timestamp}.ogg`, formData.audio_file.value, err => {
if (err) console.error(err);
else console.log(`Audio data saved to rec/audio${timestamp}.ogg`);
});
}
});
}
}
async function handleChatTranscription(ws, body, sessionData) {
if (sessionData.currentChat) { if (sessionData.currentChat) {
const chat = chats.get(sessionData.currentChat); const chat = chats.get(sessionData.currentChat);
if (chat) { if (chat) {
let msg = { sender: sessionData.username, text: body, translations: [] }; let msg = { sender: sessionData.username, text: body, translations: [] };
// init messages array if it doesn't exist
if (!chat.messages) chat.messages = [];
chat.messages.push(msg); chat.messages.push(msg);
chat.participants.forEach(sessionId => { for (let sessionId of chat.participants) {
if (sessionId !== ws.sessionId) { if (sessionId !== ws.sessionId) {
let targetLang = sessions.get(sessionId)?.language || 'en'; const targetLang = sessions.get(sessionId)?.language || 'en';
//targetLang = "bg";
if (targetLang !== sessionData.language) { if (targetLang !== sessionData.language) {
console.log('Translating message "' + body + '" from ' + sessionData.language + ' to ' + targetLang); const translation = await translateText(body, sessionData.language, targetLang);
translateText(body, sessionData.language, targetLang) msg.translations.push({ language: targetLang, text: translation });
.then(translation => {
let jsonResp; const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId);
if (typeof translation === 'string') { if (participantSocket && participantSocket.readyState === WebSocket.OPEN) {
try { participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${translation}` }));
jsonResp = JSON.parse(translation); const audioBuffer = await generateSpeech(translation);
} catch (e) { participantSocket.send(JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') }));
console.error('Failed to parse translation response:', e);
ws.send(JSON.stringify({ type: 'error', message: 'Invalid translation response' }));
return;
} }
} else { } else {
jsonResp = translation;
}
const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId); const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId);
if (participantSocket && participantSocket.readyState === WebSocket.OPEN) { if (participantSocket && participantSocket.readyState === WebSocket.OPEN) {
participantSocket.send(JSON.stringify({ type: 'text', text: sessionData.username + ': ' + jsonResp.response + "\n" })); participantSocket.send(JSON.stringify({ type: 'text', text: `${sessionData.username}: ${body}` }));
// Generate and send the speech audio
generateSpeech(jsonResp.response)
.then(audioBuffer => {
console.log('Generated audio for translation:', audioBuffer.length);
msg.translations.push({ language: targetLang, text: jsonResp.response, audio: audioBuffer.toString('base64') });
participantSocket.send(JSON.stringify({ type: 'audio', audio: audioBuffer.toString('base64') }));
});
}
});
}
else {
const participantSocket = Array.from(wss.clients).find(client => client.sessionId === sessionId);
if (participantSocket && participantSocket.readyState === WebSocket.OPEN) {
participantSocket.send(JSON.stringify({ type: 'text', text: sessionData.username + ': ' + body + "\n" }));
participantSocket.send(JSON.stringify({ type: 'audio', audio: formData.toString('base64') })); participantSocket.send(JSON.stringify({ type: 'audio', audio: formData.toString('base64') }));
} }
} }
} }
});
} }
} }
});
if (storeRecordings) {
const timestamp = Date.now();
fs.mkdir('rec', { recursive: true }, (err) => {
if (err) throw err;
});
fs.writeFile(`rec/audio${timestamp}.ogg`, formData.audio_file.value, (err) => {
if (err) console.log(err);
else console.log('Audio data saved to rec/audio' + timestamp + '.ogg');
});
} }
} }
function broadcastUserList() { async function translateText(originalText, originalLanguage, targetLanguage) {
const userList = Array.from(sessions.values()).map(user => ({ username: user.username, sessionId: user.sessionId, currentChat: user.currentChat, language: user.language })); const prompt = `Translate this text from ${originalLanguage} to ${targetLanguage}: ${originalText}`;
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) { const response = await groq.chat.completions.create({
client.send(JSON.stringify({ type: 'userList', users: userList })); messages: [
} {
role: "system",
content: `You are translating voice transcriptions from '${originalLanguage}' to '${targetLanguage}'. Reply with just the translation.`,
},
{
role: "user",
content: originalText,
},
],
model: "llama3-8b-8192",
}); });
return response.choices[0]?.message?.content || "";
} }
async function generateSpeech(text) { async function generateSpeech(text) {
@ -405,8 +325,7 @@ async function generateSpeech(text) {
voice: "alloy", voice: "alloy",
input: text, input: text,
}); });
const buffer = Buffer.from(await mp3.arrayBuffer()); return Buffer.from(await mp3.arrayBuffer());
return buffer;
} }
// HTTP Server // HTTP Server
@ -427,23 +346,23 @@ app.get('/wsurl', (req, res) => {
res.status(200).send(process.env.WS_URL); res.status(200).send(process.env.WS_URL);
}); });
app.get('/settings', (req, res) => { app.get('/settings', async (req, res) => {
if (req.query.language) { if (req.query.language) {
language = req.query.language; language = req.query.language;
storage.setItem('language', language); await storage.setItem('language', language);
} }
if (req.query.storeRecordings) { if (req.query.storeRecordings) {
storeRecordings = req.query.storeRecordings; storeRecordings = req.query.storeRecordings;
storage.setItem('storeRecordings', storeRecordings); await storage.setItem('storeRecordings', storeRecordings);
} }
res.status(200).send({ language, storeRecordings }); res.status(200).send({ language, storeRecordings });
}); });
app.post('/settings', (req, res) => { app.post('/settings', async (req, res) => {
const { sessionId, language, storeRecordings, task } = req.body; const { sessionId, language, storeRecordings, task } = req.body;
const sessionData = sessions.get(sessionId); const sessionData = sessions.get(sessionId);
if (language) sessionData.language = language; if (language) sessionData.language = language;
if (storeRecordings) sessionData.storeRecordings = storeRecordings; if (storeRecordings !== undefined) sessionData.storeRecordings = storeRecordings;
if (task) sessionData.task = task; if (task) sessionData.task = task;
res.status(200).send('OK'); res.status(200).send('OK');
}); });
@ -451,7 +370,7 @@ app.post('/settings', (req, res) => {
app.post('/upload', (req, res) => { app.post('/upload', (req, res) => {
const timestamp = Date.now(); const timestamp = Date.now();
console.log('Received audio data:', timestamp); console.log('Received audio data:', timestamp);
fs.mkdir('rec', { recursive: true }, (err) => { fs.mkdir('rec', { recursive: true }, err => {
if (err) return res.status(500).send('ERROR'); if (err) return res.status(500).send('ERROR');
const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`); const file = fs.createWriteStream(`rec/audio_slice_${timestamp}.ogg`);
req.pipe(file); req.pipe(file);
@ -468,3 +387,13 @@ app.get('/chats', (req, res) => {
app.listen(PORT_HTTP, () => { app.listen(PORT_HTTP, () => {
console.log(`Server listening on port ${PORT_HTTP}`); console.log(`Server listening on port ${PORT_HTTP}`);
}); });
// Helper to wrap request in a promise
function requestPromise(options) {
return new Promise((resolve, reject) => {
request(options, (error, response, body) => {
if (error) return reject(error);
resolve(body);
});
});
}