diff --git a/Cargo.lock b/Cargo.lock index cd04721..89daabf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,8 +83,7 @@ dependencies = [ [[package]] name = "awedio" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e49f94356350dde8c54c5cbde90bdfa5642ab25f9c51c60a9ebdfdf3dbef87" +source = "git+https://github.com/jhbruhn/awedio.git?branch=symphonia-recover-errors#188442284efea9147e0a37e42edc6ae8ddee10af" dependencies = [ "log", "symphonia", diff --git a/Cargo.toml b/Cargo.toml index 1596afc..b1e79a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] reqwest = { version = "0.11", features = ["json", "stream", "native-tls-vendored"] } tokio = { version = "1", features = ["full"] } -awedio = { version = "0.3", features = ["symphonia-all", "async"], default-features = false } +awedio = { git = "https://github.com/jhbruhn/awedio.git", branch = "symphonia-recover-errors", features = ["symphonia-all", "async"], default-features = false } anyhow = "1.0" mp3lame-encoder = { git = "https://github.com/jhbruhn/mp3lame-encoder.git", branch = "arm" } tokio-stream = { version = "0.1", features = ["sync", "fs"] } diff --git a/src/jellyfin.rs b/src/jellyfin.rs index 00da0eb..46bff90 100644 --- a/src/jellyfin.rs +++ b/src/jellyfin.rs @@ -150,10 +150,12 @@ impl JellyfinClient { .map(|s| s.replace("\"", "")); let body = response.bytes().await?; - let decoder = Box::new(awedio::sounds::decoders::SymphoniaDecoder::new( + let decoder = awedio::sounds::decoders::SymphoniaDecoder::new( Box::new(symphonia::core::io::ReadOnlySource::new(body.reader())), extension.as_deref(), - )?); + )?; + + let decoder = Box::new(decoder); Ok(decoder) } diff --git a/src/main.rs b/src/main.rs index d2083c2..f37feb1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,7 +123,6 @@ async fn main() -> anyhow::Result<()> { let mut announce_downmix_player_controller = player_controller.clone(); tokio::task::spawn(async move { loop { - tokio::task::yield_now().await; player_controller.wait_for_queue().await; tracing::info!("Queuing song"); @@ -168,8 +167,6 @@ async fn main() -> anyhow::Result<()> { let time_file_map = get_time_file_map(&time_file_path).await; loop { - tokio::task::yield_now().await; - let fade_duration = Duration::from_secs(2); let fade_steps = 100; let fade_minimum_level = 0.1; @@ -224,7 +221,9 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Playing interstitial {:?}", next_path); - if let Ok(sound) = awedio::sounds::open_file(next_path.as_path()) { + if let Ok(sound) = + tokio::task::block_in_place(|| awedio::sounds::open_file(next_path.as_path())) + { let (sound, completion_notifier) = sound.with_async_completion_notifier(); for v in (fade_steps_min..=fade_steps_max).rev() { let volume = v as f32 / fade_steps as f32; diff --git a/src/player.rs b/src/player.rs index 6e84f0c..b0e5896 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,6 +1,8 @@ use awedio::NextSample; use awedio::Sound; use std::sync::mpsc; +use std::sync::Arc; +use tokio::sync::Notify; /// Heavily Based on awedios SoundList and Controllable implementations @@ -16,19 +18,18 @@ type Command = Box; pub struct PlayerControllable { inner: Player, command_receiver: mpsc::Receiver>, - queue_next_song_sender: tokio::sync::watch::Sender, + queue_next_song_notify: Arc, finished: bool, } pub struct PlayerController { command_sender: mpsc::Sender>, - queue_next_song_receiver: tokio::sync::watch::Receiver, + queue_next_song_notify: Arc, } impl Player { /// Create a new empty Player. pub fn new(song_prefetch: u32) -> (PlayerControllable, PlayerController) { - let (queue_next_song_sender, queue_next_song_receiver) = tokio::sync::watch::channel(false); let inner = Player { sounds: Vec::new(), was_empty: false, @@ -36,16 +37,18 @@ impl Player { volume_adjustment: 1.0, }; + let queue_next_song_notify = Arc::new(tokio::sync::Notify::new()); let (command_sender, command_receiver) = mpsc::channel::>(); let controllable = PlayerControllable { inner, - queue_next_song_sender, + queue_next_song_notify: queue_next_song_notify.clone(), command_receiver, finished: false, }; + let controller = PlayerController { command_sender, - queue_next_song_receiver, + queue_next_song_notify, }; (controllable, controller) @@ -64,7 +67,7 @@ impl Player { } fn should_prefetch(&self) -> bool { - self.sounds.len() < self.song_prefetch as usize + self.sounds.len() <= self.song_prefetch as usize } } @@ -128,6 +131,14 @@ impl Sound for Player { } } +impl PlayerControllable { + fn notify_next_song_queue_if_needed(&mut self) { + if self.inner.should_prefetch() { + self.queue_next_song_notify.notify_waiters(); + } + } +} + impl Sound for PlayerControllable { fn channel_count(&self) -> u16 { self.inner.channel_count() @@ -140,9 +151,12 @@ impl Sound for PlayerControllable { fn next_sample(&mut self) -> Result { let next = self.inner.next_sample()?; match next { - awedio::NextSample::Sample(_) - | awedio::NextSample::MetadataChanged - | awedio::NextSample::Paused => Ok(next), + awedio::NextSample::Sample(_) | awedio::NextSample::Paused => Ok(next), + awedio::NextSample::MetadataChanged => { + // Maybe a new track has started + self.notify_next_song_queue_if_needed(); + Ok(next) + } // Since this is controllable we might add another sound later. // Ideally we would do this only if the inner sound can have sounds // added to it but I don't think we can branch on S: AddSound here. @@ -169,15 +183,7 @@ impl Sound for PlayerControllable { } } } - - let _ = self.queue_next_song_sender.send_if_modified(|v| { - if *v != self.inner.should_prefetch() { - *v = self.inner.should_prefetch(); - return true; - } - false - }); - + self.notify_next_song_queue_if_needed(); self.inner.on_start_of_batch(); } } @@ -195,7 +201,7 @@ impl Clone for PlayerController { fn clone(&self) -> Self { Self { command_sender: self.command_sender.clone(), - queue_next_song_receiver: self.queue_next_song_receiver.clone(), + queue_next_song_notify: self.queue_next_song_notify.clone(), } } } @@ -210,6 +216,6 @@ impl PlayerController { } pub async fn wait_for_queue(&mut self) { - let _ = self.queue_next_song_receiver.wait_for(|v| *v == true).await; + self.queue_next_song_notify.notified().await; } } diff --git a/src/streamer.rs b/src/streamer.rs index 40e203a..7c285fd 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -14,7 +14,7 @@ use hyper::{Response, StatusCode}; const SAMPLE_RATE: u64 = 48000; const CHANNEL_COUNT: u64 = 2; -const BUFFER_SIZE: usize = 2000; // Should be an integer result of 48000 / 2 / x +const BUFFER_SIZE: usize = (SAMPLE_RATE / CHANNEL_COUNT / 10) as usize; // Should be an integer result of 48000 / 2 / x type Chunk = [i16; BUFFER_SIZE]; @@ -31,32 +31,31 @@ impl StreamerBackend { panic!("expected MetadataChanged event") }; - let (mut s, stream_receiver) = async_broadcast::broadcast(3); + let (mut s, stream_receiver) = async_broadcast::broadcast(10); s.set_overflow(true); tokio::spawn(async move { let mut stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( - Duration::from_millis(1000 * BUFFER_SIZE as u64 / CHANNEL_COUNT / SAMPLE_RATE), + Duration::from_millis((1000 * BUFFER_SIZE as u64) / CHANNEL_COUNT / SAMPLE_RATE), )) .map(move |_| { - let mut buffer = [0_i16; BUFFER_SIZE]; renderer.on_start_of_batch(); - tokio::task::block_in_place(|| { - buffer.fill_with(|| { - let sample = renderer - .next_sample() - .expect("renderer should never return an Error"); - let sample = match sample { - awedio::NextSample::Sample(s) => s, - awedio::NextSample::MetadataChanged => { - unreachable!("we never change metadata mid-batch") - } - awedio::NextSample::Paused => 0, - awedio::NextSample::Finished => 0, - }; - sample - }); + let mut buffer: [i16; 2400] = [0_i16; BUFFER_SIZE]; + buffer.fill_with(|| { + let sample = renderer + .next_sample() + .expect("renderer should never return an Error"); + let sample = match sample { + awedio::NextSample::Sample(s) => s, + awedio::NextSample::MetadataChanged => { + unreachable!("we never change metadata mid-batch") + } + awedio::NextSample::Paused => 0, + awedio::NextSample::Finished => 0, + }; + sample }); + Box::new(buffer) });