From 095ed43c842c77f5a85c87cb99cc3c342cd60da7 Mon Sep 17 00:00:00 2001 From: audioprog Date: Fri, 29 Aug 2025 21:17:51 +0200 Subject: [PATCH] Initial commit for loudnessspy project with audio recording and MQTT integration. Added Cargo.toml for dependencies, workspace configuration, and main application logic for capturing audio input, calculating RMS, and publishing to MQTT. --- .gitignore | 23 +++++ .vscode/launch.json | 46 ++++++++++ Cargo.toml | 12 +++ loudnessspy.code-workspace | 7 ++ src/main.rs | 175 +++++++++++++++++++++++++++++++++++++ 5 files changed, 263 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/launch.json create mode 100644 Cargo.toml create mode 100644 loudnessspy.code-workspace create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bd3a069 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Generated by Cargo +# will have compiled files and executables +debug +target + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Generated by cargo mutants +# Contains mutation testing data +**/mutants.out*/ + +# RustRover +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ +config.toml +Cargo.lock diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..bf49de8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,46 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'loudnessspy'", + "cargo": { + "args": [ + "build", + "--bin=loudnessspy", + "--package=loudnessspy" + ], + "filter": { + "name": "loudnessspy", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'loudnessspy'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=loudnessspy", + "--package=loudnessspy" + ], + "filter": { + "name": "loudnessspy", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..86734e3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,12 @@ +# Audioaufnahme und MQTT +[package] +name = "loudnessspy" +version = "0.1.0" +edition = "2024" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +toml = "0.9" +cpal = "0.16" +rumqttc = "0.24" +tokio = { version = "1", features = ["full"] } diff --git a/loudnessspy.code-workspace b/loudnessspy.code-workspace new file mode 100644 index 0000000..362d7c2 --- /dev/null +++ b/loudnessspy.code-workspace @@ -0,0 +1,7 @@ +{ + "folders": [ + { + "path": "." + } + ] +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3a19cb5 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,175 @@ +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::time::sleep; +use serde::Deserialize; +use std::fs; +use std::path::Path; +use std::time::Instant; + + +// RMS-Berechnung +fn rms(samples: &[f32]) -> f32 { + let sum: f32 = samples.iter().map(|s| s * s).sum(); + (sum / samples.len() as f32).sqrt() +} + +#[derive(Deserialize)] +struct Config { + host: String, + port: u16, + username: String, + password: String, + client_id: String, + topic: String, +} + +fn load_config() -> Config { + let path = "config.toml"; + println!("Konfigurationsdateipfad: {}", fs::canonicalize(path).unwrap_or_else(|_| Path::new(path).to_path_buf()).display()); + if !Path::new(path).exists() { + let default = "host = \"localhost\"\nport = 1883\nusername = \"user\"\npassword = \"pass\"\nclient_id = \"loudnessspy\"\ntopic = \"loudnessspy/volume\"\n"; + fs::write(path, default).expect("Konnte Default-Konfiguration nicht schreiben"); + eprintln!("Konfigurationsdatei 'config.toml' wurde mit Defaultwerten erstellt. Bitte anpassen und Programm neu starten."); + std::process::exit(1); + } + let content = fs::read_to_string(path).expect("Konnte config.toml nicht lesen"); + toml::from_str(&content).expect("Konnte config.toml nicht parsen. Bitte prüfen!") +} + + +#[tokio::main] +async fn main() { + // Konfiguration laden + let mqtt_config = load_config(); + + // MQTT Setup + let mut mqttoptions = MqttOptions::new(&mqtt_config.client_id, &mqtt_config.host, mqtt_config.port); + mqttoptions.set_keep_alive(Duration::from_secs(30)); + mqttoptions.set_credentials(mqtt_config.username, mqtt_config.password); + let (mqtt, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + // Audio Setup + let host = cpal::default_host(); + let device = host.default_input_device().expect("Kein Mikrofon gefunden"); + println!("Gefundene Eingabegeräte:"); + for (idx, dev) in host.input_devices().unwrap().enumerate() { + println!("{}: {}", idx, dev.name().unwrap_or_else(|_| "".to_string())); + } + // Versuche, das erste Gerät mit "USB" oder "usb" im Namen zu finden, sonst Standardgerät + let config = if let Some(usb_device) = host.input_devices().unwrap() + .find(|dev| dev.name().map(|n| n.contains("USB") || n.contains("usb")).unwrap_or(false)) + { + println!("Benutze USB-Gerät: {}", usb_device.name().unwrap_or_else(|_| "".to_string())); + usb_device.default_input_config().unwrap() + } else { + device.default_input_config().unwrap() + }; + + // Buffer für 2 Sekunden Samples + let sample_rate = config.sample_rate().0 as usize; + let channels = config.channels() as usize; + let mut buffer = Arc::new(Mutex::new(Vec::with_capacity(sample_rate * channels * 2))); + let buffer_clone = buffer.clone(); + + // Stream-Callback + let err_fn = |err| eprintln!("Stream error: {}", err); + let stream = match config.sample_format() { + cpal::SampleFormat::F32 => device.build_input_stream( + &config.into(), + move |data: &[f32], _| { + let mut buf = buffer_clone.lock().unwrap(); + buf.extend_from_slice(data); + // Maximal 2 Sekunden im Buffer halten + let max_len = sample_rate * channels * 2; + let buf_len = buf.len(); + if buf_len > max_len { + buf.drain(0..(buf_len - max_len)); + } + }, + err_fn, + None, + ), + cpal::SampleFormat::I16 => device.build_input_stream( + &config.into(), + move |data: &[i16], _| { + let mut buf = buffer_clone.lock().unwrap(); + buf.extend(data.iter().map(|&s| s as f32 / i16::MAX as f32)); + let max_len = sample_rate * channels * 2; + let buf_len = buf.len(); + if buf_len > max_len { + buf.drain(0..(buf_len - max_len)); + } + }, + err_fn, + None, + ), + cpal::SampleFormat::U16 => device.build_input_stream( + &config.into(), + move |data: &[u16], _| { + let mut buf = buffer_clone.lock().unwrap(); + buf.extend(data.iter().map(|&s| s as f32 / u16::MAX as f32 - 0.5)); + let max_len = sample_rate * channels * 2; + let buf_len = buf.len(); + if buf_len > max_len { + buf.drain(0..(buf_len - max_len)); + } + }, + err_fn, + None, + ), + _ => unreachable!("Nicht unterstütztes SampleFormat"), + }.expect("Stream konnte nicht gebaut werden"); + stream.play().expect("Stream konnte nicht gestartet werden"); + + // Alle 2 Sekunden RMS berechnen und per MQTT senden + loop { + println!("loopBegin"); + // 2 Sekunden warten, dabei regelmäßig eventloop.poll() aufrufen + let wait_total = Duration::from_secs(2); + let wait_step = Duration::from_millis(100); + let wait_start = Instant::now(); + + println!("Vor dem eventloop.poll() während Wartezeit"); + let _ = eventloop.poll().await; + println!("Nach dem eventloop.poll() während Wartezeit"); + + while wait_start.elapsed() < wait_total { + sleep(wait_step).await; + } + println!("Nach 2 Sekunden Wartezeit"); + + let start = Instant::now(); + println!("Vor dem Lock"); + let buf = loop { + println!("Versuche Buffer zu sperren"); + if let Ok(guard) = buffer.lock() { + println!("Buffer gesperrt nach {:?}", start.elapsed()); + // Clone the buffer data to avoid holding the lock during await + break guard.clone(); + } + if start.elapsed() > Duration::from_secs(2) { + eprintln!("Konnte Buffer nicht innerhalb des Timeouts (2 Sekunden) sperren"); + continue; + } + std::thread::sleep(Duration::from_millis(10)); + }; + println!("Nach dem Lock, Buffer-Länge: {}", buf.len()); + if buf.len() >= sample_rate * channels * 2 { + let rms_val = rms(&buf); + let payload = format!("{{\"rms\":{}}}", rms_val); + println!("Payload: {}", payload); + // Buffer lock is dropped before this await + match tokio::time::timeout(Duration::from_secs(3), mqtt.publish(&mqtt_config.topic, QoS::AtLeastOnce, false, payload)).await { + Ok(Ok(_)) => println!("Payload gesendet"), + Ok(Err(e)) => eprintln!("Fehler beim Senden an MQTT: {e}"), + Err(_) => eprintln!("Timeout beim Senden an MQTT (keine Verbindung?)"), + } + } + println!("Vor dem eventloop.poll()"); + // MQTT Eventloop weiterlaufen lassen + let _ = eventloop.poll().await; + println!("loopEnde") + } +}