~ruther/mpris-ctl

e1c98151b5bcaecc20fe4f26350fa7d97263e33e — František Boháček 1 year, 8 months ago 3a43681
feat(daemon): add basic daemon handling
3 files changed, 140 insertions(+), 0 deletions(-)

A daemon/.projectile
A daemon/Cargo.toml
A daemon/src/main.rs
A daemon/.projectile => daemon/.projectile +0 -0
A daemon/Cargo.toml => daemon/Cargo.toml +13 -0
@@ 0,0 1,13 @@
[package]
name = "daemon"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "4.3.11", features = ["derive"] }
futures = "0.3.28"
mpris = "2.0.1"
serde = { version = "1.0.167", features = ["derive"] }
serde_json = { version = "1.0.100", features = ["std"] }
tokio = { version = "1.29.1", features = ["net", "macros", "rt", "rt-multi-thread", "io-util", "time"] }
tokio-util = { version = "0.7.8", features = ["codec"] }

A daemon/src/main.rs => daemon/src/main.rs +127 -0
@@ 0,0 1,127 @@
use std::{path::PathBuf, fs, sync::{Arc, Mutex}, time::Duration};

use clap::Parser;
use mpris::{PlayerFinder, PlaybackStatus};
use serde::{Serialize, Deserialize};
use tokio::{net::{UnixListener, UnixStream}, time, task};
use tokio_util::codec::{LengthDelimitedCodec, Framed};

use futures::{prelude::stream::StreamExt, SinkExt};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Request {
    GetLastActive,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Response {
    None,
    Players(Vec<String>),
}

#[derive(Debug, Parser)]
pub struct Cli {
    #[arg(short = 'c', long, default_value = r"config.json", value_hint = clap::ValueHint::FilePath)]
    pub config: PathBuf,
    #[arg(short = 's', long, default_value = r"/tmp/mpris-ctl.sock", value_hint = clap::ValueHint::FilePath)]
    pub socket: PathBuf
}

type SharedData = Arc<Mutex<Vec<String>>>;

#[tokio::main]
async fn main() {
    let args = Cli::parse();

    if args.socket.exists() {
        fs::remove_file(&args.socket).unwrap();
    }

    let last_active_players = Arc::new(Mutex::new(Vec::<String>::new()));

    let last_active_mpris = last_active_players.clone();
    let _mpris_handle = tokio::spawn(async move {
        handle_mpris_daemon(last_active_mpris).await;
    });

    let listener = UnixListener::bind(&args.socket).expect("failed to bind socket");
    loop {
        let last_active_listener = last_active_players.clone();
        match listener.accept().await {
            Ok((stream, _addr)) => {
                tokio::spawn(async move {
                    process(stream, last_active_listener).await;
                });
            },
            Err(e) => {
                eprintln!("{:?}", e);
            }
        }
    }
}

async fn handle_mpris_daemon(shared_data: SharedData) {

    let mut interval = time::interval(Duration::from_millis(250));
    loop {
        interval.tick().await;
        let player_finder = PlayerFinder::new().expect("Could not connect to D-Bus");
        // get active players
        let active_players: Vec<String> = player_finder
            .iter_players()
            .expect("Could not iterate players")
            .map(|x| x.expect("Could not get one of the players"))
            .filter(|x| x.get_playback_status().expect("Cannot get playback status") == PlaybackStatus::Playing)
            .map(|x| String::from(x.identity()))
            .collect();

        if active_players.len() == 0 {
            continue; // there is nothing to do
        }

        // change active players
        let mut guard = shared_data.lock().expect("Could not lock the mutex");
        *guard = active_players;
    }
}

async fn process(stream: UnixStream, shared_data: SharedData) {
    let mut framed = Framed::new(stream, LengthDelimitedCodec::new());
    while let Some(frame) = framed.next().await {
        match frame {
            Ok(data) => {
                let request: serde_json::Result<Request> = serde_json::from_slice(&data);
                if request.is_err() {
                    eprintln!("Could not parse the frame from client: {:?}", request.err());
                    break;
                }

                let request = request.unwrap();
                let response = handle_request(request, shared_data.clone()).await;
                let buffer = serde_json::to_vec(&response).unwrap();

                let send = framed.send(buffer.into()).await;
                if send.is_err() {
                    eprintln!("Could not send to the client: {:?}", send.err());
                    break;
                }
            },
            Err(err) => {
                eprintln!("Could not read from client: {:?}", err);
                break;
            }
        }
    }

    task::yield_now().await;
}

async fn handle_request(request: Request, shared_data: SharedData) -> Response {
    match request {
        Request::GetLastActive => {
            let guard = shared_data.lock().expect("Could not lock the mutex.");
            Response::Players(guard.clone())
        },
        _ => Response::None
    }
}

Do not follow this link