minimo reconocimiento de voz
This commit is contained in:
220
minimal_server/RealtimeSTT/audio_input.py
Normal file
220
minimal_server/RealtimeSTT/audio_input.py
Normal file
@@ -0,0 +1,220 @@
|
||||
import logging
from typing import Optional

import pyaudio
from colorama import init, Fore, Style
from scipy.signal import butter, filtfilt, resample_poly
|
||||
|
||||
# Default values for the AudioInput constructor parameters.

# Sample rate, in Hz, that AudioInput asks the device for by default.
DESIRED_RATE = 16000

# Default number of frames requested per buffer and per read_chunk() call.
CHUNK_SIZE = 1024

# Default PyAudio sample format (16-bit integer samples).
AUDIO_FORMAT = pyaudio.paInt16

# Default number of input channels opened on the stream.
CHANNELS = 1
|
||||
|
||||
class AudioInput:
    """Microphone capture helper built on PyAudio.

    The class opens an input stream on a chosen (or the default) input
    device, selects a sample rate that the device reports as supported, and
    provides helpers to low-pass filter and resample audio with SciPy.

    ``audio_interface``, ``stream`` and ``device_sample_rate`` are ``None``
    until :meth:`setup` has run; :meth:`cleanup` resets the first two.
    """

    def __init__(
        self,
        input_device_index: Optional[int] = None,
        debug_mode: bool = False,
        target_samplerate: int = DESIRED_RATE,
        chunk_size: int = CHUNK_SIZE,
        audio_format: int = AUDIO_FORMAT,
        channels: int = CHANNELS,
        resample_to_target: bool = True,
    ):
        """Store the capture configuration; no audio resources are opened here.

        Args:
            input_device_index: PyAudio device index to record from, or
                ``None`` to let :meth:`setup` pick the default input device.
            debug_mode: When true, :meth:`setup` prints progress messages.
            target_samplerate: Preferred sample rate in Hz.
            chunk_size: Frames per buffer and per :meth:`read_chunk` call.
            audio_format: PyAudio sample format constant.
            channels: Number of channels to open on the stream.
            resample_to_target: Stored as-is; it is not consulted by any
                method of this class (presumably read by callers).
        """
        self.input_device_index = input_device_index
        self.debug_mode = debug_mode
        self.audio_interface = None
        self.stream = None
        self.device_sample_rate = None
        self.target_samplerate = target_samplerate
        self.chunk_size = chunk_size
        self.audio_format = audio_format
        self.channels = channels
        self.resample_to_target = resample_to_target

    def _release_interface(self):
        """Terminate the PyAudio interface, if any, and forget the reference."""
        # Clear the attribute first so a failing terminate() can never leave a
        # half-dead interface behind for a later cleanup() to terminate again.
        interface, self.audio_interface = self.audio_interface, None
        if interface is not None:
            interface.terminate()

    def get_supported_sample_rates(self, device_index):
        """Test which standard sample rates are supported by the specified device.

        Requires ``self.audio_interface`` to be an open PyAudio instance.

        Args:
            device_index: PyAudio index of the device to probe.

        Returns:
            list[int]: The subset of the standard rates the device accepts,
            in ascending order (possibly empty).
        """
        standard_rates = [8000, 9600, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000]

        device_info = self.audio_interface.get_device_info_by_index(device_index)
        # NOTE(review): the probe uses the device's maximum input channel
        # count, while setup() opens the stream with self.channels.
        max_channels = device_info.get('maxInputChannels')

        supported_rates = []
        for rate in standard_rates:
            try:
                if self.audio_interface.is_format_supported(
                    rate,
                    input_device=device_index,
                    input_channels=max_channels,
                    input_format=self.audio_format,
                ):
                    supported_rates.append(rate)
            except Exception:
                # PyAudio reports an unsupported configuration by raising;
                # treat any probing failure as "rate not supported".
                continue
        return supported_rates

    def _get_best_sample_rate(self, actual_device_index, desired_rate):
        """Determine the best available sample rate for the device.

        Preference order: the desired rate if supported, otherwise the
        highest supported standard rate, otherwise the device's reported
        default sample rate.

        Args:
            actual_device_index: PyAudio index of the device to query.
            desired_rate: Preferred sample rate in Hz.

        Returns:
            int: The selected sample rate; 44100 if the lookup fails.
        """
        try:
            device_info = self.audio_interface.get_device_info_by_index(actual_device_index)
            supported_rates = self.get_supported_sample_rates(actual_device_index)

            if desired_rate in supported_rates:
                return desired_rate

            if supported_rates:
                return max(supported_rates)

            # No standard rate was accepted: fall back to what the device
            # itself reports as its default.
            return int(device_info.get('defaultSampleRate', 44100))

        except Exception as e:
            logging.warning("Error determining sample rate: %s", e)
            return 44100  # Safe fallback

    def list_devices(self):
        """Print all audio input devices together with their supported sample rates.

        If no PyAudio interface is open, a temporary one is created and
        released again before returning. An interface that is already open
        (for example after :meth:`setup`) is reused and left untouched.
        Errors are printed rather than raised.
        """
        owns_interface = self.audio_interface is None
        try:
            init()  # Initialize colorama
            if owns_interface:
                self.audio_interface = pyaudio.PyAudio()
            device_count = self.audio_interface.get_device_count()

            print("Available audio input devices:")
            for i in range(device_count):
                device_info = self.audio_interface.get_device_info_by_index(i)
                device_name = device_info.get('name')
                max_input_channels = device_info.get('maxInputChannels', 0)

                # Only consider devices with input capabilities.
                if max_input_channels <= 0:
                    continue

                supported_rates = self.get_supported_sample_rates(i)
                print(f"{Fore.LIGHTGREEN_EX}Device {Style.RESET_ALL}{i}{Fore.LIGHTGREEN_EX}: {device_name}{Style.RESET_ALL}")

                # Format each rate in cyan
                if supported_rates:
                    rates_formatted = ", ".join(
                        f"{Fore.CYAN}{rate}{Style.RESET_ALL}" for rate in supported_rates
                    )
                    print(f" {Fore.YELLOW}Supported sample rates: {rates_formatted}{Style.RESET_ALL}")
                else:
                    print(f" {Fore.YELLOW}Supported sample rates: None{Style.RESET_ALL}")

        except Exception as e:
            print(f"Error listing devices: {e}")
        finally:
            # Only tear down an interface this call created itself.
            if owns_interface:
                self._release_interface()

    def setup(self):
        """Initialize the audio interface and open the input stream.

        Resolves the device index (falling back to the default input
        device), picks a sample rate via :meth:`_get_best_sample_rate` and
        opens the stream. On any failure the PyAudio interface is released
        and ``self.audio_interface`` is reset to ``None``.

        Returns:
            bool: ``True`` if the stream was opened, ``False`` otherwise.
        """
        try:
            self.audio_interface = pyaudio.PyAudio()

            if self.debug_mode:
                print(f"Input device index: {self.input_device_index}")
            if self.input_device_index is not None:
                actual_device_index = self.input_device_index
            else:
                actual_device_index = self.audio_interface.get_default_input_device_info()['index']

            if self.debug_mode:
                print(f"Actual selected device index: {actual_device_index}")
            self.input_device_index = actual_device_index
            self.device_sample_rate = self._get_best_sample_rate(actual_device_index, self.target_samplerate)

            if self.debug_mode:
                print(f"Setting up audio on device {self.input_device_index} with sample rate {self.device_sample_rate}")

            try:
                self.stream = self.audio_interface.open(
                    format=self.audio_format,
                    channels=self.channels,
                    rate=self.device_sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size,
                    input_device_index=self.input_device_index,
                )
            except Exception as e:
                print(f"Failed to initialize audio stream at {self.device_sample_rate} Hz: {e}")
                # Do not keep PortAudio initialized for a stream that never opened.
                self._release_interface()
                return False

            if self.debug_mode:
                print(f"Audio recording initialized successfully at {self.device_sample_rate} Hz")
            return True

        except Exception as e:
            print(f"Error initializing audio recording: {e}")
            self._release_interface()
            return False

    def lowpass_filter(self, signal, cutoff_freq, sample_rate):
        """
        Apply a low-pass Butterworth filter to prevent aliasing in the signal.

        Args:
            signal (np.ndarray): Input audio signal to filter
            cutoff_freq (float): Cutoff frequency in Hz; must be below
                ``sample_rate / 2`` for the filter design to be valid
            sample_rate (float): Sampling rate of the input signal in Hz

        Returns:
            np.ndarray: Filtered audio signal

        Notes:
            - Uses a 5th order Butterworth filter
            - Applies zero-phase filtering using filtfilt
            - filtfilt pads the signal at both ends, so very short inputs
              may be rejected by SciPy
        """
        # Calculate the Nyquist frequency (half the sample rate)
        nyquist_rate = sample_rate / 2.0

        # Normalize cutoff frequency to Nyquist rate (required by butter())
        normal_cutoff = cutoff_freq / nyquist_rate

        # Design the Butterworth filter
        b, a = butter(5, normal_cutoff, btype='low', analog=False)

        # Apply zero-phase filtering (forward and backward)
        return filtfilt(b, a, signal)

    def resample_audio(self, pcm_data, target_sample_rate, original_sample_rate):
        """
        Filter and resample audio data to a target sample rate.

        Args:
            pcm_data (np.ndarray): Input audio data
            target_sample_rate (int): Desired output sample rate in Hz
            original_sample_rate (int): Original sample rate of input in Hz

        Returns:
            np.ndarray: Resampled audio data

        Notes:
            - When downsampling, applies the Butterworth low-pass filter at
              half the target rate before resampling
            - Uses polyphase filtering (resample_poly) for the rate change
        """
        if target_sample_rate < original_sample_rate:
            # Downsampling: low-pass first so content above the new Nyquist
            # frequency is attenuated before decimation.
            pcm_data = self.lowpass_filter(pcm_data, target_sample_rate / 2, original_sample_rate)

        # Upsampling (or equal rates) goes straight to the polyphase resampler.
        return resample_poly(pcm_data, target_sample_rate, original_sample_rate)

    def read_chunk(self):
        """Read one chunk of ``chunk_size`` frames from the open stream.

        Input overflows do not raise (``exception_on_overflow=False``).
        :meth:`setup` must have succeeded first; otherwise ``self.stream``
        is ``None`` and the attribute access fails.
        """
        return self.stream.read(self.chunk_size, exception_on_overflow=False)

    def cleanup(self):
        """Stop and close the stream, then release the PyAudio interface.

        Each resource is released independently so that a failure while
        stopping the stream does not prevent the interface from being
        terminated. Errors are printed rather than raised, and both
        ``self.stream`` and ``self.audio_interface`` end up ``None``.
        """
        stream, self.stream = self.stream, None
        try:
            if stream is not None:
                try:
                    stream.stop_stream()
                finally:
                    # Always attempt to close, even if stopping failed.
                    stream.close()
        except Exception as e:
            print(f"Error cleaning up audio resources: {e}")

        try:
            self._release_interface()
        except Exception as e:
            print(f"Error cleaning up audio resources: {e}")
|
Reference in New Issue
Block a user