Refactor audio input handling to use ALSA instead of CPAL, improving device management and sample capture

This commit is contained in:
audioprog 2025-09-01 22:19:21 +02:00
parent 2463b342e5
commit 6ae68f5f71
2 changed files with 123 additions and 110 deletions

View file

@ -7,6 +7,6 @@ edition = "2024"
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
toml = "0.9" toml = "0.9"
cpal = "0.16" alsa = "0.9"
rumqttc = "0.24" rumqttc = "0.24"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View file

@ -1,4 +1,5 @@
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use alsa::pcm::{PCM, HwParams, Access, Format, State};
use alsa::Card;
use rumqttc::{AsyncClient, MqttOptions, QoS}; use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@ -6,7 +7,11 @@ use tokio::time::sleep;
use serde::Deserialize; use serde::Deserialize;
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::time::Instant; use alsa::card::Iter;
use alsa::device_name::HintIter;
use alsa::ctl::{Ctl, DeviceIter};
use alsa::{Direction, Error};
// RMS-Berechnung // RMS-Berechnung
@ -50,117 +55,60 @@ async fn main() {
mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password); mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password);
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10); let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10);
// Audio Setup
let host = cpal::default_host();
let device = host.default_input_device().expect("Kein Mikrofon gefunden");
println!("Gefundene Eingabegeräte:");
for (idx, dev) in host.input_devices().unwrap().enumerate() {
println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
}
// Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät
let config = if let Some(usb_device) = host.input_devices().unwrap()
.find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false))
{
println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
usb_device.default_input_config().unwrap()
} else {
device.default_input_config().unwrap()
};
// Buffer für 2 Sekunden Samples // ALSA Setup
let sample_rate = config.sample_rate().0 as usize; // Verfügbare ALSA-Geräte auflisten
let channels = config.channels() as usize; println!("\n--- Hardware capture devices ---");
let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2))); list_hw_devices(Direction::Capture);
let buffer_clone = buffer.clone(); println!("\n--- PCM capture devices ---");
list_pcm_devices(Direction::Capture);
// Stream-Callback let device_name = find_first_usb_device(Direction::Capture).unwrap_or_else(|| "default".to_string());
let err_fn = |err| eprintln!("Stream error: {}", err); let sample_rate = 48000u32;
let stream = match config.sample_format() { let channels = 1u32;
cpal::SampleFormat::F32 => device.build_input_stream( let pcm = PCM::new(&device_name, alsa::Direction::Capture, false).expect("Konnte PCM-Device nicht öffnen");
&config.into(), let hwp = HwParams::any(&pcm).unwrap();
move |data: &[f32], _| { hwp.set_channels(channels).unwrap();
let mut buf = buffer_clone.lock().unwrap(); hwp.set_rate(sample_rate, alsa::ValueOr::Nearest).unwrap();
buf.extend_from_slice(data); hwp.set_format(Format::s16()).unwrap();
// Maximal 2 Sekunden im Buffer halten hwp.set_access(Access::RWInterleaved).unwrap();
let max_len = sample_rate * channels * 2; pcm.hw_params(&hwp).unwrap();
let buf_len = buf.len(); let io = pcm.io_i16().unwrap();
if buf_len > max_len { let alsa_buffer_size = (sample_rate * 2) as usize; // 2 Sekunden Buffer
buf.drain(0..(buf_len - max_len)); let mut buffer: Vec<i16> = Vec::with_capacity(alsa_buffer_size);
}
},
err_fn,
None,
),
cpal::SampleFormat::I16 => device.build_input_stream(
&config.into(),
move |data: &[i16], _| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32));
let max_len = sample_rate * channels * 2;
let buf_len = buf.len();
if buf_len > max_len {
buf.drain(0..(buf_len - max_len));
}
},
err_fn,
None,
),
cpal::SampleFormat::U16 => device.build_input_stream(
&config.into(),
move |data: &[u16], _| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5));
let max_len = sample_rate * channels * 2;
let buf_len = buf.len();
if buf_len > max_len {
buf.drain(0..(buf_len - max_len));
}
},
err_fn,
None,
),
_ => unreachable!("Nicht unterstütztes SampleFormat"),
}.expect("Stream konnte nicht gebaut werden");
stream.play().expect("Stream konnte nicht gestartet werden");
// Alle 2 Sekunden RMS berechnen und per MQTT senden
// Hauptloop: Samples lesen, RMS berechnen, MQTT senden
loop { loop {
println!("loopBegin"); // PCM-Buffer füllen (2 Sekunden)
// 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen buffer.clear();
let wait_total = Duration::from_secs(2); let mut read_samples = 0;
let wait_step = Duration::from_millis(100); while read_samples < alsa_buffer_size {
let wait_start = Instant::now(); let mut tmp = vec![0i16; alsa_buffer_size - read_samples];
match io.readi(&mut tmp) {
println!("Vor dem eventloop.poll() während Wartezeit"); Ok(n) => {
let _ = eventloop.poll().await; buffer.extend_from_slice(&tmp[..n]);
println!("Nach dem eventloop.poll() während Wartezeit"); read_samples += n;
},
while wait_start.elapsed() < wait_total { Err(e) => {
sleep(wait_step).await; if pcm.state() == State::XRun {
pcm.prepare().unwrap();
continue;
} else {
eprintln!("ALSA read error: {e}");
break;
}
}
}
// Während Aufnahme MQTT-Eventloop pollen
let _ = eventloop.poll().await;
} }
println!("Nach 2 Sekunden Wartezeit"); // RMS berechnen und senden
if !buffer.is_empty() {
let start = Instant::now(); let samples_f32: Vec<f32> = buffer.iter().map(|&s| s as f32 / i16::MAX as f32).collect();
println!("Vor dem Lock"); let rms_val = rms(&samples_f32);
let buf = loop {
println!("Versuche Buffer zu sperren");
if let Ok(guard) = buffer.lock() {
println!("Buffer gesperrt nach {:?}", start.elapsed());
// Clone the buffer data to avoid holding the lock during await
break guard.clone();
}
if start.elapsed() > Duration::from_secs(2) {
eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren");
continue;
}
std::thread::sleep(Duration::from_millis(10));
};
println!("Nach dem Lock, Buffer-Länge: {}", buf.len());
if buf.len() >= sample_rate * channels * 2 {
let rms_val = rms(&buf);
let payload = format!("{{\"rms\":{}}}", rms_val); let payload = format!("{{\"rms\":{}}}", rms_val);
println!("Payload: {}", payload); println!("Payload: {}", payload);
// Buffer lock is dropped before this await
match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await { match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await {
Ok(Ok(_)) => println!("Payload gesendet"), Ok(Ok(_)) => println!("Payload gesendet"),
Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"), Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"),
@ -170,9 +118,74 @@ async fn main() {
), ),
} }
} }
println!("Vor dem eventloop.poll()"); // Nach jedem Zyklus noch einmal eventloop pollen
// MQTT Eventloop weiterlaufen lassen
let _ = eventloop.poll().await; let _ = eventloop.poll().await;
println!("loopEnde")
} }
} }
// Each card can have multiple devices and subdevDurationt them all
fn list_devices_for_card(card: &Card, direction: Direction) -> Result<bool, Error>{
// Get a Ctl for the card
let ctl_id = format!("hw:{}", card.get_index());
let ctl = Ctl::new(&ctl_id, false)?;
// Read card id and name
let cardinfo = ctl.card_info()?;
let card_id = cardinfo.get_id()?;
let card_name = cardinfo.get_name()?;
for device in DeviceIter::new(&ctl) {
// Read info from Ctl
let pcm_info = ctl.pcm_info(device as u32, 0, direction)?;
// Read PCM name
let pcm_name = pcm_info.get_name()?.to_string();
println!("card: {:<2} id: {:<10} device: {:<2} card name: '{}' PCM name: '{}'", card.get_index(), card_id, device, card_name, pcm_name);
// Loop through subdevices and get their names
let subdevs = pcm_info.get_subdevices_count();
for subdev in 0..subdevs {
// Get subdevice name
let pcm_info = ctl.pcm_info(device as u32, subdev, direction)?;
let subdev_name = pcm_info.get_subdevice_name()?;
println!(" subdevice: {:<2} name: '{}'", subdev, subdev_name);
}
}
if card_name.to_lowercase().contains("usb") {
return Ok(true);
}
Ok(false)
}
pub fn list_hw_devices(direction: Direction) {
let cards = Iter::new();
cards.for_each(|card| if let Ok(c) = card { list_devices_for_card(&c, direction).unwrap_or_default(); });
}
pub fn find_first_usb_device(direction: Direction) -> Option<String> {
let cards = Iter::new();
for card in cards {
if let Ok(c) = card {
if let Ok(true) = list_devices_for_card(&c, direction) {
// Build device name string, e.g., "hw:1,0"
let index = c.get_index();
return Some(format!("hw:{},0", index));
}
}
}
None
}
pub fn list_pcm_devices(direction: Direction) {
let hints = HintIter::new_str(None, "pcm").unwrap();
for hint in hints {
// When Direction is None it means that both the PCM supports both playback and capture
if hint.name.is_some() && hint.desc.is_some() && (hint.direction.is_none() || hint.direction.map(|dir| dir == direction).unwrap_or_default()) {
println!("name: {:<35} desc: {:?}", hint.name.unwrap(), hint.desc.unwrap());
}
}
}