Compare commits
3 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ae68f5f71 | ||
|
|
2463b342e5 | ||
|
|
5f44d76bed |
4 changed files with 151 additions and 119 deletions
|
|
@ -7,6 +7,6 @@ edition = "2024"
|
|||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
toml = "0.9"
|
||||
cpal = "0.16"
|
||||
alsa = "0.9"
|
||||
rumqttc = "0.24"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
|
|
|||
25
Readme.md
25
Readme.md
|
|
@ -51,7 +51,30 @@ Alle 2 Sekunden wird ein JSON wie folgt gesendet:
|
|||
- Bei fehlender Verbindung wird automatisch reconnectet.
|
||||
- Fehler beim Senden werden geloggt, blockieren aber nicht den Hauptloop.
|
||||
|
||||
rc.local script to start LoudnessSpy on boot
|
||||
## Autostart
|
||||
|
||||
```bash
|
||||
sudo apt install tmux
|
||||
|
||||
nano ~/bashrc
|
||||
```
|
||||
|
||||
Am Ende hinzufügen
|
||||
|
||||
```bash
|
||||
if ! tmux has-session -t loudnessspy 2>/dev/null; then
|
||||
tmux new-session -d -s loudnessspy '~/loudnessspy'
|
||||
fi
|
||||
```
|
||||
|
||||
|
||||
### tmux Pane Control
|
||||
|
||||
```tmux attach```
|
||||
|
||||
Ctrl b, " Split pane horizontally \
|
||||
Ctrl b, % Split pane vertically \
|
||||
Ctrl b, o Next pane
|
||||
|
||||
---
|
||||
Autor: Leonhard Suckau
|
||||
|
|
|
|||
7
rc.local
7
rc.local
|
|
@ -1,7 +0,0 @@
|
|||
#!/bin/sh -e
|
||||
# rc.local
|
||||
|
||||
# Start LoudnessSpy
|
||||
/opt/loudnessspy/loudnessspy &
|
||||
|
||||
exit 0
|
||||
236
src/main.rs
236
src/main.rs
|
|
@ -1,4 +1,5 @@
|
|||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||
use alsa::pcm::{PCM, HwParams, Access, Format, State};
|
||||
use alsa::Card;
|
||||
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
|
@ -6,7 +7,11 @@ use tokio::time::sleep;
|
|||
use serde::Deserialize;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::time::Instant;
|
||||
use alsa::card::Iter;
|
||||
use alsa::device_name::HintIter;
|
||||
use alsa::ctl::{Ctl, DeviceIter};
|
||||
use alsa::{Direction, Error};
|
||||
|
||||
|
||||
|
||||
// RMS-Berechnung
|
||||
|
|
@ -50,126 +55,137 @@ async fn main() {
|
|||
mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password);
|
||||
let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||
|
||||
// Audio Setup
|
||||
let host = cpal::default_host();
|
||||
let device = host.default_input_device().expect("Kein Mikrofon gefunden");
|
||||
println!("Gefundene Eingabegeräte:");
|
||||
for (idx, dev) in host.input_devices().unwrap().enumerate() {
|
||||
println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
|
||||
}
|
||||
// Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät
|
||||
let config = if let Some(usb_device) = host.input_devices().unwrap()
|
||||
.find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false))
|
||||
{
|
||||
println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "<Unbekannt>".to_string()));
|
||||
usb_device.default_input_config().unwrap()
|
||||
} else {
|
||||
device.default_input_config().unwrap()
|
||||
};
|
||||
|
||||
// Buffer für 2 Sekunden Samples
|
||||
let sample_rate = config.sample_rate().0 as usize;
|
||||
let channels = config.channels() as usize;
|
||||
let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2)));
|
||||
let buffer_clone = buffer.clone();
|
||||
// ALSA Setup
|
||||
// Verfügbare ALSA-Geräte auflisten
|
||||
println!("\n--- Hardware capture devices ---");
|
||||
list_hw_devices(Direction::Capture);
|
||||
println!("\n--- PCM capture devices ---");
|
||||
list_pcm_devices(Direction::Capture);
|
||||
|
||||
// Stream-Callback
|
||||
let err_fn = |err| eprintln!("Stream error: {}", err);
|
||||
let stream = match config.sample_format() {
|
||||
cpal::SampleFormat::F32 => device.build_input_stream(
|
||||
&config.into(),
|
||||
move |data: &[f32], _| {
|
||||
let mut buf = buffer_clone.lock().unwrap();
|
||||
buf.extend_from_slice(data);
|
||||
// Maximal 2 Sekunden im Buffer halten
|
||||
let max_len = sample_rate * channels * 2;
|
||||
let buf_len = buf.len();
|
||||
if buf_len > max_len {
|
||||
buf.drain(0..(buf_len - max_len));
|
||||
}
|
||||
},
|
||||
err_fn,
|
||||
None,
|
||||
),
|
||||
cpal::SampleFormat::I16 => device.build_input_stream(
|
||||
&config.into(),
|
||||
move |data: &[i16], _| {
|
||||
let mut buf = buffer_clone.lock().unwrap();
|
||||
buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32));
|
||||
let max_len = sample_rate * channels * 2;
|
||||
let buf_len = buf.len();
|
||||
if buf_len > max_len {
|
||||
buf.drain(0..(buf_len - max_len));
|
||||
}
|
||||
},
|
||||
err_fn,
|
||||
None,
|
||||
),
|
||||
cpal::SampleFormat::U16 => device.build_input_stream(
|
||||
&config.into(),
|
||||
move |data: &[u16], _| {
|
||||
let mut buf = buffer_clone.lock().unwrap();
|
||||
buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5));
|
||||
let max_len = sample_rate * channels * 2;
|
||||
let buf_len = buf.len();
|
||||
if buf_len > max_len {
|
||||
buf.drain(0..(buf_len - max_len));
|
||||
}
|
||||
},
|
||||
err_fn,
|
||||
None,
|
||||
),
|
||||
_ => unreachable!("Nicht unterstütztes SampleFormat"),
|
||||
}.expect("Stream konnte nicht gebaut werden");
|
||||
stream.play().expect("Stream konnte nicht gestartet werden");
|
||||
let device_name = find_first_usb_device(Direction::Capture).unwrap_or_else(|| "default".to_string());
|
||||
let sample_rate = 48000u32;
|
||||
let channels = 1u32;
|
||||
let pcm = PCM::new(&device_name, alsa::Direction::Capture, false).expect("Konnte PCM-Device nicht öffnen");
|
||||
let hwp = HwParams::any(&pcm).unwrap();
|
||||
hwp.set_channels(channels).unwrap();
|
||||
hwp.set_rate(sample_rate, alsa::ValueOr::Nearest).unwrap();
|
||||
hwp.set_format(Format::s16()).unwrap();
|
||||
hwp.set_access(Access::RWInterleaved).unwrap();
|
||||
pcm.hw_params(&hwp).unwrap();
|
||||
let io = pcm.io_i16().unwrap();
|
||||
let alsa_buffer_size = (sample_rate * 2) as usize; // 2 Sekunden Buffer
|
||||
let mut buffer: Vec<i16> = Vec::with_capacity(alsa_buffer_size);
|
||||
|
||||
// Alle 2 Sekunden RMS berechnen und per MQTT senden
|
||||
|
||||
// Hauptloop: Samples lesen, RMS berechnen, MQTT senden
|
||||
loop {
|
||||
println!("loopBegin");
|
||||
// 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen
|
||||
let wait_total = Duration::from_secs(2);
|
||||
let wait_step = Duration::from_millis(100);
|
||||
let wait_start = Instant::now();
|
||||
|
||||
println!("Vor dem eventloop.poll() während Wartezeit");
|
||||
let _ = eventloop.poll().await;
|
||||
println!("Nach dem eventloop.poll() während Wartezeit");
|
||||
|
||||
while wait_start.elapsed() < wait_total {
|
||||
sleep(wait_step).await;
|
||||
// PCM-Buffer füllen (2 Sekunden)
|
||||
buffer.clear();
|
||||
let mut read_samples = 0;
|
||||
while read_samples < alsa_buffer_size {
|
||||
let mut tmp = vec![0i16; alsa_buffer_size - read_samples];
|
||||
match io.readi(&mut tmp) {
|
||||
Ok(n) => {
|
||||
buffer.extend_from_slice(&tmp[..n]);
|
||||
read_samples += n;
|
||||
},
|
||||
Err(e) => {
|
||||
if pcm.state() == State::XRun {
|
||||
pcm.prepare().unwrap();
|
||||
continue;
|
||||
} else {
|
||||
eprintln!("ALSA read error: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Während Aufnahme MQTT-Eventloop pollen
|
||||
let _ = eventloop.poll().await;
|
||||
}
|
||||
println!("Nach 2 Sekunden Wartezeit");
|
||||
|
||||
let start = Instant::now();
|
||||
println!("Vor dem Lock");
|
||||
let buf = loop {
|
||||
println!("Versuche Buffer zu sperren");
|
||||
if let Ok(guard) = buffer.lock() {
|
||||
println!("Buffer gesperrt nach {:?}", start.elapsed());
|
||||
// Clone the buffer data to avoid holding the lock during await
|
||||
break guard.clone();
|
||||
}
|
||||
if start.elapsed() > Duration::from_secs(2) {
|
||||
eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren");
|
||||
continue;
|
||||
}
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
};
|
||||
println!("Nach dem Lock, Buffer-Länge: {}", buf.len());
|
||||
if buf.len() >= sample_rate * channels * 2 {
|
||||
let rms_val = rms(&buf);
|
||||
// RMS berechnen und senden
|
||||
if !buffer.is_empty() {
|
||||
let samples_f32: Vec<f32> = buffer.iter().map(|&s| s as f32 / i16::MAX as f32).collect();
|
||||
let rms_val = rms(&samples_f32);
|
||||
let payload = format!("{{\"rms\":{}}}", rms_val);
|
||||
println!("Payload: {}", payload);
|
||||
// Buffer lock is dropped before this await
|
||||
match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await {
|
||||
Ok(Ok(_)) => println!("Payload gesendet"),
|
||||
Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"),
|
||||
Err(_) => eprintln!("Timeout beim Senden an MQTT (keine Verbindung?)"),
|
||||
Err(_) => eprintln!(
|
||||
"Timeout beim Senden an MQTT (keine Verbindung zu {}:{})",
|
||||
mqtt_config.host, mqtt_config.port
|
||||
),
|
||||
}
|
||||
}
|
||||
println!("Vor dem eventloop.poll()");
|
||||
// MQTT Eventloop weiterlaufen lassen
|
||||
// Nach jedem Zyklus noch einmal eventloop pollen
|
||||
let _ = eventloop.poll().await;
|
||||
println!("loopEnde")
|
||||
}
|
||||
}
|
||||
|
||||
// Each card can have multiple devices and subdevDurationt them all
|
||||
fn list_devices_for_card(card: &Card, direction: Direction) -> Result<bool, Error>{
|
||||
// Get a Ctl for the card
|
||||
let ctl_id = format!("hw:{}", card.get_index());
|
||||
let ctl = Ctl::new(&ctl_id, false)?;
|
||||
|
||||
// Read card id and name
|
||||
let cardinfo = ctl.card_info()?;
|
||||
let card_id = cardinfo.get_id()?;
|
||||
let card_name = cardinfo.get_name()?;
|
||||
for device in DeviceIter::new(&ctl) {
|
||||
// Read info from Ctl
|
||||
let pcm_info = ctl.pcm_info(device as u32, 0, direction)?;
|
||||
|
||||
// Read PCM name
|
||||
let pcm_name = pcm_info.get_name()?.to_string();
|
||||
|
||||
println!("card: {:<2} id: {:<10} device: {:<2} card name: '{}' PCM name: '{}'", card.get_index(), card_id, device, card_name, pcm_name);
|
||||
|
||||
// Loop through subdevices and get their names
|
||||
let subdevs = pcm_info.get_subdevices_count();
|
||||
for subdev in 0..subdevs {
|
||||
// Get subdevice name
|
||||
let pcm_info = ctl.pcm_info(device as u32, subdev, direction)?;
|
||||
let subdev_name = pcm_info.get_subdevice_name()?;
|
||||
|
||||
println!(" subdevice: {:<2} name: '{}'", subdev, subdev_name);
|
||||
}
|
||||
}
|
||||
|
||||
if card_name.to_lowercase().contains("usb") {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub fn list_hw_devices(direction: Direction) {
|
||||
let cards = Iter::new();
|
||||
cards.for_each(|card| if let Ok(c) = card { list_devices_for_card(&c, direction).unwrap_or_default(); });
|
||||
}
|
||||
|
||||
pub fn find_first_usb_device(direction: Direction) -> Option<String> {
|
||||
let cards = Iter::new();
|
||||
for card in cards {
|
||||
if let Ok(c) = card {
|
||||
if let Ok(true) = list_devices_for_card(&c, direction) {
|
||||
// Build device name string, e.g., "hw:1,0"
|
||||
let index = c.get_index();
|
||||
return Some(format!("hw:{},0", index));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn list_pcm_devices(direction: Direction) {
|
||||
let hints = HintIter::new_str(None, "pcm").unwrap();
|
||||
for hint in hints {
|
||||
// When Direction is None it means that both the PCM supports both playback and capture
|
||||
if hint.name.is_some() && hint.desc.is_some() && (hint.direction.is_none() || hint.direction.map(|dir| dir == direction).unwrap_or_default()) {
|
||||
println!("name: {:<35} desc: {:?}", hint.name.unwrap(), hint.desc.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue