gogo2/agent-mAId/main.py
2024-09-10 15:08:01 +03:00

268 lines
9.1 KiB
Python

import os
import sys
import pyaudio
import wave
import pyautogui
import keyboard
import mouse
import threading
from groq import Groq
import pystray
from pystray import MenuItem as item
from PIL import Image
import ctypes
import io
import time
import json5
import wave
import pyperclip
import argparse
import atexit
# # Load configuration from config.json
DEFAULT_CONFIG = {
"api_key": "xxx",
"kb_key": "ctrl",
"mouse_btn": "left",
"model": "distil-whisper-large-v3-en",
"language": "en", # whisper-large-v3 or distil-whisper-large-v3-en
"action": "type" # type, copy
}
def parse_args():
"""Parse command line arguments for config file."""
parser = argparse.ArgumentParser(description='Run the AI transcription app.')
parser.add_argument(
'--config', type=str, help='Path to config file', default=None
)
return parser.parse_args()
def load_config(config_path=None):
"""Load the configuration file, adjusting for PyInstaller's temp path when bundled."""
config = DEFAULT_CONFIG.copy() # Start with default configuration
try:
if config_path is None:
# Determine if the script is running as a PyInstaller bundle
if getattr(sys, 'frozen', False):
# If running in a bundle, use the temp path where PyInstaller extracts files
config_path = os.path.join(sys._MEIPASS, 'config.json')
else:
# If running in development (normal execution), use the local directory
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
print(f'Trying to load config from: {config_path}')
with open(config_path, 'r') as config_file:
loaded_config = json5.load(config_file)
# Update the default config with any values from config.json
config.update(loaded_config)
except FileNotFoundError as ex:
print("Config file not found, using defaults." + ex.strerror)
raise ex
except json5.JSONDecodeError as ex:
print("Error decoding config file, using defaults." + ex.msg)
except Exception as e:
print(f"Unexpected error while loading config: {e}, using defaults.")
return config
# Load the config
# config = load_config()
# Parse command line arguments
args = parse_args()
# Load the config from the specified path or default location
config = load_config(args.config)
# Extract API key and button from the config file
API_KEY = config['api_key']
KB_KEY = config['kb_key']
MOUSE_BTN = config['mouse_btn']
MODEL = config['model']
POST_TRANSCRIBE = config['action']
# Constants
AUTO_START_PATH = os.path.expanduser(r"~\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup") # For autostart
# Initialize the Groq client
client = Groq(api_key=API_KEY)
def save_audio_to_disk(filename, audio_data, audio_format, channels, rate):
"""Save the audio data to disk asynchronously."""
with wave.open(filename, 'wb') as wave_file:
wave_file.setnchannels(channels)
wave_file.setsampwidth(audio_format)
wave_file.setframerate(rate)
wave_file.writeframes(audio_data)
def record_audio():
"""Records audio when the key and mouse button is pressed, stores in memory."""
audio = pyaudio.PyAudio()
stream = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=1024)
frames = []
print("Recording...")
# Record while both keyboard and mouse button are pressed
while keyboard.is_pressed(KB_KEY) and mouse.is_pressed(button=MOUSE_BTN):
data = stream.read(1024)
frames.append(data)
recording_duration = len(frames) * 1024 / 16000 # Calculate audio duration in seconds
print(f"Recording stopped. Duration: {recording_duration:.2f} seconds.")
stream.stop_stream()
stream.close()
audio.terminate()
# Store the recorded audio in an in-memory stream as a valid WAV file
memory_stream = io.BytesIO()
with wave.open(memory_stream, 'wb') as wave_file:
wave_file.setnchannels(1)
wave_file.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
wave_file.setframerate(16000)
wave_file.writeframes(b''.join(frames))
memory_stream.seek(0) # Reset the stream position to the beginning for reading
# Save audio to disk asynchronously as a side task (optional)
threading.Thread(target=save_audio_to_disk, args=("output.wav", b''.join(frames), audio.get_sample_size(pyaudio.paInt16), 1, 16000)).start()
return memory_stream
def transcribe_audio(memory_stream):
"""Transcribes the recorded audio using the Groq Whisper model."""
memory_stream.seek(0) # Reset the stream position to the beginning
start_time = time.time()
transcription = client.audio.transcriptions.create(
file=('audio.wav', memory_stream),
model=MODEL,
prompt="Transcribe the following audio",
language=config['language'],
response_format="json",
temperature=0.0
)
end_time = time.time()
transcription_time = end_time - start_time
print(f"Transcription took: {transcription_time:.2f} seconds. Result: {transcription.text}")
log_transcription_time(transcription_time)
return transcription.text
def simulate_keypress(text):
"""Simulates typing of transcribed text quickly."""
pyautogui.typewrite(text, interval=0.01) # Reduce interval between characters for faster typing
# pyautogui.press('enter')
def add_to_autostart():
"""Registers the app to autostart on login."""
script_path = os.path.abspath(__file__)
shortcut_path = os.path.join(AUTO_START_PATH, "mAId.lnk")
# Use ctypes to create the shortcut (this is Windows specific)
shell = ctypes.windll.shell32
shell.ShellExecuteW(None, "runas", "cmd.exe", f'/C mklink "{shortcut_path}" "{script_path}"', None, 1)
print("App added to autostart.")
icon = None # Global variable to store the tray icon object
def cleanup_and_exit():
"""Clean up the tray icon and exit the application."""
global icon
if icon:
print("Stopping and removing tray icon...")
icon.stop() # Stop the tray icon to remove it from the tray
sys.exit()
def setup_tray_icon():
global icon
"""Setup system tray icon and menu."""
if getattr(sys, 'frozen', False):
# If running as a bundle, use the temp path where PyInstaller extracts files
icon_path = os.path.join(sys._MEIPASS, 'mic.webp')
else:
# If running in development (normal execution), use the local directory
icon_path = os.path.join(os.path.dirname(__file__), 'mic.webp')
try:
# Load the tray icon
icon_image = Image.open(icon_path)
except FileNotFoundError:
print(f"Icon file not found at {icon_path}")
icon_image = Image.new('RGB', (64, 64), color=(255, 0, 0)) # Red icon as an example
return
menu = (
item('Register to Autostart', add_to_autostart),
item('Exit', lambda: quit_app(icon))
)
icon = pystray.Icon("mAId", icon_image, menu=pystray.Menu(*menu))
icon.run()
# Ensure the tray icon is removed when the app exits
atexit.register(cleanup_and_exit)
response_times = []
ma_window_size = 10 # Moving average over the last 10 responses
def log_transcription_time(transcription_time):
"""Logs the transcription time and updates the moving average."""
global response_times
# Add the transcription time to the list
response_times.append(transcription_time)
# If the number of logged times exceeds the window size, remove the oldest entry
if len(response_times) > ma_window_size:
response_times.pop(0)
# Calculate and print the moving average
moving_average = sum(response_times) / len(response_times)
print(f"Moving Average of Transcription Time (last {ma_window_size} responses): {moving_average:.2f} seconds.")
def main_loop():
"""Continuously listen for key or mouse press and transcribe audio."""
filename = "output.wav"
while True:
print("Waiting for key and mouse press...")
# Wait for KB_KEY or mouse press
while not (keyboard.is_pressed(KB_KEY) and mouse.is_pressed(button=MOUSE_BTN)):
time.sleep(0.1) # Small sleep to avoid busy-waiting
# Record audio
memory_stream = record_audio()
# Transcribe audio
print("Transcribing audio...")
transcribed_text = transcribe_audio(memory_stream)
if POST_TRANSCRIBE == "type":
# Simulate typing the transcribed text
print("Typing transcribed text...")
simulate_keypress(transcribed_text)
elif POST_TRANSCRIBE == "copy":
# Copy the transcribed text to clipboard
pyperclip.copy(transcribed_text)
print("Transcribed text copied to clipboard.")
if __name__ == "__main__":
# Start the tray icon in a separate thread so it doesn't block the main functionality
tray_thread = threading.Thread(target=setup_tray_icon)
tray_thread.daemon = True
tray_thread.start()
# Run the main loop that listens for key or mouse presses in the background
main_loop()