forked from home-assistant/core
-
Notifications
You must be signed in to change notification settings - Fork 1
/
services.py
181 lines (146 loc) · 6.21 KB
/
services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Services for the Plex integration."""
import json
import logging
from plexapi.exceptions import NotFound
import voluptuous as vol
from yarl import URL
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import (
DOMAIN,
PLEX_UPDATE_PLATFORMS_SIGNAL,
PLEX_URI_SCHEME,
SERVERS,
SERVICE_REFRESH_LIBRARY,
SERVICE_SCAN_CLIENTS,
)
from .errors import MediaNotFound
from .models import PlexMediaSearchResult
# Validation schema for the refresh-library service call data:
# "library_name" is required, "server_name" is optional, both must be strings.
REFRESH_LIBRARY_SCHEMA = vol.Schema(
    {vol.Optional("server_name"): str, vol.Required("library_name"): str}
)
# Shared logger for this module, named after the enclosing package
# (``__package__``) rather than after the module itself.
_LOGGER = logging.getLogger(__package__)
async def async_setup_services(hass):
    """Register the services offered by the Plex component.

    Two services are registered under the Plex domain: the library refresh
    service (validated with ``REFRESH_LIBRARY_SCHEMA``) and the deprecated
    client scan service. Always returns ``True``.
    """

    async def async_refresh_library_service(service_call: ServiceCall) -> None:
        # ``refresh_library`` is a plain synchronous function, so it is handed
        # to the executor instead of being called from the event loop.
        await hass.async_add_executor_job(refresh_library, hass, service_call)

    async def async_scan_clients_service(_: ServiceCall) -> None:
        _LOGGER.warning(
            "This service is deprecated in favor of the scan_clients button entity. "
            "Service calls will still work for now but the service will be removed in a future release"
        )
        # Send the platform-update signal once for every configured server.
        configured_servers = hass.data[DOMAIN][SERVERS]
        for server_id in configured_servers:
            signal = PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
            async_dispatcher_send(hass, signal)

    service_registry = hass.services
    service_registry.async_register(
        DOMAIN,
        SERVICE_REFRESH_LIBRARY,
        async_refresh_library_service,
        schema=REFRESH_LIBRARY_SCHEMA,
    )
    service_registry.async_register(
        DOMAIN, SERVICE_SCAN_CLIENTS, async_scan_clients_service
    )

    return True
def refresh_library(hass: HomeAssistant, service_call: ServiceCall) -> None:
    """Ask a Plex library section to scan for new and updated media.

    Reads ``server_name`` (optional) and ``library_name`` (required) from the
    service call data and resolves the server through ``get_plex_server``.
    When the server has no section with the requested title, an error listing
    the titles of the existing sections is logged and no scan is started.
    """
    call_data = service_call.data
    server_name = call_data.get("server_name")
    library_name = call_data["library_name"]
    server = get_plex_server(hass, server_name)

    try:
        section = server.library.section(title=library_name)
    except NotFound:
        # Include the titles that do exist so the caller can correct the name.
        available_titles = [item.title for item in server.library.sections()]
        _LOGGER.error(
            "Library with name '%s' not found in %s",
            library_name,
            available_titles,
        )
    else:
        _LOGGER.debug("Scanning %s for new and updated media", library_name)
        section.update()
def get_plex_server(hass, plex_server_name=None, plex_server_id=None):
    """Retrieve a configured Plex server by ID or by friendly name.

    Lookup precedence:

    1. ``plex_server_id`` - key into the configured servers mapping.
    2. ``plex_server_name`` - matched against each server's ``friendly_name``.
    3. Neither given - the only configured server, if exactly one exists.

    Raises:
        HomeAssistantError: If the Plex integration has no data in
            ``hass.data``, no servers are configured, the requested ID or
            name is unknown, or no selector was given while more than one
            server is configured.
    """
    if DOMAIN not in hass.data:
        raise HomeAssistantError("Plex integration not configured")

    servers_by_id = hass.data[DOMAIN][SERVERS]
    if not servers_by_id:
        raise HomeAssistantError("No Plex servers available")

    if plex_server_id:
        try:
            return servers_by_id[plex_server_id]
        except KeyError as err:
            # Report an unknown ID the same way as every other failed lookup
            # in this function instead of leaking a bare KeyError.
            raise HomeAssistantError(
                f"Requested Plex server with ID '{plex_server_id}' not found"
            ) from err

    # Materialise once; the collection is iterated and measured below.
    plex_servers = list(servers_by_id.values())

    if plex_server_name:
        for candidate in plex_servers:
            if candidate.friendly_name == plex_server_name:
                return candidate
        friendly_names = [x.friendly_name for x in plex_servers]
        raise HomeAssistantError(
            f"Requested Plex server '{plex_server_name}' not found in {friendly_names}"
        )

    if len(plex_servers) == 1:
        return plex_servers[0]

    friendly_names = [x.friendly_name for x in plex_servers]
    raise HomeAssistantError(
        f"Multiple Plex servers configured, choose with 'plex_server' key: {friendly_names}"
    )
def process_plex_payload(
    hass, content_type, content_id, default_plex_server=None, supports_playqueues=True
) -> PlexMediaSearchResult:
    """Look up Plex media using media_player.play_media service payloads.

    ``content_id`` is interpreted as one of:

    * the Plex URI scheme immediately followed by a JSON object
      (``plex://{<json>}``);
    * a Plex URI with the server ID in the host position and a path; a
      single path element is converted to an integer rating key, a longer
      path is kept as a string (used for "special" items such as stations);
    * a legacy Plex URI whose host position holds the integer rating key;
    * plain JSON.

    Query parameters of a Plex URI are merged into the search parameters.
    A ``plex_server`` key in a JSON object payload selects the server by
    name; otherwise ``default_plex_server`` is used, falling back to
    ``get_plex_server(hass)``.

    Returns:
        A ``PlexMediaSearchResult`` wrapping either a playqueue or the media
        returned by the server lookup.

    Raises:
        HomeAssistantError: If stations or playqueues are requested while
            ``supports_playqueues`` is false, if the payload does not resolve
            to a set of search parameters, or if server resolution fails.
        MediaNotFound: If the requested ``playqueue_id`` does not exist.
    """
    plex_server = default_plex_server
    extra_params = {}

    if content_id.startswith(PLEX_URI_SCHEME + "{"):
        # Handle the special payload of 'plex://{<json>}'
        content_id = content_id[len(PLEX_URI_SCHEME) :]
        content = json.loads(content_id)
    elif content_id.startswith(PLEX_URI_SCHEME):
        # Handle standard media_browser payloads
        plex_url = URL(content_id)
        if plex_url.name:
            if len(plex_url.parts) == 2:
                # The path contains a single item, will always be a ratingKey
                content = int(plex_url.name)
            else:
                # For "special" items like radio stations
                content = plex_url.path
            server_id = plex_url.host
            plex_server = get_plex_server(hass, plex_server_id=server_id)
        else:
            # Handle legacy payloads without server_id in URL host position
            content = int(plex_url.host)  # type: ignore[arg-type]
        extra_params = dict(plex_url.query)
    else:
        content = json.loads(content_id)

    if isinstance(content, dict):
        # An explicit server name inside the payload overrides the default.
        if plex_server_name := content.pop("plex_server", None):
            plex_server = get_plex_server(hass, plex_server_name)

    if not plex_server:
        plex_server = get_plex_server(hass)

    if content_type == "station":
        if not supports_playqueues:
            raise HomeAssistantError("Plex stations are not supported on this device")
        playqueue = plex_server.create_station_playqueue(content)
        return PlexMediaSearchResult(playqueue)

    if isinstance(content, int):
        # A bare rating key becomes a direct key lookup.
        content = {"plex_key": content}
        content_type = DOMAIN

    if not isinstance(content, dict):
        # Everything below treats ``content`` as a dict of search parameters.
        # A path string with a non-station content type, or a JSON payload
        # that is not an object, would otherwise fail with an AttributeError.
        raise HomeAssistantError(
            f"Unsupported Plex payload for content type '{content_type}': {content_id}"
        )

    content.update(extra_params)

    if playqueue_id := content.pop("playqueue_id", None):
        if not supports_playqueues:
            raise HomeAssistantError("Plex playqueues are not supported on this device")
        try:
            playqueue = plex_server.get_playqueue(playqueue_id)
        except NotFound as err:
            raise MediaNotFound(
                f"PlayQueue '{playqueue_id}' could not be found"
            ) from err
        return PlexMediaSearchResult(playqueue, content)

    # ``shuffle`` is a playback option, not a search filter, so it is removed
    # from the lookup arguments while ``content`` keeps it for the result.
    search_query = content.copy()
    shuffle = search_query.pop("shuffle", 0)

    media = plex_server.lookup_media(content_type, **search_query)

    # Multiple items or a shuffle request need a playqueue when supported.
    if supports_playqueues and (isinstance(media, list) or shuffle):
        playqueue = plex_server.create_playqueue(
            media, includeRelated=0, shuffle=shuffle
        )
        return PlexMediaSearchResult(playqueue, content)

    return PlexMediaSearchResult(media, content)