~ruther/nixos-config

ref: 884fd5613aabb3a48fdd35a64d5c9e2fce46f9d7 nixos-config/modules/desktop/qtile/config/mpris2widget.py -rw-r--r-- 14.7 KiB
884fd561 — Frantisek Bohacek feat: use xsecurelock only by utilizing xss-lock 1 year, 6 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# Copyright (c) 2014 Sebastian Kricner
# Copyright (c) 2014 Sean Vig
# Copyright (c) 2014 Adi Sieker
# Copyright (c) 2014 Tycho Andersen
# Copyright (c) 2020 elParaguayo
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations

import asyncio
import re
import string
from typing import TYPE_CHECKING

from dbus_next import Message, Variant
from dbus_next.constants import MessageType

from libqtile.command.base import expose_command
from libqtile.log_utils import logger
from libqtile.utils import _send_dbus_message, add_signal_receiver, create_task
from libqtile.widget import base

if TYPE_CHECKING:
    from typing import Any

MPRIS_PATH = "/org/mpris/MediaPlayer2"
MPRIS_OBJECT = "org.mpris.MediaPlayer2"
MPRIS_PLAYER = "org.mpris.MediaPlayer2.Player"
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
MPRIS_REGEX = re.compile(r"(\{(.*?):(.*?)(:.*?)?\})")


class Mpris2Formatter(string.Formatter):
    """
    Custom string formatter for MPRIS2 metadata.

    Keys have a colon (e.g. "xesam:title") which causes issues with python's string
    formatting as the colon splits the identifier from the format specification.

    This formatter handles this issue by changing the first colon to an underscore and
    then formatting the incoming kwargs to match.

    Additionally, a default value is returned when an identifier is not provided by the
    kwarg data.
    """

    def __init__(self, default=""):
        string.Formatter.__init__(self)
        self._default = default

    def get_value(self, key, args, kwargs):
        """
        Replaces colon in kwarg keys with an underscore before getting value.

        Missing identifiers are replaced with the default value.
        """
        kwargs = {k.replace(":", "_"): v for k, v in kwargs.items()}
        try:
            return string.Formatter.get_value(self, key, args, kwargs)
        except (IndexError, KeyError):
            return self._default

    def parse(self, format_string):
        """
        Replaces first colon in format string with an underscore.

        This will cause issues if any identifier is provided that does not
        contain a colon. This should not happen according to the MPRIS2
        specification!
        """
        format_string = MPRIS_REGEX.sub(r"{\2_\3\4}", format_string)
        return string.Formatter.parse(self, format_string)


