/
config_flow.py
178 lines (142 loc) · 6.2 KB
/
config_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Config flow to configure esphome component."""
from collections import OrderedDict
from typing import Optional
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.helpers import ConfigType
from .entry_data import DATA_KEY, RuntimeEntryData
@config_entries.HANDLERS.register("esphome")
class EsphomeFlowHandler(config_entries.ConfigFlow):
    """Handle an esphome config flow.

    The flow can be started by the user (``async_step_user``) or by zeroconf
    discovery (``async_step_zeroconf``).  Both paths end up in
    ``_async_authenticate_or_add``, which fetches the device info and then
    either asks for a password or creates the config entry directly.
    """

    VERSION = 1
    CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH

    def __init__(self):
        """Initialize flow."""
        self._host: Optional[str] = None
        self._port: Optional[int] = None
        self._password: Optional[str] = None

    async def async_step_user(
        self, user_input: Optional[ConfigType] = None, error: Optional[str] = None
    ):
        """Handle a flow initialized by the user.

        With ``user_input`` the submitted host/port are validated via
        ``_async_authenticate_or_add``.  Without it, the host/port form is
        shown; ``error`` (if given) is reported under the ``"base"`` key.
        """
        if user_input is not None:
            return await self._async_authenticate_or_add(user_input)

        fields = OrderedDict()
        # Pre-fill the form with previously entered values, if any.
        fields[vol.Required("host", default=self._host or vol.UNDEFINED)] = str
        fields[vol.Optional("port", default=self._port or 6053)] = int

        errors = {}
        if error is not None:
            errors["base"] = error

        return self.async_show_form(
            step_id="user", data_schema=vol.Schema(fields), errors=errors
        )

    @property
    def _name(self):
        """Return the node name stored in the flow context (or None)."""
        # pylint: disable=no-member  # https://github.com/PyCQA/pylint/issues/3167
        return self.context.get("name")

    @_name.setter
    def _name(self, value):
        """Store the node name in the flow context and title placeholders."""
        # pylint: disable=no-member  # https://github.com/PyCQA/pylint/issues/3167
        self.context["name"] = value
        self.context["title_placeholders"] = {"name": self._name}

    def _set_user_input(self, user_input):
        """Copy host and port from ``user_input``; do nothing if it is None."""
        if user_input is None:
            return
        self._host = user_input["host"]
        self._port = user_input["port"]

    async def _async_authenticate_or_add(self, user_input):
        """Fetch device info, then ask for a password or create the entry.

        ``user_input`` may be None (discovery path), in which case the
        already stored host/port are used.  If fetching the device info
        fails, the user form is shown again with the error.
        """
        self._set_user_input(user_input)
        error, device_info = await self.fetch_device_info()
        if error is not None:
            return await self.async_step_user(error=error)
        self._name = device_info.name

        # Only show authentication step if device uses password
        if device_info.uses_password:
            return await self.async_step_authenticate()

        return self._async_get_entry()

    async def async_step_discovery_confirm(self, user_input=None):
        """Handle user-confirmation of discovered node."""
        if user_input is not None:
            # Host/port were already stored by the zeroconf step.
            return await self._async_authenticate_or_add(None)
        return self.async_show_form(
            step_id="discovery_confirm", description_placeholders={"name": self._name}
        )

    async def async_step_zeroconf(self, user_input: ConfigType):
        """Handle zeroconf discovery.

        Aborts with ``already_configured`` when an existing entry has the
        same host or node name, or when another flow for the same node name
        is already in progress.  Otherwise asks the user for confirmation.
        """
        # Hostname is format: livingroom.local.
        local_name = user_input["hostname"][:-1]
        node_name = local_name[: -len(".local")]
        address = user_input["properties"].get("address", local_name)

        # Check if already configured
        for entry in self._async_current_entries():
            already_configured = False
            if entry.data["host"] == address:
                # Is this address already configured?
                already_configured = True
            elif entry.entry_id in self.hass.data.get(DATA_KEY, {}):
                # Does a config entry with this name already exist?
                data: RuntimeEntryData = self.hass.data[DATA_KEY][entry.entry_id]
                # Node names are unique in the network
                if data.device_info is not None:
                    already_configured = data.device_info.name == node_name

            if already_configured:
                return self.async_abort(reason="already_configured")

        self._host = address
        self._port = user_input["port"]
        self._name = node_name

        # Check if flow for this device already in progress
        for flow in self._async_in_progress():
            if flow["context"].get("name") == node_name:
                return self.async_abort(reason="already_configured")

        return await self.async_step_discovery_confirm()

    def _async_get_entry(self):
        """Create the config entry from the collected host/port/password."""
        return self.async_create_entry(
            title=self._name,
            data={
                "host": self._host,
                "port": self._port,
                # The API uses protobuf, so empty string denotes absence
                "password": self._password or "",
            },
        )

    async def async_step_authenticate(self, user_input=None, error=None):
        """Handle getting password for authentication.

        With ``user_input`` the password is stored and a login is attempted;
        on failure the form is shown again with the error, on success the
        config entry is created.  Without ``user_input`` the password form
        is shown.
        """
        if user_input is not None:
            self._password = user_input["password"]
            error = await self.try_login()
            if error:
                return await self.async_step_authenticate(error=error)
            return self._async_get_entry()

        errors = {}
        if error is not None:
            errors["base"] = error

        return self.async_show_form(
            step_id="authenticate",
            data_schema=vol.Schema({vol.Required("password"): str}),
            description_placeholders={"name": self._name},
            errors=errors,
        )

    async def fetch_device_info(self):
        """Fetch device info from API and return any errors.

        Returns a ``(error, device_info)`` tuple: ``error`` is
        ``"resolve_error"`` or ``"connection_error"`` (with ``device_info``
        None) when an ``APIConnectionError`` is raised, otherwise
        ``(None, device_info)``.  The client is force-disconnected in
        every case.
        """
        from aioesphomeapi import APIClient, APIConnectionError

        # Connect without a password; only device info is requested here.
        cli = APIClient(self.hass.loop, self._host, self._port, "")

        try:
            await cli.connect()
            device_info = await cli.device_info()
        except APIConnectionError as err:
            if "resolving" in str(err):
                return "resolve_error", None
            return "connection_error", None
        finally:
            await cli.disconnect(force=True)

        return None, device_info

    async def try_login(self):
        """Try logging in to device and return any errors.

        Returns ``"invalid_password"`` when connecting with login raises an
        ``APIConnectionError``, otherwise None.
        """
        from aioesphomeapi import APIClient, APIConnectionError

        cli = APIClient(self.hass.loop, self._host, self._port, self._password)

        try:
            await cli.connect(login=True)
        except APIConnectionError:
            return "invalid_password"
        finally:
            # This client exists only to verify the password, so always
            # disconnect it -- also after a successful login -- instead of
            # leaving the connection open (mirrors fetch_device_info).
            await cli.disconnect(force=True)

        return None