Compare commits
No commits in common. "main" and "0.2" have entirely different histories.
2 changed files with 111 additions and 124 deletions
|
|
@ -7,6 +7,6 @@ edition = "2024"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
toml = "0.9"
|
toml = "0.9"
|
||||||
alsa = "0.9"
|
cpal = "0.16"
|
||||||
rumqttc = "0.24"
|
rumqttc = "0.24"
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|
|
||||||
229
src/main.rs
229
src/main.rs
|
|
@ -1,5 +1,4 @@
|
||||||
use alsa::pcm::{PCM, HwParams, Access, Format, State};
|
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||||
use alsa::Card;
|
|
||||||
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
@ -7,11 +6,7 @@ use tokio::time::sleep;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use alsa::card::Iter;
|
use std::time::Instant;
|
||||||
use alsa::device_name::HintIter;
|
|
||||||
use alsa::ctl::{Ctl, DeviceIter};
|
|
||||||
use alsa::{Direction, Error};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// RMS-Berechnung
|
// RMS-Berechnung
|
||||||
|
|
@ -55,60 +50,117 @@ async fn main() {
|
||||||
mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password);
|
mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password);
|
||||||
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||||
|
|
||||||
|
// Audio Setup
|
||||||
// ALSA Setup
|
let host = cpal::default_host();
|
||||||
// Verfügbare ALSA-Geräte auflisten
|
let device = host.default_input_device().expect("Kein Mikrofon gefunden");
|
||||||
println!("\n--- Hardware capture devices ---");
|
println!("Gefundene Eingabegeräte:");
|
||||||
list_hw_devices(Direction::Capture);
|
for (idx, dev) in host.input_devices().unwrap().enumerate() {
|
||||||
println!("\n--- PCM capture devices ---");
|
println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
|
||||||
list_pcm_devices(Direction::Capture);
|
}
|
||||||
|
// Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät
|
||||||
let device_name = find_first_usb_device(Direction::Capture).unwrap_or_else(|| "default".to_string());
|
let config = if let Some(usb_device) = host.input_devices().unwrap()
|
||||||
let sample_rate = 48000u32;
|
.find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false))
|
||||||
let channels = 1u32;
|
{
|
||||||
let pcm = PCM::new(&device_name, alsa::Direction::Capture, false).expect("Konnte PCM-Device nicht öffnen");
|
println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
|
||||||
let hwp = HwParams::any(&pcm).unwrap();
|
usb_device.default_input_config().unwrap()
|
||||||
hwp.set_channels(channels).unwrap();
|
|
||||||
hwp.set_rate(sample_rate, alsa::ValueOr::Nearest).unwrap();
|
|
||||||
hwp.set_format(Format::s16()).unwrap();
|
|
||||||
hwp.set_access(Access::RWInterleaved).unwrap();
|
|
||||||
pcm.hw_params(&hwp).unwrap();
|
|
||||||
let io = pcm.io_i16().unwrap();
|
|
||||||
let alsa_buffer_size = (sample_rate * 2) as usize; // 2 Sekunden Buffer
|
|
||||||
let mut buffer: Vec<i16> = Vec::with_capacity(alsa_buffer_size);
|
|
||||||
|
|
||||||
|
|
||||||
// Hauptloop: Samples lesen, RMS berechnen, MQTT senden
|
|
||||||
loop {
|
|
||||||
// PCM-Buffer füllen (2 Sekunden)
|
|
||||||
buffer.clear();
|
|
||||||
let mut read_samples = 0;
|
|
||||||
while read_samples < alsa_buffer_size {
|
|
||||||
let mut tmp = vec![0i16; alsa_buffer_size - read_samples];
|
|
||||||
match io.readi(&mut tmp) {
|
|
||||||
Ok(n) => {
|
|
||||||
buffer.extend_from_slice(&tmp[..n]);
|
|
||||||
read_samples += n;
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
if pcm.state() == State::XRun {
|
|
||||||
pcm.prepare().unwrap();
|
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
eprintln!("ALSA read error: {e}");
|
device.default_input_config().unwrap()
|
||||||
break;
|
};
|
||||||
|
|
||||||
|
// Buffer für 2 Sekunden Samples
|
||||||
|
let sample_rate = config.sample_rate().0 as usize;
|
||||||
|
let channels = config.channels() as usize;
|
||||||
|
let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2)));
|
||||||
|
let buffer_clone = buffer.clone();
|
||||||
|
|
||||||
|
// Stream-Callback
|
||||||
|
let err_fn = |err| eprintln!("Stream error: {}", err);
|
||||||
|
let stream = match config.sample_format() {
|
||||||
|
cpal::SampleFormat::F32 => device.build_input_stream(
|
||||||
|
&config.into(),
|
||||||
|
move |data: &[f32], _| {
|
||||||
|
let mut buf = buffer_clone.lock().unwrap();
|
||||||
|
buf.extend_from_slice(data);
|
||||||
|
// Maximal 2 Sekunden im Buffer halten
|
||||||
|
let max_len = sample_rate * channels * 2;
|
||||||
|
let buf_len = buf.len();
|
||||||
|
if buf_len > max_len {
|
||||||
|
buf.drain(0..(buf_len - max_len));
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
err_fn,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
cpal::SampleFormat::I16 => device.build_input_stream(
|
||||||
|
&config.into(),
|
||||||
|
move |data: &[i16], _| {
|
||||||
|
let mut buf = buffer_clone.lock().unwrap();
|
||||||
|
buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32));
|
||||||
|
let max_len = sample_rate * channels * 2;
|
||||||
|
let buf_len = buf.len();
|
||||||
|
if buf_len > max_len {
|
||||||
|
buf.drain(0..(buf_len - max_len));
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
err_fn,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
cpal::SampleFormat::U16 => device.build_input_stream(
|
||||||
|
&config.into(),
|
||||||
|
move |data: &[u16], _| {
|
||||||
|
let mut buf = buffer_clone.lock().unwrap();
|
||||||
|
buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5));
|
||||||
|
let max_len = sample_rate * channels * 2;
|
||||||
|
let buf_len = buf.len();
|
||||||
|
if buf_len > max_len {
|
||||||
|
buf.drain(0..(buf_len - max_len));
|
||||||
}
|
}
|
||||||
// Während Aufnahme MQTT-Eventloop pollen
|
},
|
||||||
|
err_fn,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
_ => unreachable!("Nicht unterstütztes SampleFormat"),
|
||||||
|
}.expect("Stream konnte nicht gebaut werden");
|
||||||
|
stream.play().expect("Stream konnte nicht gestartet werden");
|
||||||
|
|
||||||
|
// Alle 2 Sekunden RMS berechnen und per MQTT senden
|
||||||
|
loop {
|
||||||
|
println!("loopBegin");
|
||||||
|
// 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen
|
||||||
|
let wait_total = Duration::from_secs(2);
|
||||||
|
let wait_step = Duration::from_millis(100);
|
||||||
|
let wait_start = Instant::now();
|
||||||
|
|
||||||
|
println!("Vor dem eventloop.poll() während Wartezeit");
|
||||||
let _ = eventloop.poll().await;
|
let _ = eventloop.poll().await;
|
||||||
|
println!("Nach dem eventloop.poll() während Wartezeit");
|
||||||
|
|
||||||
|
while wait_start.elapsed() < wait_total {
|
||||||
|
sleep(wait_step).await;
|
||||||
}
|
}
|
||||||
// RMS berechnen und senden
|
println!("Nach 2 Sekunden Wartezeit");
|
||||||
if !buffer.is_empty() {
|
|
||||||
let samples_f32: Vec<f32> = buffer.iter().map(|&s| s as f32 / i16::MAX as f32).collect();
|
let start = Instant::now();
|
||||||
let rms_val = rms(&samples_f32);
|
println!("Vor dem Lock");
|
||||||
|
let buf = loop {
|
||||||
|
println!("Versuche Buffer zu sperren");
|
||||||
|
if let Ok(guard) = buffer.lock() {
|
||||||
|
println!("Buffer gesperrt nach {:?}", start.elapsed());
|
||||||
|
// Clone the buffer data to avoid holding the lock during await
|
||||||
|
break guard.clone();
|
||||||
|
}
|
||||||
|
if start.elapsed() > Duration::from_secs(2) {
|
||||||
|
eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
std::thread::sleep(Duration::from_millis(10));
|
||||||
|
};
|
||||||
|
println!("Nach dem Lock, Buffer-Länge: {}", buf.len());
|
||||||
|
if buf.len() >= sample_rate * channels * 2 {
|
||||||
|
let rms_val = rms(&buf);
|
||||||
let payload = format!("{{\"rms\":{}}}", rms_val);
|
let payload = format!("{{\"rms\":{}}}", rms_val);
|
||||||
println!("Payload: {}", payload);
|
println!("Payload: {}", payload);
|
||||||
|
// Buffer lock is dropped before this await
|
||||||
match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await {
|
match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await {
|
||||||
Ok(Ok(_)) => println!("Payload gesendet"),
|
Ok(Ok(_)) => println!("Payload gesendet"),
|
||||||
Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"),
|
Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"),
|
||||||
|
|
@ -118,74 +170,9 @@ async fn main() {
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Nach jedem Zyklus noch einmal eventloop pollen
|
println!("Vor dem eventloop.poll()");
|
||||||
|
// MQTT Eventloop weiterlaufen lassen
|
||||||
let _ = eventloop.poll().await;
|
let _ = eventloop.poll().await;
|
||||||
|
println!("loopEnde")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Each card can have multiple devices and subdevDurationt them all
|
|
||||||
fn list_devices_for_card(card: &Card, direction: Direction) -> Result<bool, Error>{
|
|
||||||
// Get a Ctl for the card
|
|
||||||
let ctl_id = format!("hw:{}", card.get_index());
|
|
||||||
let ctl = Ctl::new(&ctl_id, false)?;
|
|
||||||
|
|
||||||
// Read card id and name
|
|
||||||
let cardinfo = ctl.card_info()?;
|
|
||||||
let card_id = cardinfo.get_id()?;
|
|
||||||
let card_name = cardinfo.get_name()?;
|
|
||||||
for device in DeviceIter::new(&ctl) {
|
|
||||||
// Read info from Ctl
|
|
||||||
let pcm_info = ctl.pcm_info(device as u32, 0, direction)?;
|
|
||||||
|
|
||||||
// Read PCM name
|
|
||||||
let pcm_name = pcm_info.get_name()?.to_string();
|
|
||||||
|
|
||||||
println!("card: {:<2} id: {:<10} device: {:<2} card name: '{}' PCM name: '{}'", card.get_index(), card_id, device, card_name, pcm_name);
|
|
||||||
|
|
||||||
// Loop through subdevices and get their names
|
|
||||||
let subdevs = pcm_info.get_subdevices_count();
|
|
||||||
for subdev in 0..subdevs {
|
|
||||||
// Get subdevice name
|
|
||||||
let pcm_info = ctl.pcm_info(device as u32, subdev, direction)?;
|
|
||||||
let subdev_name = pcm_info.get_subdevice_name()?;
|
|
||||||
|
|
||||||
println!(" subdevice: {:<2} name: '{}'", subdev, subdev_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if card_name.to_lowercase().contains("usb") {
|
|
||||||
return Ok(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn list_hw_devices(direction: Direction) {
|
|
||||||
let cards = Iter::new();
|
|
||||||
cards.for_each(|card| if let Ok(c) = card { list_devices_for_card(&c, direction).unwrap_or_default(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn find_first_usb_device(direction: Direction) -> Option<String> {
|
|
||||||
let cards = Iter::new();
|
|
||||||
for card in cards {
|
|
||||||
if let Ok(c) = card {
|
|
||||||
if let Ok(true) = list_devices_for_card(&c, direction) {
|
|
||||||
// Build device name string, e.g., "hw:1,0"
|
|
||||||
let index = c.get_index();
|
|
||||||
return Some(format!("hw:{},0", index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn list_pcm_devices(direction: Direction) {
|
|
||||||
let hints = HintIter::new_str(None, "pcm").unwrap();
|
|
||||||
for hint in hints {
|
|
||||||
// When Direction is None it means that both the PCM supports both playback and capture
|
|
||||||
if hint.name.is_some() && hint.desc.is_some() && (hint.direction.is_none() || hint.direction.map(|dir| dir == direction).unwrap_or_default()) {
|
|
||||||
println!("name: {:<35} desc: {:?}", hint.name.unwrap(), hint.desc.unwrap());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue