Initial commit

This commit is contained in:
Jan-Henrik Bruhn 2024-03-17 20:22:12 +01:00 committed by Jan-Henrik Bruhn
commit 186edf9620
10 changed files with 2347 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

5
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,5 @@
{
"rust-analyzer.linkedProjects": [
".\\Cargo.toml"
]
}

1675
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

24
Cargo.toml Normal file
View file

@ -0,0 +1,24 @@
[package]
name = "jellyfin-radio"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
reqwest = { version = "0.11", features = ["json", "stream"] }
tokio = { version = "1", features = ["full"] }
awedio = { version = "0.3", features = ["symphonia-all"], default-features = false }
anyhow = "1.0"
mp3lame-encoder = { git = "https://github.com/jhbruhn/mp3lame-encoder.git", branch = "send-sync" }
tokio-stream = { version = "0.1", features = ["sync"] }
futures-util = "0.3"
hyper = { version = "1.2", features = ["server", "http1"] }
hyper-util = {version = "0.1", features = ["tokio"] }
bytes = "1.5"
http-body-util = "0.1"
async-broadcast = "0.7"
future-bool = "0.1"
serde = { version = "1.0", features = ["derive"] }
symphonia = { version = "0.5.4", features = ["all"] }
envconfig = "0.10"

37
Dockerfile Normal file
View file

