Compare commits

..

No commits in common. "main" and "0.1" have entirely different histories.
main ... 0.1

4 changed files with 120 additions and 152 deletions

View file

@ -7,6 +7,6 @@ edition = "2024"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
toml = "0.9"
alsa = "0.9"
cpal = "0.16"
rumqttc = "0.24"
tokio = { version = "1", features = ["full"] }

View file

@ -51,30 +51,7 @@ Alle 2 Sekunden wird ein JSON wie folgt gesendet:
- Bei fehlender Verbindung wird automatisch reconnectet.
- Fehler beim Senden werden geloggt, blockieren aber nicht den Hauptloop.
## Autostart
```bash
sudo apt install tmux
nano ~/bashrc
```
Am Ende hinzufügen
```bash
if ! tmux has-session -t loudnessspy 2>/dev/null; then
tmux new-session -d -s loudnessspy '~/loudnessspy'
fi
```
### tmux Pane Control
```tmux attach```
Ctrl b, " Split pane horizontally \
Ctrl b, % Split pane vertically \
Ctrl b, o Next pane
rc.local script to start LoudnessSpy on boot
---
Autor: Leonhard Suckau

7
rc.local Executable file
View file

@ -0,0 +1,7 @@
#!/bin/sh -e
# rc.local
# Start LoudnessSpy
/opt/loudnessspy/loudnessspy &
exit 0

View file

