use memory stream and save to disk async in background (broken)

This commit is contained in:
Dobromir Popov 2024-09-10 12:16:51 +03:00
parent 4974af0678
commit 54b0f1661b

View File

@ -11,10 +11,10 @@ import pystray
from pystray import MenuItem as item
from PIL import Image
import ctypes
import io
import time
import json5
# Load configuration from config.json
def load_config():
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
@ -35,44 +35,65 @@ AUTO_START_PATH = os.path.expanduser(r"~\AppData\Roaming\Microsoft\Windows\Start
# Initialize the Groq client
client = Groq(api_key=API_KEY)
def record_audio(filename):
"""Records audio when key and mouse button is pressed."""
def save_audio_to_disk(filename, audio_data, audio_format, channels, rate):
    """Write raw PCM audio to *filename* as a WAV file.

    The call itself is blocking; callers that want the write to happen in
    the background run this function on a worker thread.

    Args:
        filename: Destination path of the WAV file.
        audio_data: Raw PCM frames as a single bytes object.
        audio_format: Sample width in bytes (it is passed straight to
            ``setsampwidth``), not a PyAudio format constant.
        channels: Number of audio channels.
        rate: Sampling rate in Hz.
    """
    # nframes is given as 0; the wave module fills in the real frame count
    # in the header based on the data actually written.
    wav_params = (channels, audio_format, rate, 0, 'NONE', 'not compressed')
    with wave.open(filename, 'wb') as writer:
        writer.setparams(wav_params)
        writer.writeframes(audio_data)
def build_wav_stream(pcm_data, sample_width, channels, rate):
    """Wrap raw PCM frames in a WAV container held in memory.

    Args:
        pcm_data: Raw PCM frames as a single bytes object.
        sample_width: Sample width in bytes.
        channels: Number of audio channels.
        rate: Sampling rate in Hz.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 that contains a complete
        WAV file (header followed by the frames).
    """
    buffer = io.BytesIO()
    # wave only closes files it opened itself, so the BytesIO stays usable
    # after the with-block has finalized the header.
    with wave.open(buffer, 'wb') as wav_writer:
        wav_writer.setnchannels(channels)
        wav_writer.setsampwidth(sample_width)
        wav_writer.setframerate(rate)
        wav_writer.writeframes(pcm_data)
    buffer.seek(0)
    return buffer


def record_audio():
    """Record audio while the hotkey and mouse button are held down.

    Captures 16-bit mono audio at 16 kHz for as long as both ``KB_KEY`` and
    ``MOUSE_BTN`` are pressed. The capture is also written to "output.wav"
    on a background thread as a side task.

    Returns:
        An ``io.BytesIO`` containing the recording as a complete WAV file.
        The stream is later uploaded under the name 'audio.wav', so it has
        to carry a WAV header rather than bare PCM frames.
    """
    chunk_size = 1024
    channels = 1
    rate = 16000

    audio = pyaudio.PyAudio()
    # Query the sample width while the PyAudio instance is still alive.
    sample_width = audio.get_sample_size(pyaudio.paInt16)
    frames = []
    try:
        stream = audio.open(format=pyaudio.paInt16, channels=channels, rate=rate,
                            input=True, frames_per_buffer=chunk_size)
        try:
            print("Recording...")
            # Record while both keyboard and mouse button are pressed
            while keyboard.is_pressed(KB_KEY) and mouse.is_pressed(button=MOUSE_BTN):
                frames.append(stream.read(chunk_size))
        finally:
            # Release the input stream even if reading fails mid-recording.
            stream.stop_stream()
            stream.close()
    finally:
        audio.terminate()

    recording_duration = len(frames) * chunk_size / rate  # Audio duration in seconds
    print(f"Recording stopped. Duration: {recording_duration:.2f} seconds.")

    audio_data = b''.join(frames)

    # Save audio to disk asynchronously as a side task
    threading.Thread(
        target=save_audio_to_disk,
        args=("output.wav", audio_data, sample_width, channels, rate),
    ).start()

    return build_wav_stream(audio_data, sample_width, channels, rate)
def transcribe_audio(memory_stream):
    """Transcribe an in-memory recording with the Groq Whisper model.

    Args:
        memory_stream: Seekable binary stream holding the recorded audio;
            it is rewound before being uploaded under the name 'audio.wav'.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    # Rewind so the upload starts at the first byte regardless of how the
    # stream was used before.
    memory_stream.seek(0)

    request = {
        "file": ('audio.wav', memory_stream),
        "model": MODEL,
        "prompt": "Specify context or spelling",
        "language": config['language'],
        "response_format": "json",
        "temperature": 0.0,
    }

    # Time only the API round trip.
    start_time = time.time()
    transcription = client.audio.transcriptions.create(**request)
    transcription_time = time.time() - start_time

    print(f"Transcription took: {transcription_time:.2f} seconds.")
    return transcription.text
def simulate_keypress(text):
"""Simulates typing of transcribed text quickly."""
@ -110,6 +131,9 @@ def setup_tray_icon():
response_times = []
ma_window_size = 10 # Moving average over the last 10 responses
def main_loop():
"""Continuously listen for key or mouse press and transcribe audio."""
filename = "output.wav"
@ -122,11 +146,11 @@ def main_loop():
time.sleep(0.1) # Small sleep to avoid busy-waiting
# Record audio
record_audio(filename)
memory_stream = record_audio()
# Transcribe audio
print("Transcribing audio...")
transcribed_text = transcribe_audio(filename)
transcribed_text = transcribe_audio(memory_stream)
# Simulate typing the transcribed text
print("Typing transcribed text...")