# File-viewer header from the original listing: 221 lines, 8.8 KiB, Python.
import logging
from typing import Optional

import pyaudio
from colorama import init, Fore, Style
from scipy.signal import butter, filtfilt, resample_poly


# Default sample rate (Hz) that AudioInput tries to obtain from the device.
DESIRED_RATE = 16000

# Default number of frames per stream buffer and per read_chunk() call.
CHUNK_SIZE = 1024

# Default PyAudio sample format constant (paInt16).
AUDIO_FORMAT = pyaudio.paInt16

# Default number of channels opened on the input stream.
CHANNELS = 1


class AudioInput:
    """Capture audio from a PyAudio input device.

    The class selects a sample rate the device accepts, opens an input
    stream, reads raw chunks from it, and offers helpers to low-pass filter
    and resample captured audio.
    """

    def __init__(
        self,
        input_device_index: Optional[int] = None,
        debug_mode: bool = False,
        target_samplerate: int = DESIRED_RATE,
        chunk_size: int = CHUNK_SIZE,
        audio_format: int = AUDIO_FORMAT,
        channels: int = CHANNELS,
        resample_to_target: bool = True,
    ):
        """Store the capture configuration; no audio resources are opened.

        Args:
            input_device_index: PyAudio index of the input device. When
                ``None``, ``setup()`` uses the default input device.
            debug_mode: When true, ``setup()`` prints diagnostic messages.
            target_samplerate: Preferred sample rate in Hz; used if the
                device supports it.
            chunk_size: Frames per stream buffer and per ``read_chunk()``.
            audio_format: PyAudio sample format constant used both for
                probing supported rates and for opening the stream.
            channels: Number of channels to open on the stream.
            resample_to_target: Stored for callers; this class itself does
                not read it.
        """
        self.input_device_index = input_device_index
        self.debug_mode = debug_mode
        self.audio_interface = None
        self.stream = None
        self.device_sample_rate = None
        self.target_samplerate = target_samplerate
        self.chunk_size = chunk_size
        self.audio_format = audio_format
        self.channels = channels
        self.resample_to_target = resample_to_target

    def get_supported_sample_rates(self, device_index):
        """Return the standard sample rates the device accepts for input.

        ``self.audio_interface`` must already hold a live PyAudio instance.
        Each rate is probed with the device's maximum input channel count
        and ``self.audio_format``.

        Args:
            device_index: PyAudio index of the device to probe.

        Returns:
            list: The subset of the standard rates that passed the probe,
            in ascending order.
        """
        standard_rates = [8000, 9600, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000]

        device_info = self.audio_interface.get_device_info_by_index(device_index)
        max_channels = device_info.get('maxInputChannels')

        supported_rates = []
        for rate in standard_rates:
            try:
                if self.audio_interface.is_format_supported(
                    rate,
                    input_device=device_index,
                    input_channels=max_channels,
                    input_format=self.audio_format,
                ):
                    supported_rates.append(rate)
            except Exception:
                # PyAudio reports an unsupported combination by raising, so a
                # failed probe just means "skip this rate". Catching
                # Exception (not a bare except) keeps Ctrl-C working.
                continue
        return supported_rates

    def _get_best_sample_rate(self, actual_device_index, desired_rate):
        """Pick the sample rate to open the device with.

        Preference order: ``desired_rate`` if supported, otherwise the
        highest supported standard rate, otherwise the device's reported
        default rate. Any error while querying the device is logged and
        44100 is returned.

        Args:
            actual_device_index: PyAudio index of the device.
            desired_rate: Preferred sample rate in Hz.

        Returns:
            int: The sample rate to use.
        """
        try:
            device_info = self.audio_interface.get_device_info_by_index(actual_device_index)
            supported_rates = self.get_supported_sample_rates(actual_device_index)

            if desired_rate in supported_rates:
                return desired_rate

            if supported_rates:
                return max(supported_rates)

            # No standard rate passed the probe: fall back to what the
            # device itself reports as its default.
            return int(device_info.get('defaultSampleRate', 44100))

        except Exception as e:
            logging.warning(f"Error determining sample rate: {e}")
            return 44100  # Safe fallback

    def list_devices(self):
        """Print every input-capable device with its supported sample rates.

        A temporary PyAudio instance is created for the listing and
        terminated afterwards; whatever ``self.audio_interface`` held before
        the call is restored, so a stream opened by ``setup()`` keeps a
        valid interface. Errors are printed, not raised.
        """
        previous_interface = self.audio_interface
        listing_interface = None
        try:
            init()  # Initialize colorama
            listing_interface = pyaudio.PyAudio()
            # get_supported_sample_rates() reads self.audio_interface.
            self.audio_interface = listing_interface
            device_count = listing_interface.get_device_count()

            print("Available audio input devices:")
            for i in range(device_count):
                device_info = listing_interface.get_device_info_by_index(i)
                device_name = device_info.get('name')
                max_input_channels = device_info.get('maxInputChannels', 0)

                # Only consider devices with input capabilities.
                if max_input_channels <= 0:
                    continue

                supported_rates = self.get_supported_sample_rates(i)
                print(f"{Fore.LIGHTGREEN_EX}Device {Style.RESET_ALL}{i}{Fore.LIGHTGREEN_EX}: {device_name}{Style.RESET_ALL}")

                if supported_rates:
                    # Format each rate in cyan.
                    rates_formatted = ", ".join([f"{Fore.CYAN}{rate}{Style.RESET_ALL}" for rate in supported_rates])
                    print(f" {Fore.YELLOW}Supported sample rates: {rates_formatted}{Style.RESET_ALL}")
                else:
                    print(f" {Fore.YELLOW}Supported sample rates: None{Style.RESET_ALL}")

        except Exception as e:
            print(f"Error listing devices: {e}")
        finally:
            # Terminate only the instance created here, then put back the
            # caller's interface instead of leaving a dead handle behind.
            try:
                if listing_interface is not None:
                    listing_interface.terminate()
            finally:
                self.audio_interface = previous_interface

    def _release_interface(self):
        """Terminate the current PyAudio instance, if any, and forget it."""
        interface, self.audio_interface = self.audio_interface, None
        if interface is not None:
            interface.terminate()

    def setup(self) -> bool:
        """Initialize the audio interface and open the input stream.

        Resolves the device index (default input device when none was
        configured), chooses a sample rate via ``_get_best_sample_rate`` and
        opens the stream. On failure the error is printed, the PyAudio
        instance is released, and ``False`` is returned.

        Returns:
            bool: ``True`` when the stream was opened, ``False`` otherwise.
        """
        # Release anything left from an earlier setup() so it is not leaked.
        if self.stream is not None or self.audio_interface is not None:
            self.cleanup()

        try:
            self.audio_interface = pyaudio.PyAudio()

            if self.debug_mode:
                print(f"Input device index: {self.input_device_index}")
            if self.input_device_index is not None:
                actual_device_index = self.input_device_index
            else:
                default_info = self.audio_interface.get_default_input_device_info()
                actual_device_index = default_info['index']

            if self.debug_mode:
                print(f"Actual selected device index: {actual_device_index}")
            self.input_device_index = actual_device_index
            self.device_sample_rate = self._get_best_sample_rate(actual_device_index, self.target_samplerate)

            if self.debug_mode:
                print(f"Setting up audio on device {self.input_device_index} with sample rate {self.device_sample_rate}")

            try:
                self.stream = self.audio_interface.open(
                    format=self.audio_format,
                    channels=self.channels,
                    rate=self.device_sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size,
                    input_device_index=self.input_device_index,
                )
            except Exception as e:
                print(f"Failed to initialize audio stream at {self.device_sample_rate} Hz: {e}")
                self._release_interface()
                return False

            if self.debug_mode:
                print(f"Audio recording initialized successfully at {self.device_sample_rate} Hz")
            return True

        except Exception as e:
            print(f"Error initializing audio recording: {e}")
            self._release_interface()
            return False

    def lowpass_filter(self, signal, cutoff_freq, sample_rate):
        """Apply a zero-phase 5th-order Butterworth low-pass filter.

        Args:
            signal (np.ndarray): Input audio signal to filter.
            cutoff_freq (float): Cutoff frequency in Hz; must lie strictly
                between 0 and ``sample_rate / 2`` for ``butter`` to accept it.
            sample_rate (float): Sampling rate of the input signal in Hz.

        Returns:
            np.ndarray: Filtered audio signal.

        Notes:
            - Filtering runs forward and backward (``filtfilt``), so no
              phase shift is introduced.
            - ``filtfilt`` pads the signal at both ends and rejects inputs
              that are too short for its default padding.
        """
        # butter() expects the cutoff as a fraction of the Nyquist frequency.
        nyquist_rate = sample_rate / 2.0
        normal_cutoff = cutoff_freq / nyquist_rate

        b, a = butter(5, normal_cutoff, btype='low', analog=False)
        return filtfilt(b, a, signal)

    def resample_audio(self, pcm_data, target_sample_rate, original_sample_rate):
        """Resample audio to ``target_sample_rate`` with polyphase filtering.

        When downsampling, the signal is first low-pass filtered at half the
        target rate (the target's Nyquist frequency) before ``resample_poly``
        is applied. When the target rate is equal or higher, the data goes
        straight to ``resample_poly``.

        Args:
            pcm_data (np.ndarray): Input audio data.
            target_sample_rate (int): Desired output sample rate in Hz.
            original_sample_rate (int): Sample rate of ``pcm_data`` in Hz.

        Returns:
            np.ndarray: Resampled audio data.
        """
        if target_sample_rate < original_sample_rate:
            # Downsampling: remove content above the new Nyquist frequency.
            pcm_data = self.lowpass_filter(pcm_data, target_sample_rate / 2, original_sample_rate)
        return resample_poly(pcm_data, target_sample_rate, original_sample_rate)

    def read_chunk(self):
        """Read ``chunk_size`` frames from the open stream.

        ``setup()`` must have succeeded first. Input overflows do not raise
        because the read passes ``exception_on_overflow=False``.

        Returns:
            The data returned by the stream's ``read`` call.
        """
        return self.stream.read(self.chunk_size, exception_on_overflow=False)

    def cleanup(self):
        """Stop and close the stream and terminate the audio interface.

        Each resource is released independently, so a failure while stopping
        the stream does not prevent closing it or terminating PyAudio. Both
        references are reset to ``None`` regardless; errors are printed, not
        raised.
        """
        stream, self.stream = self.stream, None
        if stream is not None:
            try:
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
            except Exception as e:
                print(f"Error cleaning up audio resources: {e}")

        try:
            self._release_interface()
        except Exception as e:
            print(f"Error cleaning up audio resources: {e}")