class Mpris2(base._TextBox):
    """An MPRIS 2 widget

    A widget which displays the current track/artist of your favorite MPRIS
    player. This widget scrolls the text if neccessary and information that
    is displayed is configurable.

    The widget relies on players broadcasting signals when the metadata or playback
    status changes. If you are getting inconsistent results then you can enable background
    polling of the player by setting the `poll_interval` parameter. This is disabled by
    default.

    Basic mouse controls are also available: button 1 = play/pause,
    scroll up = next track, scroll down = previous track.

    Widget requirements: dbus-next_.

    .. _dbus-next: https://pypi.org/project/dbus-next/
    """

    defaults = [
        ("name", "audacious", "Name of the MPRIS widget."),
        (
            "objname",
            None,
            "DBUS MPRIS 2 compatible player identifier"
            "- Find it out with dbus-monitor - "
            "Also see: http://specifications.freedesktop.org/"
            "mpris-spec/latest/#Bus-Name-Policy. "
            "``None`` will listen for notifications from all MPRIS2 compatible players.",
        ),

        (
            "playerfilter",
            None,
            "Filter the player identifier based on this regex",
            "usable with objname = None"
        ),
        (
            "format",
            "{xesam:title} - {xesam:album} - {xesam:artist}",
            "Format string for displaying metadata. "
            "See http://www.freedesktop.org/wiki/Specifications/mpris-spec/metadata/#index5h3 "
            "for available values",
        ),
        ("separator", ", ", "Separator for metadata fields that are a list."),
        (
            "display_metadata",
            ["xesam:title", "xesam:album", "xesam:artist"],
            "(Deprecated) Which metadata identifiers to display. ",
        ),
        ("scroll", True, "Whether text should scroll."),
        ("playing_text", "{track}", "Text to show when playing"),
        ("paused_text", "Paused: {track}", "Text to show when paused"),
        ("stopped_text", "", "Text to show when stopped"),
        (
            "stop_pause_text",
            None,
            "(Deprecated) Optional text to display when in the stopped/paused state",
        ),
        (
            "no_metadata_text",
            "No metadata for current track",
            "Text to show when track has no metadata",
        ),
        (
            "poll_interval",
            0,
            "Periodic background polling interval of player (0 to disable polling).",
        ),
    ]

    def __init__(self, **config):
        base._TextBox.__init__(self, "", **config)
        self.add_defaults(Mpris2.defaults)
        self.is_playing = False
        self.count = 0
        self.displaytext = ""
        self.track_info = ""
        self.status = "{track}"
        self.add_callbacks(
            {
                "Button1": self.play_pause,
                "Button4": self.next,
                "Button5": self.previous,
            }
        )
        paused = ""
        stopped = ""
        if "stop_pause_text" in config:
            logger.warning(
                "The use of 'stop_pause_text' is deprecated. Please use 'paused_text' and 'stopped_text' instead."
            )
            if "paused_text" not in config:
                paused = self.stop_pause_text

            if "stopped_text" not in config:
                stopped = self.stop_pause_text

        if "display_metadata" in config:
            logger.warning(
                "The use of `display_metadata is deprecated. Please use `format` instead."
            )
            self.format = " - ".join(f"{{{s}}}" for s in config["display_metadata"])

        self._formatter = Mpris2Formatter()

        self.prefixes = {
            "Playing": self.playing_text,
            "Paused": paused or self.paused_text,
            "Stopped": stopped or self.stopped_text,
        }

        self._current_player: str | None = None
        self.player_names: dict[str, str] = {}
        self._background_poll: asyncio.TimerHandle | None = None

    @property
    def player(self) -> str:
        if self._current_player is None:
            return "None"
        else:
            return self.player_names.get(self._current_player, "Unknown")

    async def _config_async(self):
        # Set up a listener for NameOwner changes so we can remove players when they close
        await add_signal_receiver(
            self._name_owner_changed,
            session_bus=True,
            signal_name="NameOwnerChanged",
            dbus_interface="org.freedesktop.DBus",
        )

        # Listen out for signals from any Mpris2 compatible player
        subscribe = await add_signal_receiver(
            self.message,
            session_bus=True,
            signal_name="PropertiesChanged",
            bus_name=self.objname,
            path="/org/mpris/MediaPlayer2",
            dbus_interface="org.freedesktop.DBus.Properties",
        )

        if not subscribe:
            logger.warning("Unable to add signal receiver for Mpris2 players")

        # If the user has specified a player to be monitored, we can poll it now.
        if self.objname is not None:
            await self._check_player()

    def _name_owner_changed(self, message):
        # We need to track when an interface has been removed from the bus
        # We use the NameOwnerChanged signal and check if the new owner is
        # empty.
        name, _, new_owner = message.body

        # Check if the current player has closed
        if new_owner == "" and name == self._current_player:
            self._current_player = None
            self.update("")

            # Cancel any scheduled background poll
            self._set_background_poll(False)

    def message(self, message):
        if message.message_type != MessageType.SIGNAL:
            return

        create_task(self.process_message(message))

    async def process_message(self, message):
        current_player = message.sender

        name = await self.get_player_name(current_player)
        if current_player not in self.player_names:
            self.player_names[current_player] = name

        if self.playerfilter != None and not re.match(self.playerfilter, name):
            return

        self._current_player = current_player

        self.parse_message(*message.body)

    async def _check_player(self):
        """Check for player at startup and retrieve metadata."""
        if not (self.objname or self._current_player):
            return

        bus, message = await _send_dbus_message(
            True,
            MessageType.METHOD_CALL,
            self.objname if self.objname else self._current_player,
            PROPERTIES_INTERFACE,
            MPRIS_PATH,
            "GetAll",
            "s",
            [MPRIS_PLAYER],
        )

        if bus:
            bus.disconnect()

        # If we get an error here it will be because the player object doesn't exist
        if message.message_type != MessageType.METHOD_RETURN:
            self._current_player = None
            self.update("")
            return

        if message.body:
            self._current_player = message.sender
            self.parse_message(self.objname, message.body[0], [])

    def _set_background_poll(self, poll=True):
        if self._background_poll is not None:
            self._background_poll.cancel()

        if poll:
            self._background_poll = self.timeout_add(self.poll_interval, self._check_player)

    async def get_player_name(self, player):
        bus, message = await _send_dbus_message(
            True,
            MessageType.METHOD_CALL,
            player,
            PROPERTIES_INTERFACE,
            MPRIS_PATH,
            "Get",
            "ss",
            [MPRIS_OBJECT, "Identity"],
        )

        if bus:
            bus.disconnect()

        if message.message_type != MessageType.METHOD_RETURN:
            logger.warning("Could not retrieve identity of player on %s.", player)
            return ""

        return message.body[0].value

    def parse_message(
        self,
        _interface_name: str,
        changed_properties: dict[str, Any],
        _invalidated_properties: list[str],
    ) -> None:
        """
        http://specifications.freedesktop.org/mpris-spec/latest/Track_List_Interface.html#Mapping:Metadata_Map
        """
        if not self.configured:
            return

        if "Metadata" not in changed_properties and "PlaybackStatus" not in changed_properties:
            return

        self.displaytext = ""

        metadata = changed_properties.get("Metadata")
        if metadata:
            self.track_info = self.get_track_info(metadata.value)

        playbackstatus = getattr(changed_properties.get("PlaybackStatus"), "value", None)
        if playbackstatus:
            self.is_playing = playbackstatus == "Playing"
            self.status = self.prefixes.get(playbackstatus, "{track}")

        if not self.track_info:
            self.track_info = self.no_metadata_text

        self.displaytext = self.status.format(track=self.track_info)

        if self.text != self.displaytext:
            self.update(self.displaytext)

        if self.poll_interval:
            self._set_background_poll()

    def get_track_info(self, metadata: dict[str, Variant]) -> str:
        self.metadata = {}
        for key in metadata:
            new_key = key
            val = getattr(metadata.get(key), "value", None)
            if isinstance(val, str):
                self.metadata[new_key] = val
            elif isinstance(val, list):
                self.metadata[new_key] = self.separator.join(
                    (y for y in val if isinstance(y, str))
                )

        return self._formatter.format(self.format, **self.metadata).replace("\n", "")

    def _player_cmd(self, cmd: str) -> None:
        if self._current_player is None:
            return

        task = create_task(self._send_player_cmd(cmd))
        assert task
        task.add_done_callback(self._task_callback)

    async def _send_player_cmd(self, cmd: str) -> Message | None:
        bus, message = await _send_dbus_message(
            True,
            MessageType.METHOD_CALL,
            self._current_player,
            MPRIS_PLAYER,
            MPRIS_PATH,
            cmd,
            "",
            [],
        )

        if bus:
            bus.disconnect()

        return message

    def _task_callback(self, task: asyncio.Task) -> None:
        message = task.result()

        # This happens if we can't connect to dbus. Logger call is made
        # elsewhere so we don't need to do any more here.
        if message is None:
            return

        if message.message_type != MessageType.METHOD_RETURN:
            logger.warning("Unable to send command to player.")

    @expose_command()
    def play_pause(self) -> None:
        """Toggle the playback status."""
        self._player_cmd("PlayPause")


    @expose_command()
    def next(self) -> None:
        """Play the next track."""
        self._player_cmd("Next")


    @expose_command()
    def previous(self) -> None:
        """Play the previous track."""
        self._player_cmd("Previous")


    @expose_command()
    def stop(self) -> None:
        """Stop playback."""
        self._player_cmd("Stop")


    @expose_command()
    def info(self):
        """What's the current state of the widget?"""
        d = base._TextBox.info(self)
        d.update(dict(isplaying=self.is_playing, player=self.player))
        return d
Do not follow this link