From e1c98151b5bcaecc20fe4f26350fa7d97263e33e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Boh=C3=A1=C4=8Dek?= Date: Sat, 8 Jul 2023 20:50:54 +0200 Subject: [PATCH] feat(daemon): add basic daemon handling --- daemon/.projectile | 0 daemon/Cargo.toml | 13 +++++ daemon/src/main.rs | 127 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 140 insertions(+) create mode 100644 daemon/.projectile create mode 100644 daemon/Cargo.toml create mode 100644 daemon/src/main.rs diff --git a/daemon/.projectile b/daemon/.projectile new file mode 100644 index 0000000..e69de29 diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml new file mode 100644 index 0000000..57192ae --- /dev/null +++ b/daemon/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "daemon" +version = "0.1.0" +edition = "2021" + +[dependencies] +clap = { version = "4.3.11", features = ["derive"] } +futures = "0.3.28" +mpris = "2.0.1" +serde = { version = "1.0.167", features = ["derive"] } +serde_json = { version = "1.0.100", features = ["std"] } +tokio = { version = "1.29.1", features = ["net", "macros", "rt", "rt-multi-thread", "io-util", "time"] } +tokio-util = { version = "0.7.8", features = ["codec"] } diff --git a/daemon/src/main.rs b/daemon/src/main.rs new file mode 100644 index 0000000..4f7bd4e --- /dev/null +++ b/daemon/src/main.rs @@ -0,0 +1,127 @@ +use std::{path::PathBuf, fs, sync::{Arc, Mutex}, time::Duration}; + +use clap::Parser; +use mpris::{PlayerFinder, PlaybackStatus}; +use serde::{Serialize, Deserialize}; +use tokio::{net::{UnixListener, UnixStream}, time, task}; +use tokio_util::codec::{LengthDelimitedCodec, Framed}; + +use futures::{prelude::stream::StreamExt, SinkExt}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum Request { + GetLastActive, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum Response { + None, + Players(Vec), +} + +#[derive(Debug, Parser)] +pub struct Cli { + #[arg(short = 'c', long, default_value = r"config.json", value_hint = clap::ValueHint::FilePath)] + pub config: PathBuf, + #[arg(short = 's', long, default_value = r"/tmp/mpris-ctl.sock", value_hint = clap::ValueHint::FilePath)] + pub socket: PathBuf +} + +type SharedData = Arc>>; + +#[tokio::main] +async fn main() { + let args = Cli::parse(); + + if args.socket.exists() { + fs::remove_file(&args.socket).unwrap(); + } + + let last_active_players = Arc::new(Mutex::new(Vec::::new())); + + let last_active_mpris = last_active_players.clone(); + let _mpris_handle = tokio::spawn(async move { + handle_mpris_daemon(last_active_mpris).await; + }); + + let listener = UnixListener::bind(&args.socket).expect("failed to bind socket"); + loop { + let last_active_listener = last_active_players.clone(); + match listener.accept().await { + Ok((stream, _addr)) => { + tokio::spawn(async move { + process(stream, last_active_listener).await; + }); + }, + Err(e) => { + eprintln!("{:?}", e); + } + } + } +} + +async fn handle_mpris_daemon(shared_data: SharedData) { + + let mut interval = time::interval(Duration::from_millis(250)); + loop { + interval.tick().await; + let player_finder = PlayerFinder::new().expect("Could not connect to D-Bus"); + // get active players + let active_players: Vec = player_finder + .iter_players() + .expect("Could not iterate players") + .map(|x| x.expect("Could not get one of the players")) + .filter(|x| x.get_playback_status().expect("Cannot get playback status") == PlaybackStatus::Playing) + .map(|x| String::from(x.identity())) + .collect(); + + if active_players.len() == 0 { + continue; // there is nothing to do + } + + // change active players + let mut guard = shared_data.lock().expect("Could not lock the mutex"); + *guard = active_players; + } +} + +async fn process(stream: UnixStream, shared_data: SharedData) { + let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); + while let Some(frame) = framed.next().await { + match frame { + Ok(data) => { + let request: serde_json::Result = serde_json::from_slice(&data); + if request.is_err() { + eprintln!("Could not parse the frame from client: {:?}", request.err()); + break; + } + + let request = request.unwrap(); + let response = handle_request(request, shared_data.clone()).await; + let buffer = serde_json::to_vec(&response).unwrap(); + + let send = framed.send(buffer.into()).await; + if send.is_err() { + eprintln!("Could not send to the client: {:?}", send.err()); + break; + } + }, + Err(err) => { + eprintln!("Could not read from client: {:?}", err); + break; + } + } + } + + task::yield_now().await; +} + +async fn handle_request(request: Request, shared_data: SharedData) -> Response { + match request { + Request::GetLastActive => { + let guard = shared_data.lock().expect("Could not lock the mutex."); + Response::Players(guard.clone()) + }, + _ => Response::None + } +} -- 2.48.1