~ruther/mpris-ctl

ref: 5dd996b62a67365d1a62d57d5fa9080e838d746c mpris-ctl/daemon/src/main.rs -rw-r--r-- 4.7 KiB
5dd996b6 — František Boháček feat(daemon): handle errors correctly, logging the error 1 year, 8 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::{
    error::Error,
    fs,
    path::PathBuf,
    sync::{Arc, Mutex},
    time::Duration,
};

use clap::Parser;
use mpris::{PlaybackStatus, PlayerFinder};
use serde::{Deserialize, Serialize};
use tokio::{
    net::{UnixListener, UnixStream},
    time,
};
use tokio_util::codec::{Framed, LengthDelimitedCodec};

use futures::{prelude::stream::StreamExt, SinkExt};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Request {
    GetLastActive,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Response {
    None,
    Players(Vec<String>),
}

#[derive(Debug, Parser)]
pub struct Cli {
    #[arg(short = 'c', long, default_value = r"config.json", value_hint = clap::ValueHint::FilePath)]
    pub config: PathBuf,
    #[arg(short = 's', long, default_value = r"/tmp/mpris-ctl.sock", value_hint = clap::ValueHint::FilePath)]
    pub socket: PathBuf,
}

type SharedData = Arc<Mutex<Vec<String>>>;

#[tokio::main]
async fn main() {
    let args = Cli::parse();

    if args.socket.exists() {
        fs::remove_file(&args.socket).unwrap();
    }

    let last_active_players = Arc::new(Mutex::new(Vec::<String>::new()));

    let last_active_mpris = last_active_players.clone();
    let _mpris_handle = tokio::spawn(async move {
        handle_mpris_daemon_loop(last_active_mpris).await;
    });

    let listener = UnixListener::bind(&args.socket).expect("failed to bind socket");
    loop {
        let last_active_listener = last_active_players.clone();
        match listener.accept().await {
            Ok((stream, _addr)) => {
                tokio::spawn(async move {
                    process(stream, last_active_listener).await;
                });
            }
            Err(e) => {
                eprintln!("{:?}", e);
            }
        }
    }
}

async fn handle_mpris_daemon_loop(shared_data: SharedData) {
    let mut interval = time::interval(Duration::from_millis(250));
    loop {
        interval.tick().await;
        match handle_mpris_daemon(&shared_data).await {
            Ok(..) => (),
            Err(err) => eprintln!("Got an error in the daemon player finder loop: {}", err),
        }
    }
}

async fn handle_mpris_daemon(shared_data: &SharedData) -> Result<(), Box<dyn Error + '_>> {
    let player_finder = PlayerFinder::new()?;

    let mut errors = vec![];
    let active_players: Vec<_> = player_finder
        .iter_players()?
        .filter_map(|x| {
            let player_result = x.map_err(|e| errors.push(e)).ok();
            let status = if let Some(player) = &player_result {
                player
                    .get_playback_status()
                    .map_err(|e| errors.push(e))
                    .ok()
            } else {
                Option::None
            };

            if status.is_some() && status.unwrap() == PlaybackStatus::Playing {
                player_result
            } else {
                Option::None
            }
        })
        .map(|x| String::from(x.identity()))
        .collect();

    if active_players.len() != 0 {
        let mut guard = shared_data.lock()?;
        *guard = active_players;
    }

    for error in errors {
        // TODO: return the errors correctly (this is not so easy as the DBusError does not contain copy trait)
        // maybe create a custom error type somehow wrapping dbus error?
        eprintln!("An errorw hen obtaining a player has occurred: {}", error);
    }

    Ok(())
}

async fn process(stream: UnixStream, shared_data: SharedData) {
    let mut framed = Framed::new(stream, LengthDelimitedCodec::new());
    while let Some(frame) = framed.next().await {
        match frame {
            Ok(data) => {
                let request: serde_json::Result<Request> = serde_json::from_slice(&data);
                if request.is_err() {
                    eprintln!("Could not parse the frame from client: {:?}", request.err());
                    break;
                }

                let request = request.unwrap();
                let response = handle_request(request, shared_data.clone()).await;
                let buffer = serde_json::to_vec(&response).unwrap();

                let send = framed.send(buffer.into()).await;
                if send.is_err() {
                    eprintln!("Could not send to the client: {:?}", send.err());
                    break;
                }
            }
            Err(err) => {
                eprintln!("Could not read from client: {:?}", err);
                break;
            }
        }
    }
}

async fn handle_request(request: Request, shared_data: SharedData) -> Response {
    match request {
        Request::GetLastActive => {
            let guard = shared_data.lock().expect("Could not lock the mutex.");
            Response::Players(guard.clone())
        }
        _ => Response::None,
    }
}
Do not follow this link