@ -0,0 +1,37 @@
FROM --platform=$BUILDPLATFORM rust:1.75 as cross
ARG TARGETARCH
COPY docker/platform.sh .
RUN ./platform.sh # should write /.platform and /.compiler
RUN rustup target add $(cat /.platform)
RUN apt update && apt-get install -y unzip $(cat /.compiler)
WORKDIR ./jellyfin-radio
ADD . ./
RUN cargo build --release --target $(cat /.platform)
RUN cp ./target/$(cat /.platform)/release/jellyfin-radio /jellyfin-radio.bin # Get rid of this when build --out is stable
FROM debian:bookworm-slim
ARG APP=/usr/src/app
RUN apt-get update \
&& apt-get install -y ca-certificates tzdata \
&& rm -rf /var/lib/apt/lists/*
EXPOSE 3000
ENV TZ=Etc/UTC \
APP_USER=appuser
RUN groupadd $APP_USER \
&& useradd -g $APP_USER $APP_USER \
&& mkdir -p ${APP}
COPY --from=cross /jellyfin-radio.bin ${APP}/jellyfin-radio
RUN chown -R $APP_USER:$APP_USER ${APP}
USER $APP_USER
WORKDIR ${APP}
CMD ["./jellyfin-radio"]

19
docker/platform.sh Normal file
View file

@ -0,0 +1,19 @@
#!/bin/bash
# Used in Docker build to set platform dependent variables
case $TARGETARCH in
"amd64")
echo "x86_64-unknown-linux-gnu" > /.platform
echo "" > /.compiler
;;
"arm64")
echo "aarch64-unknown-linux-gnu" > /.platform
echo "gcc-aarch64-linux-gnu" > /.compiler
;;
"arm")
echo "armv7-unknown-linux-gnueabihf" > /.platform
echo "gcc-arm-linux-gnueabihf" > /.compiler
;;
esac

160
src/jellyfin.rs Normal file
View file

@ -0,0 +1,160 @@
use bytes::Buf;
use serde::Deserialize;
pub struct JellyfinClient {
api_token: String,
base_url: String,
client: reqwest::Client,
}
#[derive(Deserialize)]
pub struct Audio {
#[serde(rename(deserialize = "Id"))]
pub id: String,
#[serde(rename(deserialize = "Name"))]
pub name: String,
#[serde(rename(deserialize = "Artists"))]
pub artists: Vec<String>,
}
#[derive(Deserialize)]
pub struct View {
#[serde(rename(deserialize = "Name"))]
pub name: String,
#[serde(rename(deserialize = "Id"))]
pub id: String,
#[serde(rename(deserialize = "CollectionType"))]
pub collection_type: String,
}
#[derive(Deserialize)]
pub struct User {
#[serde(rename(deserialize = "Name"))]
pub name: String,
#[serde(rename(deserialize = "Id"))]
pub id: String,
#[serde(rename(deserialize = "Policy"))]
pub policy: UserPolicy,
}
#[derive(Deserialize)]
pub struct UserPolicy {
#[serde(rename(deserialize = "IsAdministrator"))]
pub is_administrator: bool,
}
impl JellyfinClient {
pub fn new(base_url: String, api_token: String) -> Self {
Self {
base_url,
api_token,
client: reqwest::Client::new(),
}
}
pub async fn users(&self) -> anyhow::Result<Vec<User>> {
let url = format!("{}/Users", self.base_url);
let response: Vec<User> = self
.client
.get(url)
.header(
"Authorization",
format!("MediaBrowser Token=\"{}\"", self.api_token),
)
.send()
.await?
.json()
.await?;
Ok(response)
}
pub async fn views(&self, user_id: &str) -> anyhow::Result<Vec<View>> {
#[derive(Deserialize)]
struct ViewList {
#[serde(rename(deserialize = "Items"))]
items: Vec<View>,
}
let url = format!("{}/Users/{user_id}/Views", self.base_url);
let response: ViewList = self
.client
.get(url)
.header(
"Authorization",
format!("MediaBrowser Token=\"{}\"", self.api_token),
)
.send()
.await?
.json()
.await?;
Ok(response.items)
}
pub async fn random_audio(&self, user_id: &str, collection_id: &str) -> anyhow::Result<Audio> {
#[derive(Deserialize)]
struct AudioList {
#[serde(rename(deserialize = "Items"))]
items: Vec<Audio>,
}
let url = format!("{}/Users/{user_id}/Items", self.base_url);
let mut response: AudioList = self
.client
.get(url)
.query(&[
("ParentId", collection_id),
("Filters", "IsNotFolder"),
("Recursive", "true"),
("SortBy", "Random"),
("MediaTypes", "Audio"),
("Limit", "1"),
("ExcludeLocationTypes", "Virtual"),
("CollapseBoxSetItems", "false"),
])
.header(
"Authorization",
format!("MediaBrowser Token=\"{}\"", self.api_token),
)
.send()
.await?
.json()
.await?;
response.items.pop().ok_or(anyhow::anyhow!("No item found"))
}
pub async fn fetch_audio(&self, audio: Audio) -> anyhow::Result<Box<dyn awedio::Sound>> {
let url = format!("{}/Items/{}/Download", self.base_url, audio.id);
let response = self
.client
.get(url)
.header(
"Authorization",
format!("MediaBrowser Token=\"{}\"", self.api_token),
)
.send()
.await?;
let filename = response
.headers()
.get(reqwest::header::CONTENT_DISPOSITION)
.and_then(|v| v.to_str().ok())
.map(|v| v.split(";").into_iter())
.map(|i| {
i.filter(|v| v.contains("filename="))
.map(|v| v.split("=").collect::<Vec<&str>>()[1])
.next()
})
.unwrap();
let extension = filename
.and_then(|v| v.rsplit(".").next())
.map(String::from)
.map(|s| s.replace("\"", ""));
let body = response.bytes().await?;
let decoder = Box::new(awedio::sounds::decoders::SymphoniaDecoder::new(
Box::new(symphonia::core::io::ReadOnlySource::new(body.reader())),
extension.as_deref(),
)?);
Ok(decoder)
}
}

106
src/main.rs Normal file
View file

@ -0,0 +1,106 @@
use envconfig::Envconfig;
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use std::{net::SocketAddr, time::Duration};
use tokio::net::TcpListener;
mod jellyfin;
mod player;
mod streamer;
#[derive(Envconfig, Clone)]
struct Config {
#[envconfig(from = "JELLYFIN_URL")]
pub jellyfin_url: String,
#[envconfig(from = "JELLYFIN_API_KEY")]
pub jellyfin_api_key: String,
#[envconfig(from = "JELLYFIN_COLLECTION_NAME")]
pub jellyfin_collection_name: String,
#[envconfig(from = "PORT", default = "3000")]
pub port: u16,
#[envconfig(from = "HOST", default = "0.0.0.0")]
pub host: String,
#[envconfig(from = "SONG_PREFETCH", default = "2")]
pub song_prefetch: u32,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::init_from_env().unwrap();
let client =
jellyfin::JellyfinClient::new(config.jellyfin_url.into(), config.jellyfin_api_key.into());
let admin_user = client
.users()
.await?
.into_iter()
.filter(|u| u.policy.is_administrator)
.next()
.expect("No Admin user found!");
let matched_collection = client
.views(&admin_user.id)
.await?
.into_iter()
.filter(|c| c.name == config.jellyfin_collection_name)
.next()
.expect("Collection not found!");
let addr: SocketAddr = SocketAddr::from((
config.host.parse::<std::net::Ipv4Addr>().unwrap(),
config.port,
));
let (streamer_backend, mut streamer_manager) = streamer::StreamerBackend::start()?;
tokio::task::spawn(async move {
let (player, mut controller) = player::Player::new(config.song_prefetch);
let player = Box::new(player);
streamer_manager.play(player);
loop {
controller.wait_for_queue().await;
println!("Queuing song");
loop {
let result = async {
let item = client
.random_audio(&admin_user.id, &matched_collection.id)
.await?;
println!("Fetching {} - {}", item.artists.join(","), item.name);
let sound = client.fetch_audio(item).await?;
println!("Fetched Song!");
controller.add(Box::new(sound));
anyhow::Ok(())
}
.await;
if let Err(e) = result {
println!("Error fetching new song: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
} else {
break;
}
}
}
});
let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);
loop {
let (tcp, _) = listener.accept().await?;
let io = TokioIo::new(tcp);
let backend = streamer_backend.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io, backend).await {
println!("Error serving connection: {:?}", err);
}
});
}
}

188
src/player.rs Normal file
View file

@ -0,0 +1,188 @@
use awedio::NextSample;
use awedio::Sound;
use std::sync::mpsc;
/// Heavily Based on awedios SoundList and Controllable implementations
pub struct Player {
sounds: Vec<Box<dyn Sound>>,
was_empty: bool,
song_prefetch: u32,
}
type Command<S> = Box<dyn FnOnce(&mut S) + Send>;
pub struct PlayerControllable {
inner: Player,
command_receiver: mpsc::Receiver<Command<Player>>,
queue_next_song_sender: tokio::sync::mpsc::Sender<()>,
finished: bool,
}
pub struct PlayerController {
command_sender: mpsc::Sender<Command<Player>>,
queue_next_song_receiver: tokio::sync::mpsc::Receiver<()>,
}
impl Player {
/// Create a new empty Player.
pub fn new(song_prefetch: u32) -> (PlayerControllable, PlayerController) {
let (queue_next_song_sender, queue_next_song_receiver) = tokio::sync::mpsc::channel(1);
let inner = Player {
sounds: Vec::new(),
was_empty: false,
song_prefetch,
};
let (command_sender, command_receiver) = mpsc::channel::<Command<Player>>();
let controllable = PlayerControllable {
inner,
queue_next_song_sender,
command_receiver,
finished: false,
};
let controller = PlayerController {
command_sender,
queue_next_song_receiver,
};
(controllable, controller)
}
/// Add a Sound to be played after any existing sounds have `Finished`.
pub fn add(&mut self, sound: Box<dyn Sound>) {
if self.sounds.is_empty() {
self.was_empty = true;
}
self.sounds.push(sound);
}
fn last_song_playing_or_empty(&self) -> bool {
self.sounds.len() <= self.song_prefetch as usize
}
}
// Returned only when no sounds exist so they shouldn't be used in practice.
const DEFAULT_CHANNEL_COUNT: u16 = 2;
const DEFAULT_SAMPLE_RATE: u32 = 44100;
impl Sound for Player {
fn channel_count(&self) -> u16 {
self.sounds
.first()
.map(|s| s.channel_count())
.unwrap_or(DEFAULT_CHANNEL_COUNT)
}
fn sample_rate(&self) -> u32 {
self.sounds
.first()
.map(|s| s.sample_rate())
.unwrap_or(DEFAULT_SAMPLE_RATE)
}
fn on_start_of_batch(&mut self) {
for sound in &mut self.sounds {
sound.on_start_of_batch();
}
}
fn next_sample(&mut self) -> Result<NextSample, awedio::Error> {
let Some(next_sound) = self.sounds.first_mut() else {
return Ok(NextSample::Finished);
};
if self.was_empty {
self.was_empty = false;
return Ok(NextSample::MetadataChanged);
}
let next_sample = match next_sound.next_sample() {
Ok(s) => s,
Err(e) => {
self.sounds.remove(0);
return Err(e);
}
};
let ret = match next_sample {
NextSample::Sample(_) | NextSample::MetadataChanged | NextSample::Paused => next_sample,
NextSample::Finished => {
self.sounds.remove(0);
if self.sounds.is_empty() {
NextSample::Finished
} else {
// The next sample might have different metadata. Instead of
// normalizing here let downstream normalize.
NextSample::MetadataChanged
}
}
};
Ok(ret)
}
}
impl Sound for PlayerControllable {
fn channel_count(&self) -> u16 {
self.inner.channel_count()
}
fn sample_rate(&self) -> u32 {
self.inner.sample_rate()
}
fn next_sample(&mut self) -> Result<awedio::NextSample, awedio::Error> {
let next = self.inner.next_sample()?;
match next {
awedio::NextSample::Sample(_)
| awedio::NextSample::MetadataChanged
| awedio::NextSample::Paused => Ok(next),
// Since this is controllable we might add another sound later.
// Ideally we would do this only if the inner sound can have sounds
// added to it but I don't think we can branch on S: AddSound here.
// We could add a Sound::is_addable but lets avoid that until we see
// a reason why it is necessary.
awedio::NextSample::Finished => {
if self.finished {
Ok(awedio::NextSample::Finished)
} else {
Ok(awedio::NextSample::Paused)
}
}
}
}
fn on_start_of_batch(&mut self) {
loop {
match self.command_receiver.try_recv() {
Ok(command) => command(&mut self.inner),
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
self.finished = true;
break;
}
}
}
if self.inner.last_song_playing_or_empty() {
let _ = self.queue_next_song_sender.try_send(());
}
self.inner.on_start_of_batch();
}
}
impl PlayerController {
pub fn send_command(&mut self, command: Command<Player>) {
// Ignore the error since it only happens if the receiver
// has been dropped which is not expected after it has been
// sent to the manager.
let _ = self.command_sender.send(command);
}
}
impl PlayerController {
pub fn add(&mut self, sound: Box<dyn Sound>) {
self.send_command(Box::new(|s: &mut Player| s.add(sound)));
}
pub async fn wait_for_queue(&mut self) {
self.queue_next_song_receiver.recv().await;
}
}

132
src/streamer.rs Normal file
View file

@ -0,0 +1,132 @@
use awedio::{manager::Manager, Sound};
use async_broadcast::Receiver;
use bytes::Bytes;
use core::time::Duration;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::body::Frame;
use hyper::service::Service;
use hyper::{body, Request};
use hyper::{Response, StatusCode};
const SEGMENT_INTERVAL: u64 = 2; // seconds
const SAMPLE_RATE: u64 = 48000;
const CHANNEL_COUNT: u64 = 2;
type Chunk = [i16; (SEGMENT_INTERVAL * SAMPLE_RATE * CHANNEL_COUNT) as usize];
pub struct StreamerBackend {
stream_receiver: Receiver<Box<Chunk>>,
}
impl StreamerBackend {
pub fn start() -> anyhow::Result<(Self, Manager)> {
let (manager, mut renderer) = Manager::new();
renderer.set_output_channel_count_and_sample_rate(CHANNEL_COUNT as u16, SAMPLE_RATE as u32);
let Ok(awedio::NextSample::MetadataChanged) = renderer.next_sample() else {
panic!("expected MetadataChanged event")
};
let (mut s, stream_receiver) = async_broadcast::broadcast(3);
s.set_overflow(true);
tokio::spawn(async move {
let mut stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(SEGMENT_INTERVAL),
))
.map(move |_| {
let mut buffer = [0_i16; (SAMPLE_RATE * SEGMENT_INTERVAL * CHANNEL_COUNT) as usize];
renderer.on_start_of_batch();
buffer.fill_with(|| {
let sample = renderer
.next_sample()
.expect("renderer should never return an Error");
let sample = match sample {
awedio::NextSample::Sample(s) => s,
awedio::NextSample::MetadataChanged => {
unreachable!("we never change metadata mid-batch")
}
awedio::NextSample::Paused => 0,
awedio::NextSample::Finished => 0,
};
sample
});
Box::new(buffer)
});
loop {
s.broadcast(stream.next().await.expect("Should not end!"))
.await
.unwrap();
}
});
Ok((Self { stream_receiver }, manager))
}
}
impl Clone for StreamerBackend {
fn clone(&self) -> Self {
Self {
stream_receiver: self.stream_receiver.clone(),
}
}
}
impl Service<Request<body::Incoming>> for StreamerBackend {
type Response = Response<BoxBody<Bytes, anyhow::Error>>;
type Error = anyhow::Error;
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn call(&self, _req: Request<body::Incoming>) -> Self::Future {
use mp3lame_encoder::{Builder, InterleavedPcm};
let mut mp3_encoder = Builder::new().expect("Create LAME builder");
mp3_encoder
.set_num_channels(CHANNEL_COUNT as u8)
.expect("set channels");
mp3_encoder
.set_sample_rate(SAMPLE_RATE as u32)
.expect("set sample rate");
mp3_encoder
.set_brate(mp3lame_encoder::Bitrate::Kbps320)
.expect("set brate");
mp3_encoder
.set_quality(mp3lame_encoder::Quality::Best)
.expect("set quality");
let mut mp3_encoder = mp3_encoder.build().expect("To initialize LAME encoder");
//use actual PCM data
let watch_stream = self.stream_receiver.clone().map(move |data| {
let input = InterleavedPcm(&data.as_slice());
let mut mp3_out_buffer: Vec<u8> = Vec::new();
mp3_out_buffer.reserve(mp3lame_encoder::max_required_buffer_size(data.len() / 2));
let encoded_size = mp3_encoder
.encode(input, mp3_out_buffer.spare_capacity_mut())
.expect("To encode");
unsafe {
mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size));
}
anyhow::Ok(Bytes::from(mp3_out_buffer))
});
let stream_body = StreamBody::new(watch_stream.map_ok(Frame::data));
let boxed_body: BoxBody<Bytes, anyhow::Error> = BoxBody::new(stream_body); //.boxed();
Box::pin(async {
anyhow::Ok(
Response::builder()
.status(StatusCode::OK)
.body(boxed_body)
.unwrap(),
)
})
}
}