@ -1,5 +1,4 @@
use alsa::pcm::{PCM, HwParams, Access, Format, State};
use alsa::Card;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@ -7,11 +6,7 @@ use tokio::time::sleep;
use serde::Deserialize;
use std::fs;
use std::path::Path;
use alsa::card::Iter;
use alsa::device_name::HintIter;
use alsa::ctl::{Ctl, DeviceIter};
use alsa::{Direction, Error};
use std::time::Instant;
// RMS-Berechnung
@ -55,137 +50,126 @@ async fn main() {
mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password);
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10);
// Audio Setup
let host = cpal::default_host();
let device = host.default_input_device().expect("Kein Mikrofon gefunden");
println!("Gefundene Eingabegeräte:");
for (idx, dev) in host.input_devices().unwrap().enumerate() {
println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
}
// Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät
let config = if let Some(usb_device) = host.input_devices().unwrap()
.find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false))
{
println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
usb_device.default_input_config().unwrap()
} else {
device.default_input_config().unwrap()
};
// ALSA Setup
// Verfügbare ALSA-Geräte auflisten
println!("\n--- Hardware capture devices ---");
list_hw_devices(Direction::Capture);
println!("\n--- PCM capture devices ---");
list_pcm_devices(Direction::Capture);
let device_name = find_first_usb_device(Direction::Capture).unwrap_or_else(|| "default".to_string());
let sample_rate = 48000u32;
let channels = 1u32;
let pcm = PCM::new(&device_name, alsa::Direction::Capture, false).expect("Konnte PCM-Device nicht öffnen");
let hwp = HwParams::any(&pcm).unwrap();
hwp.set_channels(channels).unwrap();
hwp.set_rate(sample_rate, alsa::ValueOr::Nearest).unwrap();
hwp.set_format(Format::s16()).unwrap();
hwp.set_access(Access::RWInterleaved).unwrap();
pcm.hw_params(&hwp).unwrap();
let io = pcm.io_i16().unwrap();
let alsa_buffer_size = (sample_rate * 2) as usize; // 2 Sekunden Buffer
let mut buffer: Vec<i16> = Vec::with_capacity(alsa_buffer_size);
// Buffer für 2 Sekunden Samples
let sample_rate = config.sample_rate().0 as usize;
let channels = config.channels() as usize;
let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2)));
let buffer_clone = buffer.clone();
// Hauptloop: Samples lesen, RMS berechnen, MQTT senden
loop {
// PCM-Buffer füllen (2 Sekunden)
buffer.clear();
let mut read_samples = 0;
while read_samples < alsa_buffer_size {
let mut tmp = vec![0i16; alsa_buffer_size - read_samples];
match io.readi(&mut tmp) {
Ok(n) => {
buffer.extend_from_slice(&tmp[..n]);
read_samples += n;
},
Err(e) => {
if pcm.state() == State::XRun {
pcm.prepare().unwrap();
continue;
} else {
eprintln!("ALSA read error: {e}");
break;
}
// Stream-Callback
let err_fn = |err| eprintln!("Stream error: {}", err);
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => device.build_input_stream(
&config.into(),
move |data: &[f32], _| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend_from_slice(data);
// Maximal 2 Sekunden im Buffer halten
let max_len = sample_rate * channels * 2;
let buf_len = buf.len();
if buf_len > max_len {
buf.drain(0..(buf_len - max_len));
}
}
// Während Aufnahme MQTT-Eventloop pollen
let _ = eventloop.poll().await;
},
err_fn,
None,
),
cpal::SampleFormat::I16 => device.build_input_stream(
&config.into(),
move |data: &[i16], _| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32));
let max_len = sample_rate * channels * 2;
let buf_len = buf.len();
if buf_len > max_len {
buf.drain(0..(buf_len - max_len));
}
},
err_fn,
None,
),
cpal::SampleFormat::U16 => device.build_input_stream(
&config.into(),
move |data: &[u16], _| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5));
let max_len = sample_rate * channels * 2;
let buf_len = buf.len();
if buf_len > max_len {
buf.drain(0..(buf_len - max_len));
}
},
err_fn,
None,
),
_ => unreachable!("Nicht unterstütztes SampleFormat"),
}.expect("Stream konnte nicht gebaut werden");
stream.play().expect("Stream konnte nicht gestartet werden");
// Alle 2 Sekunden RMS berechnen und per MQTT senden
loop {
println!("loopBegin");
// 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen
let wait_total = Duration::from_secs(2);
let wait_step = Duration::from_millis(100);
let wait_start = Instant::now();
println!("Vor dem eventloop.poll() während Wartezeit");
let _ = eventloop.poll().await;
println!("Nach dem eventloop.poll() während Wartezeit");
while wait_start.elapsed() < wait_total {
sleep(wait_step).await;
}
// RMS berechnen und senden
if !buffer.is_empty() {
let samples_f32: Vec<f32> = buffer.iter().map(|&s| s as f32 / i16::MAX as f32).collect();
let rms_val = rms(&samples_f32);
println!("Nach 2 Sekunden Wartezeit");
let start = Instant::now();
println!("Vor dem Lock");
let buf = loop {
println!("Versuche Buffer zu sperren");
if let Ok(guard) = buffer.lock() {
println!("Buffer gesperrt nach {:?}", start.elapsed());
// Clone the buffer data to avoid holding the lock during await
break guard.clone();
}
if start.elapsed() > Duration::from_secs(2) {
eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren");
continue;
}
std::thread::sleep(Duration::from_millis(10));
};
println!("Nach dem Lock, Buffer-Länge: {}", buf.len());
if buf.len() >= sample_rate * channels * 2 {
let rms_val = rms(&buf);
let payload = format!("{{\"rms\":{}}}", rms_val);
println!("Payload: {}", payload);
// Buffer lock is dropped before this await
match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await {
Ok(Ok(_)) => println!("Payload gesendet"),
Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"),
Err(_) => eprintln!(
"Timeout beim Senden an MQTT (keine Verbindung zu {}:{})",
mqtt_config.host, mqtt_config.port
),
Err(_) => eprintln!("Timeout beim Senden an MQTT (keine Verbindung?)"),
}
}
// Nach jedem Zyklus noch einmal eventloop pollen
println!("Vor dem eventloop.poll()");
// MQTT Eventloop weiterlaufen lassen
let _ = eventloop.poll().await;
println!("loopEnde")
}
}
// Each card can have multiple devices and subdevDurationt them all
fn list_devices_for_card(card: &Card, direction: Direction) -> Result<bool, Error>{
// Get a Ctl for the card
let ctl_id = format!("hw:{}", card.get_index());
let ctl = Ctl::new(&ctl_id, false)?;
// Read card id and name
let cardinfo = ctl.card_info()?;
let card_id = cardinfo.get_id()?;
let card_name = cardinfo.get_name()?;
for device in DeviceIter::new(&ctl) {
// Read info from Ctl
let pcm_info = ctl.pcm_info(device as u32, 0, direction)?;
// Read PCM name
let pcm_name = pcm_info.get_name()?.to_string();
println!("card: {:<2} id: {:<10} device: {:<2} card name: '{}' PCM name: '{}'", card.get_index(), card_id, device, card_name, pcm_name);
// Loop through subdevices and get their names
let subdevs = pcm_info.get_subdevices_count();
for subdev in 0..subdevs {
// Get subdevice name
let pcm_info = ctl.pcm_info(device as u32, subdev, direction)?;
let subdev_name = pcm_info.get_subdevice_name()?;
println!(" subdevice: {:<2} name: '{}'", subdev, subdev_name);
}
}
if card_name.to_lowercase().contains("usb") {
return Ok(true);
}
Ok(false)
}
pub fn list_hw_devices(direction: Direction) {
let cards = Iter::new();
cards.for_each(|card| if let Ok(c) = card { list_devices_for_card(&c, direction).unwrap_or_default(); });
}
pub fn find_first_usb_device(direction: Direction) -> Option<String> {
let cards = Iter::new();
for card in cards {
if let Ok(c) = card {
if let Ok(true) = list_devices_for_card(&c, direction) {
// Build device name string, e.g., "hw:1,0"
let index = c.get_index();
return Some(format!("hw:{},0", index));
}
}
}
None
}
pub fn list_pcm_devices(direction: Direction) {
let hints = HintIter::new_str(None, "pcm").unwrap();
for hint in hints {
// When Direction is None it means that both the PCM supports both playback and capture
if hint.name.is_some() && hint.desc.is_some() && (hint.direction.is_none() || hint.direction.map(|dir| dir == direction).unwrap_or_default()) {
println!("name: {:<35} desc: {:?}", hint.name.unwrap(), hint.desc.unwrap());
}
}
}