diff --git a/.gitignore b/.gitignore index ea8c4bf..af82612 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +interstitials \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b8059e3..8c528dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,21 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.81" @@ -64,6 +79,7 @@ checksum = "c1e49f94356350dde8c54c5cbde90bdfa5642ab25f9c51c60a9ebdfdf3dbef87" dependencies = [ "log", "symphonia", + "tokio", ] [[package]] @@ -129,6 +145,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.4", +] + [[package]] name = "concurrent-queue" version = "2.4.0" @@ -160,6 +190,12 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -338,6 +374,17 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.28.1" @@ -515,6 +562,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -541,6 +611,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -555,13 +634,16 @@ dependencies = [ "async-broadcast", "awedio", "bytes", + "chrono", "envconfig", "future-bool", "futures-util", "http-body-util", "hyper 1.2.0", "hyper-util", + "itertools", "mp3lame-encoder", + "rand", "reqwest", "serde", "symphonia", @@ -682,6 +764,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-traits" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -814,6 +905,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.79" @@ -832,6 +929,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1542,6 +1669,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.4", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 6bf5d18..3a437b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,10 @@ edition = "2021" [dependencies] reqwest = { version = "0.11", features = ["json", "stream", "native-tls-vendored"] } tokio = { version = "1", features = ["full"] } -awedio = { version = "0.3", features = ["symphonia-all"], default-features = false } +awedio = { version = "0.3", features = ["symphonia-all", "async"], default-features = false } anyhow = "1.0" mp3lame-encoder = { git = "https://github.com/jhbruhn/mp3lame-encoder.git", branch = "arm" } -tokio-stream = { version = "0.1", features = ["sync"] } +tokio-stream = { version = "0.1", features = ["sync", "fs"] } futures-util = "0.3" hyper = { version = "1.2", features = ["server", "http1"] } hyper-util = {version = "0.1", features = ["tokio"] } @@ -22,3 +22,6 @@ future-bool = "0.1" serde = { version = "1.0", features = ["derive"] } symphonia = { version = "0.5.4", features = ["all"] } envconfig = "0.10" +chrono = "0.4.35" +itertools = "0.12.1" +rand = "0.8.5" diff --git a/src/main.rs b/src/main.rs index 2ad886f..7a237ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use awedio::Sound; use envconfig::Envconfig; use hyper::server::conn::http1; use hyper_util::rt::TokioIo; @@ -29,6 +30,51 @@ struct Config { pub song_prefetch: u32, } +async fn get_time_file_map( + folder: &std::path::Path, +) -> std::collections::HashMap> { + // time announce logic + let time_files: Vec = std::fs::read_dir(folder) + .unwrap() + .filter_map(|v| v.ok()) + .filter(|v| !v.path().is_dir()) + .map(|v| v.path().clone()) + .collect(); + + let mut time_map = std::collections::HashMap::new(); + for path in time_files.iter() { + if let Ok((time, path)) = async { + let mut name_split = path + .file_stem() + .ok_or(anyhow::anyhow!("Wrong file stem!"))? + .to_str() + .ok_or(anyhow::anyhow!("Wrong file path!"))? + .split("_"); + let hour = name_split + .next() + .ok_or(anyhow::anyhow!("No hour!"))? + .parse() + .unwrap(); + let minute = name_split + .next() + .ok_or(anyhow::anyhow!("No minute!"))? + .parse() + .unwrap(); + let time = chrono::NaiveTime::from_hms_opt(hour, minute, 0) + .ok_or(anyhow::anyhow!("Can't parse time"))?; + Ok::<_, anyhow::Error>((time, path)) + } + .await + { + time_map + .entry(time) + .or_insert_with(Vec::new) + .push(path.clone()); + } + } + time_map +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let config = Config::init_from_env().unwrap(); @@ -58,12 +104,19 @@ async fn main() -> anyhow::Result<()> { let (streamer_backend, mut streamer_manager) = streamer::StreamerBackend::start()?; + let (mixer, mixer_controller) = awedio::sounds::SoundMixer::new(2, 48_000).controllable(); + // basic playlist playback + + let (player, mut player_controller) = player::Player::new(config.song_prefetch); + let player = Box::new(player); + + let mut player_mixer_controller = mixer_controller.clone(); + player_mixer_controller.add(player); + let mut announce_downmix_player_controller = player_controller.clone(); tokio::task::spawn(async move { - let (player, mut controller) = player::Player::new(config.song_prefetch); - let player = Box::new(player); - streamer_manager.play(player); loop { - controller.wait_for_queue().await; + tokio::task::yield_now().await; + player_controller.wait_for_queue().await; println!("Queuing song"); @@ -77,9 +130,9 @@ async fn main() -> anyhow::Result<()> { let sound = client.fetch_audio(item).await?; println!("Fetched Song!"); if sound.channel_count() > 2 { - anyhow::bail!("Too many channels"); + anyhow::bail!("Too many channels, skipping!"); } - controller.add(Box::new(sound)); + player_controller.add(Box::new(sound)); anyhow::Ok(()) } .await; @@ -93,6 +146,88 @@ async fn main() -> anyhow::Result<()> { } }); + let mut time_announce_mixer_controller = mixer_controller.clone(); + + tokio::task::spawn(async move { + let time_file_map = + get_time_file_map(&std::path::PathBuf::from("./interstitials/time")).await; + loop { + tokio::task::yield_now().await; + + let fade_duration = Duration::from_secs(2); + let fade_steps = 100; + let fade_minimum_level = 0.1; + + let fade_steps_max = fade_steps; + let fade_steps_min = (fade_minimum_level * fade_steps as f32) as u32; + + let now = chrono::Local::now(); + use itertools::Itertools; + let next_time = if let Some(next_time) = time_file_map + .keys() + .sorted() + .filter(|k| **k > now.time()) + .next() + { + Some(next_time) + } else { + time_file_map.keys().sorted().next() + }; + + if let Some(next_time) = next_time { + let paths = time_file_map.get(next_time).unwrap(); // definitely exists, we just did the math + use rand::seq::SliceRandom; + let next_path = paths.choose(&mut rand::thread_rng()); + if next_path.is_none() { + continue; + } + let next_path = next_path.unwrap(); + + let interstitial_time = if *next_time > now.time() { + now.date_naive() + .and_time(*next_time) + .and_local_timezone(chrono::Local) + } else { + now.date_naive() + .checked_add_days(chrono::Days::new(1)) + .unwrap() + .and_time(*next_time) + .and_local_timezone(chrono::Local) + } + .unwrap(); + + println!("Next Internstitial time {}", interstitial_time); + + tokio::time::sleep_until( + tokio::time::Instant::now() + (interstitial_time - now).to_std().unwrap(), + ) + .await; + + println!("Internstitial time {}", interstitial_time); + + for v in (fade_steps_min..=fade_steps_max).rev() { + let volume = v as f32 / fade_steps as f32; + announce_downmix_player_controller.set_volume(volume); + tokio::time::sleep(fade_duration / (fade_steps_max - fade_steps_min)).await; + } + + let (sound, completion_notifier) = awedio::sounds::open_file(next_path.as_path()) + .unwrap() + .with_async_completion_notifier(); + time_announce_mixer_controller.add(Box::new(sound)); + let _ = completion_notifier.await; + + for v in fade_steps_min..=fade_steps_max { + let volume = v as f32 / fade_steps as f32; + announce_downmix_player_controller.set_volume(volume); + tokio::time::sleep(fade_duration / (fade_steps_max - fade_steps_min)).await; + } + } + } + }); + + streamer_manager.play(Box::new(mixer)); + let listener = TcpListener::bind(addr).await?; println!("Listening on http://{}", addr); loop { @@ -100,6 +235,8 @@ async fn main() -> anyhow::Result<()> { let io = TokioIo::new(tcp); let backend = streamer_backend.clone(); + println!("New connection!"); + tokio::task::spawn(async move { if let Err(err) = http1::Builder::new().serve_connection(io, backend).await { println!("Error serving connection: {:?}", err); diff --git a/src/player.rs b/src/player.rs index bcb2175..75b9e6e 100644 --- a/src/player.rs +++ b/src/player.rs @@ -8,6 +8,7 @@ pub struct Player { sounds: Vec>, was_empty: bool, song_prefetch: u32, + volume_adjustment: f32, } type Command = Box; @@ -15,23 +16,24 @@ type Command = Box; pub struct PlayerControllable { inner: Player, command_receiver: mpsc::Receiver>, - queue_next_song_sender: tokio::sync::mpsc::Sender<()>, + queue_next_song_sender: tokio::sync::watch::Sender, finished: bool, } pub struct PlayerController { command_sender: mpsc::Sender>, - queue_next_song_receiver: tokio::sync::mpsc::Receiver<()>, + queue_next_song_receiver: tokio::sync::watch::Receiver, } impl Player { /// Create a new empty Player. pub fn new(song_prefetch: u32) -> (PlayerControllable, PlayerController) { - let (queue_next_song_sender, queue_next_song_receiver) = tokio::sync::mpsc::channel(1); + let (queue_next_song_sender, queue_next_song_receiver) = tokio::sync::watch::channel(false); let inner = Player { sounds: Vec::new(), was_empty: false, song_prefetch, + volume_adjustment: 1.0, }; let (command_sender, command_receiver) = mpsc::channel::>(); @@ -57,8 +59,12 @@ impl Player { self.sounds.push(sound); } - fn last_song_playing_or_empty(&self) -> bool { - self.sounds.len() <= self.song_prefetch as usize + fn set_volume(&mut self, new: f32) { + self.volume_adjustment = new; + } + + fn should_prefetch(&self) -> bool { + self.sounds.len() < self.song_prefetch as usize } } @@ -95,15 +101,19 @@ impl Sound for Player { self.was_empty = false; return Ok(NextSample::MetadataChanged); } - + let next_sample = next_sound.next_sample(); if let Err(e) = &next_sample { println!("Error playing track: {:?}", e); } let ret = match next_sample { - Ok(NextSample::Sample(_) | NextSample::MetadataChanged | NextSample::Paused) => next_sample.unwrap(), - Ok(NextSample::Finished) | Err(_) => { // Just ignore the error + Ok(NextSample::Sample(s)) => { + NextSample::Sample((s as f32 * self.volume_adjustment) as i16) + } + Ok(NextSample::MetadataChanged | NextSample::Paused) => next_sample.unwrap(), + Ok(NextSample::Finished) | Err(_) => { + // Just ignore the error self.sounds.remove(0); if self.sounds.is_empty() { NextSample::Finished @@ -159,9 +169,15 @@ impl Sound for PlayerControllable { } } } - if self.inner.last_song_playing_or_empty() { - let _ = self.queue_next_song_sender.try_send(()); - } + + let _ = self.queue_next_song_sender.send_if_modified(|v| { + if *v != self.inner.should_prefetch() { + *v = self.inner.should_prefetch(); + return true; + } + false + }); + self.inner.on_start_of_batch(); } } @@ -175,12 +191,25 @@ impl PlayerController { } } +impl Clone for PlayerController { + fn clone(&self) -> Self { + Self { + command_sender: self.command_sender.clone(), + queue_next_song_receiver: self.queue_next_song_receiver.clone(), + } + } +} + impl PlayerController { pub fn add(&mut self, sound: Box) { self.send_command(Box::new(|s: &mut Player| s.add(sound))); } + pub fn set_volume(&mut self, new: f32) { + self.send_command(Box::new(move |s: &mut Player| s.set_volume(new))); + } + pub async fn wait_for_queue(&mut self) { - self.queue_next_song_receiver.recv().await; + let _ = self.queue_next_song_receiver.wait_for(|v| *v == true).await; } } diff --git a/src/streamer.rs b/src/streamer.rs index 722e530..686d9b5 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -11,11 +11,12 @@ use hyper::service::Service; use hyper::{body, Request}; use hyper::{Response, StatusCode}; -const SEGMENT_INTERVAL: u64 = 2; // seconds const SAMPLE_RATE: u64 = 48000; const CHANNEL_COUNT: u64 = 2; -type Chunk = [i16; (SEGMENT_INTERVAL * SAMPLE_RATE * CHANNEL_COUNT) as usize]; +const BUFFER_SIZE: usize = 2000; // Should be an integer result of 48000 / 2 / x + +type Chunk = [i16; BUFFER_SIZE]; pub struct StreamerBackend { stream_receiver: Receiver>, @@ -35,10 +36,10 @@ impl StreamerBackend { tokio::spawn(async move { let mut stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( - Duration::from_secs(SEGMENT_INTERVAL), + Duration::from_millis(1000 * BUFFER_SIZE as u64 / CHANNEL_COUNT / SAMPLE_RATE), )) .map(move |_| { - let mut buffer = [0_i16; (SAMPLE_RATE * SEGMENT_INTERVAL * CHANNEL_COUNT) as usize]; + let mut buffer = [0_i16; BUFFER_SIZE]; renderer.on_start_of_batch(); buffer.fill_with(|| { let sample = renderer