diff --git a/Cargo.toml b/Cargo.toml index 86734e3..f9171fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,6 @@ edition = "2024" [dependencies] serde = { version = "1.0", features = ["derive"] } toml = "0.9" -cpal = "0.16" +alsa = "0.9" rumqttc = "0.24" tokio = { version = "1", features = ["full"] } diff --git a/src/main.rs b/src/main.rs index 73e31e2..ea80fd8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use alsa::pcm::{PCM, HwParams, Access, Format, State}; +use alsa::Card; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -6,7 +7,11 @@ use tokio::time::sleep; use serde::Deserialize; use std::fs; use std::path::Path; -use std::time::Instant; +use alsa::card::Iter; +use alsa::device_name::HintIter; +use alsa::ctl::{Ctl, DeviceIter}; +use alsa::{Direction, Error}; + // RMS-Berechnung @@ -50,117 +55,60 @@ async fn main() { mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password); let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10); - // Audio Setup - let host = cpal::default_host(); - let device = host.default_input_device().expect("Kein Mikrofon gefunden"); - println!("Gefundene Eingabegeräte:"); - for (idx, dev) in host.input_devices().unwrap().enumerate() { - println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "".to_string())); - } - // Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät - let config = if let Some(usb_device) = host.input_devices().unwrap() - .find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false)) - { - println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "".to_string())); - usb_device.default_input_config().unwrap() - } else { - device.default_input_config().unwrap() - }; - // Buffer für 2 Sekunden Samples - let sample_rate = config.sample_rate().0 as usize; - let channels = config.channels() as usize; - let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2))); - let buffer_clone = buffer.clone(); + // ALSA Setup + // Verfügbare ALSA-Geräte auflisten + println!("\n--- Hardware capture devices ---"); + list_hw_devices(Direction::Capture); + println!("\n--- PCM capture devices ---"); + list_pcm_devices(Direction::Capture); + + let device_name = find_first_usb_device(Direction::Capture).unwrap_or_else(|| "default".to_string()); + let sample_rate = 48000u32; + let channels = 1u32; + let pcm = PCM::new(&device_name, alsa::Direction::Capture, false).expect("Konnte PCM-Device nicht öffnen"); + let hwp = HwParams::any(&pcm).unwrap(); + hwp.set_channels(channels).unwrap(); + hwp.set_rate(sample_rate, alsa::ValueOr::Nearest).unwrap(); + hwp.set_format(Format::s16()).unwrap(); + hwp.set_access(Access::RWInterleaved).unwrap(); + pcm.hw_params(&hwp).unwrap(); + let io = pcm.io_i16().unwrap(); + let alsa_buffer_size = (sample_rate * 2) as usize; // 2 Sekunden Buffer + let mut buffer: Vec = Vec::with_capacity(alsa_buffer_size); - // Stream-Callback - let err_fn = |err| eprintln!("Stream error: {}", err); - let stream = match config.sample_format() { - cpal::SampleFormat::F32 => device.build_input_stream( - &config.into(), - move |data: &[f32], _| { - let mut buf = buffer_clone.lock().unwrap(); - buf.extend_from_slice(data); - // Maximal 2 Sekunden im Buffer halten - let max_len = sample_rate * channels * 2; - let buf_len = buf.len(); - if buf_len > max_len { - buf.drain(0..(buf_len - max_len)); - } - }, - err_fn, - None, - ), - cpal::SampleFormat::I16 => device.build_input_stream( - &config.into(), - move |data: &[i16], _| { - let mut buf = buffer_clone.lock().unwrap(); - buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32)); - let max_len = sample_rate * channels * 2; - let buf_len = buf.len(); - if buf_len > max_len { - buf.drain(0..(buf_len - max_len)); - } - }, - err_fn, - None, - ), - cpal::SampleFormat::U16 => device.build_input_stream( - &config.into(), - move |data: &[u16], _| { - let mut buf = buffer_clone.lock().unwrap(); - buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5)); - let max_len = sample_rate * channels * 2; - let buf_len = buf.len(); - if buf_len > max_len { - buf.drain(0..(buf_len - max_len)); - } - }, - err_fn, - None, - ), - _ => unreachable!("Nicht unterstütztes SampleFormat"), - }.expect("Stream konnte nicht gebaut werden"); - stream.play().expect("Stream konnte nicht gestartet werden"); - // Alle 2 Sekunden RMS berechnen und per MQTT senden + // Hauptloop: Samples lesen, RMS berechnen, MQTT senden loop { - println!("loopBegin"); - // 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen - let wait_total = Duration::from_secs(2); - let wait_step = Duration::from_millis(100); - let wait_start = Instant::now(); - - println!("Vor dem eventloop.poll() während Wartezeit"); - let _ = eventloop.poll().await; - println!("Nach dem eventloop.poll() während Wartezeit"); - - while wait_start.elapsed() < wait_total { - sleep(wait_step).await; + // PCM-Buffer füllen (2 Sekunden) + buffer.clear(); + let mut read_samples = 0; + while read_samples < alsa_buffer_size { + let mut tmp = vec![0i16; alsa_buffer_size - read_samples]; + match io.readi(&mut tmp) { + Ok(n) => { + buffer.extend_from_slice(&tmp[..n]); + read_samples += n; + }, + Err(e) => { + if pcm.state() == State::XRun { + pcm.prepare().unwrap(); + continue; + } else { + eprintln!("ALSA read error: {e}"); + break; + } + } + } + // Während Aufnahme MQTT-Eventloop pollen + let _ = eventloop.poll().await; } - println!("Nach 2 Sekunden Wartezeit"); - - let start = Instant::now(); - println!("Vor dem Lock"); - let buf = loop { - println!("Versuche Buffer zu sperren"); - if let Ok(guard) = buffer.lock() { - println!("Buffer gesperrt nach {:?}", start.elapsed()); - // Clone the buffer data to avoid holding the lock during await - break guard.clone(); - } - if start.elapsed() > Duration::from_secs(2) { - eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren"); - continue; - } - std::thread::sleep(Duration::from_millis(10)); - }; - println!("Nach dem Lock, Buffer-Länge: {}", buf.len()); - if buf.len() >= sample_rate * channels * 2 { - let rms_val = rms(&buf); + // RMS berechnen und senden + if !buffer.is_empty() { + let samples_f32: Vec = buffer.iter().map(|&s| s as f32 / i16::MAX as f32).collect(); + let rms_val = rms(&samples_f32); let payload = format!("{{\"rms\":{}}}", rms_val); println!("Payload: {}", payload); - // Buffer lock is dropped before this await match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await { Ok(Ok(_)) => println!("Payload gesendet"), Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"), @@ -170,9 +118,74 @@ async fn main() { ), } } - println!("Vor dem eventloop.poll()"); - // MQTT Eventloop weiterlaufen lassen + // Nach jedem Zyklus noch einmal eventloop pollen let _ = eventloop.poll().await; - println!("loopEnde") } } + +// Each card can have multiple devices and subdevDurationt them all +fn list_devices_for_card(card: &Card, direction: Direction) -> Result{ + // Get a Ctl for the card + let ctl_id = format!("hw:{}", card.get_index()); + let ctl = Ctl::new(&ctl_id, false)?; + + // Read card id and name + let cardinfo = ctl.card_info()?; + let card_id = cardinfo.get_id()?; + let card_name = cardinfo.get_name()?; + for device in DeviceIter::new(&ctl) { + // Read info from Ctl + let pcm_info = ctl.pcm_info(device as u32, 0, direction)?; + + // Read PCM name + let pcm_name = pcm_info.get_name()?.to_string(); + + println!("card: {:<2} id: {:<10} device: {:<2} card name: '{}' PCM name: '{}'", card.get_index(), card_id, device, card_name, pcm_name); + + // Loop through subdevices and get their names + let subdevs = pcm_info.get_subdevices_count(); + for subdev in 0..subdevs { + // Get subdevice name + let pcm_info = ctl.pcm_info(device as u32, subdev, direction)?; + let subdev_name = pcm_info.get_subdevice_name()?; + + println!(" subdevice: {:<2} name: '{}'", subdev, subdev_name); + } + } + + if card_name.to_lowercase().contains("usb") { + return Ok(true); + } + + Ok(false) +} + +pub fn list_hw_devices(direction: Direction) { + let cards = Iter::new(); + cards.for_each(|card| if let Ok(c) = card { list_devices_for_card(&c, direction).unwrap_or_default(); }); +} + +pub fn find_first_usb_device(direction: Direction) -> Option { + let cards = Iter::new(); + for card in cards { + if let Ok(c) = card { + if let Ok(true) = list_devices_for_card(&c, direction) { + // Build device name string, e.g., "hw:1,0" + let index = c.get_index(); + return Some(format!("hw:{},0", index)); + } + } + } + None +} + +pub fn list_pcm_devices(direction: Direction) { + let hints = HintIter::new_str(None, "pcm").unwrap(); + for hint in hints { + // When Direction is None it means that both the PCM supports both playback and capture + if hint.name.is_some() && hint.desc.is_some() && (hint.direction.is_none() || hint.direction.map(|dir| dir == direction).unwrap_or_default()) { + println!("name: {:<35} desc: {:?}", hint.name.unwrap(), hint.desc.unwrap()); + } + } +} +