# Listing metadata: 221 lines, 8.8 KiB, Python

from colorama import init, Fore, Style
from scipy.signal import butter, filtfilt, resample_poly
import pyaudio
import logging
# Default target sample rate in Hz; used as the default for AudioInput's
# `target_samplerate` argument.
DESIRED_RATE = 16000
# Default number of frames per buffer; used as the default for AudioInput's
# `chunk_size` argument.
CHUNK_SIZE = 1024
# Default PyAudio sample format (16-bit integer samples); used as the default
# for AudioInput's `audio_format` argument.
AUDIO_FORMAT = pyaudio.paInt16
# Default number of input channels; used as the default for AudioInput's
# `channels` argument.
CHANNELS = 1
class AudioInput:
    """Capture audio chunks from an input device through PyAudio.

    Typical use is ``setup()`` -> repeated ``read_chunk()`` -> ``cleanup()``.
    ``list_devices()`` is an independent diagnostic that prints the input
    devices and the standard sample rates each one accepts.
    """

    def __init__(
        self,
        input_device_index: int = None,
        debug_mode: bool = False,
        target_samplerate: int = DESIRED_RATE,
        chunk_size: int = CHUNK_SIZE,
        audio_format: int = AUDIO_FORMAT,
        channels: int = CHANNELS,
        resample_to_target: bool = True,
    ):
        """Store the capture configuration; no audio resources are opened here.

        Args:
            input_device_index: PyAudio device index to record from. ``None``
                makes ``setup()`` use the default input device.
            debug_mode: When true, ``setup()`` prints progress messages.
            target_samplerate: Sample rate in Hz that ``setup()`` asks the
                device for first.
            chunk_size: Frames per buffer, and frames read per ``read_chunk()``.
            audio_format: PyAudio sample format constant.
            channels: Number of channels opened on the stream.
            resample_to_target: Stored as-is; no method of this class reads it
                (presumably consumed by callers -- verify).
        """
        self.input_device_index = input_device_index
        self.debug_mode = debug_mode
        self.audio_interface = None
        self.stream = None
        # Rate the stream was actually opened at; set by setup().
        self.device_sample_rate = None
        self.target_samplerate = target_samplerate
        self.chunk_size = chunk_size
        self.audio_format = audio_format
        self.channels = channels
        self.resample_to_target = resample_to_target

    def get_supported_sample_rates(self, device_index):
        """Test which standard sample rates are supported by the specified device.

        Requires ``self.audio_interface`` to be an initialised PyAudio instance.

        Args:
            device_index: PyAudio index of the device to probe.

        Returns:
            list[int]: The standard rates the device accepts, in ascending
            order; empty when none is accepted.
        """
        standard_rates = [8000, 9600, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000]
        supported_rates = []
        device_info = self.audio_interface.get_device_info_by_index(device_index)
        max_channels = device_info.get('maxInputChannels')
        for rate in standard_rates:
            try:
                if self.audio_interface.is_format_supported(
                    rate,
                    input_device=device_index,
                    input_channels=max_channels,
                    input_format=self.audio_format,
                ):
                    supported_rates.append(rate)
            except Exception:
                # PyAudio reports an unsupported combination by raising, so a
                # failure here just means "skip this rate". Catching Exception
                # (not a bare except) lets KeyboardInterrupt/SystemExit through.
                continue
        return supported_rates

    def _get_best_sample_rate(self, actual_device_index, desired_rate):
        """Determine the best available sample rate for the device.

        Preference order: ``desired_rate`` if supported, otherwise the highest
        supported standard rate, otherwise the device's reported default rate.

        Args:
            actual_device_index: PyAudio index of the device to query.
            desired_rate: Preferred sample rate in Hz.

        Returns:
            int: The chosen sample rate in Hz; 44100 if the query itself fails.
        """
        try:
            device_info = self.audio_interface.get_device_info_by_index(actual_device_index)
            supported_rates = self.get_supported_sample_rates(actual_device_index)
            if desired_rate in supported_rates:
                return desired_rate
            if supported_rates:
                return max(supported_rates)
            # No standard rate was accepted: fall back to what the device
            # itself advertises instead of treating this as an error.
            return int(device_info.get('defaultSampleRate', 44100))
        except Exception as e:
            logging.warning(f"Error determining sample rate: {e}")
            return 44100  # Safe fallback

    def list_devices(self):
        """List all available audio input devices with supported sample rates.

        Uses a temporary PyAudio instance for the enumeration and restores the
        previous ``self.audio_interface`` afterwards, so calling this does not
        disturb an interface opened by ``setup()``. Errors are printed, not
        raised.
        """
        previous_interface = self.audio_interface
        try:
            init()  # Initialize colorama
            self.audio_interface = pyaudio.PyAudio()
            device_count = self.audio_interface.get_device_count()
            print("Available audio input devices:")
            for i in range(device_count):
                device_info = self.audio_interface.get_device_info_by_index(i)
                device_name = device_info.get('name')
                max_input_channels = device_info.get('maxInputChannels', 0)
                if max_input_channels <= 0:
                    # Only devices with input capabilities are listed.
                    continue
                supported_rates = self.get_supported_sample_rates(i)
                print(f"{Fore.LIGHTGREEN_EX}Device {Style.RESET_ALL}{i}{Fore.LIGHTGREEN_EX}: {device_name}{Style.RESET_ALL}")
                if supported_rates:
                    # Format each rate in cyan
                    rates_formatted = ", ".join(
                        f"{Fore.CYAN}{rate}{Style.RESET_ALL}" for rate in supported_rates
                    )
                    print(f" {Fore.YELLOW}Supported sample rates: {rates_formatted}{Style.RESET_ALL}")
                else:
                    print(f" {Fore.YELLOW}Supported sample rates: None{Style.RESET_ALL}")
        except Exception as e:
            print(f"Error listing devices: {e}")
        finally:
            temporary_interface = self.audio_interface
            self.audio_interface = previous_interface
            # Terminate only the instance created above; never the caller's.
            if temporary_interface is not None and temporary_interface is not previous_interface:
                temporary_interface.terminate()

    def setup(self):
        """Initialize the audio interface and open the input stream.

        Resolves the device index (default input device when none was given),
        picks a sample rate via ``_get_best_sample_rate`` and opens the stream.
        On any failure the PyAudio instance is released again.

        Returns:
            bool: True when the stream was opened, False otherwise (the error
            is printed).
        """
        try:
            self.audio_interface = pyaudio.PyAudio()
            if self.debug_mode:
                print(f"Input device index: {self.input_device_index}")
            actual_device_index = (self.input_device_index if self.input_device_index is not None
                                   else self.audio_interface.get_default_input_device_info()['index'])
            if self.debug_mode:
                print(f"Actual selected device index: {actual_device_index}")
            self.input_device_index = actual_device_index
            self.device_sample_rate = self._get_best_sample_rate(actual_device_index, self.target_samplerate)
            if self.debug_mode:
                print(f"Setting up audio on device {self.input_device_index} with sample rate {self.device_sample_rate}")
        except Exception as e:
            print(f"Error initializing audio recording: {e}")
            self._terminate_interface()
            return False

        try:
            self.stream = self.audio_interface.open(
                format=self.audio_format,
                channels=self.channels,
                rate=self.device_sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size,
                input_device_index=self.input_device_index,
            )
        except Exception as e:
            print(f"Failed to initialize audio stream at {self.device_sample_rate} Hz: {e}")
            # Without a stream the interface is useless; release it so a
            # failed setup() does not leak the PyAudio instance.
            self._terminate_interface()
            return False

        if self.debug_mode:
            print(f"Audio recording initialized successfully at {self.device_sample_rate} Hz")
        return True

    def lowpass_filter(self, signal, cutoff_freq, sample_rate):
        """Apply a low-pass Butterworth filter to prevent aliasing in the signal.

        Args:
            signal (np.ndarray): Input audio signal to filter.
            cutoff_freq (float): Cutoff frequency in Hz; must be below
                ``sample_rate / 2`` for ``butter`` to accept it.
            sample_rate (float): Sampling rate of the input signal in Hz.

        Returns:
            np.ndarray: Filtered audio signal.

        Notes:
            - Uses a 5th order Butterworth filter.
            - Applies zero-phase filtering using filtfilt.
        """
        # butter() expects the cutoff normalised to the Nyquist frequency
        # (half the sample rate).
        nyquist_rate = sample_rate / 2.0
        normal_cutoff = cutoff_freq / nyquist_rate
        b, a = butter(5, normal_cutoff, btype='low', analog=False)
        # Zero-phase filtering: the filter is run forward and backward.
        return filtfilt(b, a, signal)

    def resample_audio(self, pcm_data, target_sample_rate, original_sample_rate):
        """Filter and resample audio data to a target sample rate.

        Args:
            pcm_data (np.ndarray): Input audio data.
            target_sample_rate (int): Desired output sample rate in Hz.
            original_sample_rate (int): Original sample rate of input in Hz.

        Returns:
            np.ndarray: Resampled audio data.

        Notes:
            - Applies the Butterworth low-pass filter before resampling only
              when downsampling.
            - Uses polyphase resampling (``resample_poly``) in both directions.
        """
        if target_sample_rate < original_sample_rate:
            # Downsampling: band-limit to the new Nyquist frequency first.
            pcm_data = self.lowpass_filter(pcm_data, target_sample_rate / 2, original_sample_rate)
        return resample_poly(pcm_data, target_sample_rate, original_sample_rate)

    def read_chunk(self):
        """Read one chunk of ``chunk_size`` frames from the open stream.

        Requires a successful ``setup()``. Input overflows do not raise
        (``exception_on_overflow=False``).

        Returns:
            The raw data returned by the stream's ``read`` call.
        """
        return self.stream.read(self.chunk_size, exception_on_overflow=False)

    def cleanup(self):
        """Clean up audio resources.

        Stops and closes the stream, then terminates the PyAudio instance.
        Each step is attempted even if the previous one failed, and both
        references are cleared regardless; errors are printed, not raised.
        """
        try:
            if self.stream:
                self.stream.stop_stream()
                self.stream.close()
        except Exception as e:
            print(f"Error cleaning up audio resources: {e}")
        finally:
            self.stream = None

        try:
            self._terminate_interface()
        except Exception as e:
            print(f"Error cleaning up audio resources: {e}")

    def _terminate_interface(self):
        """Terminate the current PyAudio instance, if any, and forget it."""
        if self.audio_interface:
            try:
                self.audio_interface.terminate()
            finally:
                # Drop the reference even if terminate() raised, so the same
                # instance is never terminated twice.
                self.audio_interface = None