diff --git a/homeassistant/components/file/__init__.py b/homeassistant/components/file/__init__.py index ed31fa957dd00c..82e12ee5d165c3 100644 --- a/homeassistant/components/file/__init__.py +++ b/homeassistant/components/file/__init__.py @@ -1 +1,101 @@ """The file component.""" + +from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry +from homeassistant.const import CONF_FILE_PATH, CONF_PLATFORM, Platform +from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant +from homeassistant.exceptions import ConfigEntryNotReady +from homeassistant.helpers import ( + config_validation as cv, + discovery, + issue_registry as ir, +) +from homeassistant.helpers.typing import ConfigType + +from .const import DOMAIN + +CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN) + +PLATFORMS = [Platform.SENSOR] + +YAML_PLATFORMS = [Platform.NOTIFY, Platform.SENSOR] + + +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: + """Set up the file integration.""" + + if hass.config_entries.async_entries(DOMAIN): + # We skip import in case we already have config entries + return True + # The YAML config was imported with HA Core 2024.6.0 and will be removed with + # HA Core 2024.12 + ir.async_create_issue( + hass, + HOMEASSISTANT_DOMAIN, + f"deprecated_yaml_{DOMAIN}", + breaks_in_ha_version="2024.12.0", + is_fixable=False, + issue_domain=DOMAIN, + learn_more_url="https://www.home-assistant.io/integrations/file/", + severity=ir.IssueSeverity.WARNING, + translation_key="deprecated_yaml", + translation_placeholders={ + "domain": DOMAIN, + "integration_title": "File", + }, + ) + + # Import the YAML config into separate config entries + for domain, items in config.items(): + for item in items: + if item[CONF_PLATFORM] == DOMAIN: + item[CONF_PLATFORM] = domain + hass.async_create_task( + hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_IMPORT}, + data=item, + ) + ) + + return True + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up a file component entry.""" + config = dict(entry.data) + filepath: str = config[CONF_FILE_PATH] + if filepath and not await hass.async_add_executor_job( + hass.config.is_allowed_path, filepath + ): + raise ConfigEntryNotReady( + translation_domain=DOMAIN, + translation_key="dir_not_allowed", + translation_placeholders={"filename": filepath}, + ) + + if entry.data[CONF_PLATFORM] in PLATFORMS: + await hass.config_entries.async_forward_entry_setups( + entry, [Platform(entry.data[CONF_PLATFORM])] + ) + else: + # The notify platform is not yet set up as entry, so + # forward setup config through discovery to ensure setup notify service. + # This is needed as long as the legacy service is not migrated + hass.async_create_task( + discovery.async_load_platform( + hass, + Platform.NOTIFY, + DOMAIN, + config, + {}, + ) + ) + + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + return await hass.config_entries.async_unload_platforms( + entry, [entry.data[CONF_PLATFORM]] + ) diff --git a/homeassistant/components/file/config_flow.py b/homeassistant/components/file/config_flow.py new file mode 100644 index 00000000000000..9c6bcb4df003eb --- /dev/null +++ b/homeassistant/components/file/config_flow.py @@ -0,0 +1,126 @@ +"""Config flow for file integration.""" + +import os +from typing import Any + +import voluptuous as vol + +from homeassistant.config_entries import ConfigFlow, ConfigFlowResult +from homeassistant.const import ( + CONF_FILE_PATH, + CONF_FILENAME, + CONF_NAME, + CONF_PLATFORM, + CONF_UNIT_OF_MEASUREMENT, + CONF_VALUE_TEMPLATE, + Platform, +) +from homeassistant.helpers.selector import ( + BooleanSelector, + BooleanSelectorConfig, + TemplateSelector, + TemplateSelectorConfig, + TextSelector, + TextSelectorConfig, + TextSelectorType, +) + +from .const import CONF_TIMESTAMP, DEFAULT_NAME, DOMAIN + +BOOLEAN_SELECTOR = BooleanSelector(BooleanSelectorConfig()) +TEMPLATE_SELECTOR = TemplateSelector(TemplateSelectorConfig()) +TEXT_SELECTOR = TextSelector(TextSelectorConfig(type=TextSelectorType.TEXT)) + +FILE_SENSOR_SCHEMA = vol.Schema( + { + vol.Optional(CONF_NAME, default=DEFAULT_NAME): TEXT_SELECTOR, + vol.Required(CONF_FILE_PATH): TEXT_SELECTOR, + vol.Optional(CONF_VALUE_TEMPLATE): TEMPLATE_SELECTOR, + vol.Optional(CONF_UNIT_OF_MEASUREMENT): TEXT_SELECTOR, + } +) + +FILE_NOTIFY_SCHEMA = vol.Schema( + { + vol.Optional(CONF_NAME, default=DEFAULT_NAME): TEXT_SELECTOR, + vol.Required(CONF_FILE_PATH): TEXT_SELECTOR, + vol.Optional(CONF_TIMESTAMP, default=False): BOOLEAN_SELECTOR, + } +) + + +class FileConfigFlowHandler(ConfigFlow, domain=DOMAIN): + """Handle a file config flow.""" + + VERSION = 1 + + async def validate_file_path(self, file_path: str) -> bool: + """Ensure the file path is valid.""" + return await self.hass.async_add_executor_job( + self.hass.config.is_allowed_path, file_path + ) + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle a flow initiated by the user.""" + return self.async_show_menu( + step_id="user", + menu_options=["notify", "sensor"], + ) + + async def async_step_notify( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle file notifier config flow.""" + errors: dict[str, str] = {} + if user_input: + user_input[CONF_PLATFORM] = "notify" + self._async_abort_entries_match(user_input) + if not await self.validate_file_path(user_input[CONF_FILE_PATH]): + errors[CONF_FILE_PATH] = "not_allowed" + else: + name: str = user_input.get(CONF_NAME, DEFAULT_NAME) + title = f"{name} [{user_input[CONF_FILE_PATH]}]" + return self.async_create_entry(data=user_input, title=title) + + return self.async_show_form( + step_id="notify", data_schema=FILE_NOTIFY_SCHEMA, errors=errors + ) + + async def async_step_sensor( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle file sensor config flow.""" + errors: dict[str, str] = {} + if user_input: + user_input[CONF_PLATFORM] = "sensor" + self._async_abort_entries_match(user_input) + if not await self.validate_file_path(user_input[CONF_FILE_PATH]): + errors[CONF_FILE_PATH] = "not_allowed" + else: + name: str = user_input.get(CONF_NAME, DEFAULT_NAME) + title = f"{name} [{user_input[CONF_FILE_PATH]}]" + return self.async_create_entry(data=user_input, title=title) + + return self.async_show_form( + step_id="sensor", data_schema=FILE_SENSOR_SCHEMA, errors=errors + ) + + async def async_step_import( + self, import_data: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Import `file`` config from configuration.yaml.""" + assert import_data is not None + self._async_abort_entries_match(import_data) + platform = import_data[CONF_PLATFORM] + name: str = import_data.get(CONF_NAME, DEFAULT_NAME) + file_name: str + if platform == Platform.NOTIFY: + file_name = import_data.pop(CONF_FILENAME) + file_path: str = os.path.join(self.hass.config.config_dir, file_name) + import_data[CONF_FILE_PATH] = file_path + else: + file_path = import_data[CONF_FILE_PATH] + title = f"{name} [{file_path}]" + return self.async_create_entry(title=title, data=import_data) diff --git a/homeassistant/components/file/const.py b/homeassistant/components/file/const.py new file mode 100644 index 00000000000000..0fa9f8a421bd22 --- /dev/null +++ b/homeassistant/components/file/const.py @@ -0,0 +1,8 @@ +"""Constants for the file integration.""" + +DOMAIN = "file" + +CONF_TIMESTAMP = "timestamp" + +DEFAULT_NAME = "File" +FILE_ICON = "mdi:file" diff --git a/homeassistant/components/file/manifest.json b/homeassistant/components/file/manifest.json index fb09e5151f2034..37bb108e1d5fe9 100644 --- a/homeassistant/components/file/manifest.json +++ b/homeassistant/components/file/manifest.json @@ -2,6 +2,7 @@ "domain": "file", "name": "File", "codeowners": ["@fabaff"], + "config_flow": true, "documentation": "https://www.home-assistant.io/integrations/file", "iot_class": "local_polling", "requirements": ["file-read-backwards==2.0.0"] diff --git a/homeassistant/components/file/notify.py b/homeassistant/components/file/notify.py index 50e6cec09a8f6b..69ebda46e572c3 100644 --- a/homeassistant/components/file/notify.py +++ b/homeassistant/components/file/notify.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging import os from typing import Any, TextIO @@ -13,14 +14,19 @@ PLATFORM_SCHEMA, BaseNotificationService, ) -from homeassistant.const import CONF_FILENAME +from homeassistant.const import CONF_FILE_PATH, CONF_FILENAME from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ServiceValidationError import homeassistant.helpers.config_validation as cv from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType import homeassistant.util.dt as dt_util -CONF_TIMESTAMP = "timestamp" +from .const import CONF_TIMESTAMP, DOMAIN +_LOGGER = logging.getLogger(__name__) + +# The legacy platform schema uses a filename, after import +# The full file path is stored in the config entry PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_FILENAME): cv.string, @@ -29,40 +35,50 @@ ) -def get_service( +async def async_get_service( hass: HomeAssistant, config: ConfigType, discovery_info: DiscoveryInfoType | None = None, -) -> FileNotificationService: +) -> FileNotificationService | None: """Get the file notification service.""" - filename: str = config[CONF_FILENAME] - timestamp: bool = config[CONF_TIMESTAMP] + if discovery_info is None: + # We only set up through discovery + return None + file_path: str = discovery_info[CONF_FILE_PATH] + timestamp: bool = discovery_info[CONF_TIMESTAMP] - return FileNotificationService(filename, timestamp) + return FileNotificationService(file_path, timestamp) class FileNotificationService(BaseNotificationService): """Implement the notification service for the File service.""" - def __init__(self, filename: str, add_timestamp: bool) -> None: + def __init__(self, file_path: str, add_timestamp: bool) -> None: """Initialize the service.""" - self.filename = filename + self._file_path = file_path self.add_timestamp = add_timestamp def send_message(self, message: str = "", **kwargs: Any) -> None: """Send a message to a file.""" file: TextIO - filepath: str = os.path.join(self.hass.config.config_dir, self.filename) - with open(filepath, "a", encoding="utf8") as file: - if os.stat(filepath).st_size == 0: - title = ( - f"{kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)} notifications (Log" - f" started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n" - ) - file.write(title) + filepath = self._file_path + try: + with open(filepath, "a", encoding="utf8") as file: + if os.stat(filepath).st_size == 0: + title = ( + f"{kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)} notifications (Log" + f" started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n" + ) + file.write(title) - if self.add_timestamp: - text = f"{dt_util.utcnow().isoformat()} {message}\n" - else: - text = f"{message}\n" - file.write(text) + if self.add_timestamp: + text = f"{dt_util.utcnow().isoformat()} {message}\n" + else: + text = f"{message}\n" + file.write(text) + except Exception as exc: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="write_access_failed", + translation_placeholders={"filename": filepath, "exc": f"{exc!r}"}, + ) from exc diff --git a/homeassistant/components/file/sensor.py b/homeassistant/components/file/sensor.py index f70b0bce701edb..55ccc0965bc297 100644 --- a/homeassistant/components/file/sensor.py +++ b/homeassistant/components/file/sensor.py @@ -9,6 +9,7 @@ import voluptuous as vol from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity +from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_FILE_PATH, CONF_NAME, @@ -16,22 +17,21 @@ CONF_VALUE_TEMPLATE, ) from homeassistant.core import HomeAssistant -import homeassistant.helpers.config_validation as cv +from homeassistant.helpers import config_validation as cv from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.template import Template from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType +from homeassistant.util import slugify -_LOGGER = logging.getLogger(__name__) - -DEFAULT_NAME = "File" +from .const import DEFAULT_NAME, FILE_ICON -ICON = "mdi:file" +_LOGGER = logging.getLogger(__name__) PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_FILE_PATH): cv.isfile, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, - vol.Optional(CONF_VALUE_TEMPLATE): cv.template, + vol.Optional(CONF_VALUE_TEMPLATE): cv.string, vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string, } ) @@ -42,26 +42,37 @@ async def async_setup_platform( config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, +) -> None: + """Set up the file sensor from YAML. + + The YAML platform config is automatically + imported to a config entry, this method can be removed + when YAML support is removed. + """ + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, ) -> None: """Set up the file sensor.""" + config = dict(entry.data) file_path: str = config[CONF_FILE_PATH] name: str = config[CONF_NAME] unit: str | None = config.get(CONF_UNIT_OF_MEASUREMENT) - value_template: Template | None = config.get(CONF_VALUE_TEMPLATE) + value_template: Template | None = None - if value_template is not None: - value_template.hass = hass + if CONF_VALUE_TEMPLATE in config: + value_template = Template(config[CONF_VALUE_TEMPLATE], hass) - if hass.config.is_allowed_path(file_path): - async_add_entities([FileSensor(name, file_path, unit, value_template)], True) - else: - _LOGGER.error("'%s' is not an allowed directory", file_path) + async_add_entities([FileSensor(name, file_path, unit, value_template)], True) class FileSensor(SensorEntity): """Implementation of a file sensor.""" - _attr_icon = ICON + _attr_icon = FILE_ICON def __init__( self, @@ -75,6 +86,7 @@ def __init__( self._file_path = file_path self._attr_native_unit_of_measurement = unit_of_measurement self._val_tpl = value_template + self._attr_unique_id = slugify(f"{name}_{file_path}") def update(self) -> None: """Get the latest entry from a file and updates the state.""" diff --git a/homeassistant/components/file/strings.json b/homeassistant/components/file/strings.json new file mode 100644 index 00000000000000..243695b79cb7ba --- /dev/null +++ b/homeassistant/components/file/strings.json @@ -0,0 +1,57 @@ +{ + "config": { + "step": { + "user": { + "description": "Make a choice", + "menu_options": { + "sensor": "Set up a file based sensor", + "notify": "Set up a notification service" + } + }, + "sensor": { + "title": "File sensor", + "description": "Set up a file based sensor", + "data": { + "name": "Name", + "file_path": "File path", + "value_template": "Value template", + "unit_of_measurement": "Unit of measurement" + }, + "data_description": { + "name": "Name of the file based sensor", + "file_path": "The local file path to retrieve the sensor value from", + "value_template": "A template to render the the sensors value based on the file content", + "unit_of_measurement": "Unit of measurement for the sensor" + } + }, + "notify": { + "title": "Notification to file service", + "description": "Set up a service that allows to write notification to a file.", + "data": { + "file_path": "[%key:component::file::config::step::sensor::data::file_path%]", + "name": "[%key:component::file::config::step::sensor::data::name%]", + "timestamp": "Timestamp" + }, + "data_description": { + "file_path": "A local file path to write the notification to", + "name": "Name of the notify service", + "timestamp": "Add a timestamp to the notification" + } + } + }, + "error": { + "not_allowed": "Access to the selected file path is not allowed" + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" + } + }, + "exceptions": { + "dir_not_allowed": { + "message": "Access to {filename} is not allowed." + }, + "write_access_failed": { + "message": "Write access to {filename} failed: {exc}." + } + } +} diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index 134b1e80d98688..5657b171701fdb 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -164,6 +164,7 @@ "faa_delays", "fastdotcom", "fibaro", + "file", "filesize", "fireservicerota", "fitbit", diff --git a/homeassistant/generated/integrations.json b/homeassistant/generated/integrations.json index e16f29a14e288b..97fd6d30ecaad0 100644 --- a/homeassistant/generated/integrations.json +++ b/homeassistant/generated/integrations.json @@ -1815,7 +1815,7 @@ "file": { "name": "File", "integration_type": "hub", - "config_flow": false, + "config_flow": true, "iot_class": "local_polling" }, "filesize": { diff --git a/tests/components/file/conftest.py b/tests/components/file/conftest.py new file mode 100644 index 00000000000000..082483266a2c5e --- /dev/null +++ b/tests/components/file/conftest.py @@ -0,0 +1,34 @@ +"""Test fixtures for file platform.""" + +from collections.abc import Generator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from homeassistant.core import HomeAssistant + + +@pytest.fixture +def mock_setup_entry() -> Generator[AsyncMock, None, None]: + """Override async_setup_entry.""" + with patch( + "homeassistant.components.file.async_setup_entry", return_value=True + ) as mock_setup_entry: + yield mock_setup_entry + + +@pytest.fixture +def is_allowed() -> bool: + """Parameterize mock_is_allowed_path, default True.""" + return True + + +@pytest.fixture +def mock_is_allowed_path( + hass: HomeAssistant, is_allowed: bool +) -> Generator[None, MagicMock]: + """Mock is_allowed_path method.""" + with patch.object( + hass.config, "is_allowed_path", return_value=is_allowed + ) as allowed_path_mock: + yield allowed_path_mock diff --git a/tests/components/file/test_config_flow.py b/tests/components/file/test_config_flow.py new file mode 100644 index 00000000000000..1378793f9bdb59 --- /dev/null +++ b/tests/components/file/test_config_flow.py @@ -0,0 +1,144 @@ +"""Tests for the file config flow.""" + +from typing import Any +from unittest.mock import AsyncMock + +import pytest + +from homeassistant import config_entries +from homeassistant.components.file import DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.data_entry_flow import FlowResultType + +from tests.common import MockConfigEntry + +MOCK_CONFIG_NOTIFY = { + "platform": "notify", + "file_path": "some_file", + "timestamp": True, + "name": "File", +} +MOCK_CONFIG_SENSOR = { + "platform": "sensor", + "file_path": "some/path", + "value_template": "{{ value | round(1) }}", + "name": "File", +} + +pytestmark = pytest.mark.usefixtures("mock_setup_entry") + + +@pytest.mark.parametrize( + ("platform", "data"), + [("sensor", MOCK_CONFIG_SENSOR), ("notify", MOCK_CONFIG_NOTIFY)], +) +async def test_form( + hass: HomeAssistant, + mock_setup_entry: AsyncMock, + mock_is_allowed_path: bool, + platform: str, + data: dict[str, Any], +) -> None: + """Test we get the form.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {"next_step_id": platform}, + ) + await hass.async_block_till_done() + + user_input = dict(data) + user_input.pop("platform") + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input=user_input + ) + await hass.async_block_till_done() + + assert result2["type"] is FlowResultType.CREATE_ENTRY + assert result2["data"] == data + assert len(mock_setup_entry.mock_calls) == 1 + + +@pytest.mark.parametrize( + ("platform", "data"), + [("sensor", MOCK_CONFIG_SENSOR), ("notify", MOCK_CONFIG_NOTIFY)], +) +async def test_already_configured( + hass: HomeAssistant, + mock_setup_entry: AsyncMock, + mock_is_allowed_path: bool, + platform: str, + data: dict[str, Any], +) -> None: + """Test aborting if the entry is already configured.""" + entry = MockConfigEntry(domain=DOMAIN, data=data) + entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {"next_step_id": platform}, + ) + await hass.async_block_till_done() + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == platform + + user_input = dict(data) + user_input.pop("platform") + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input=user_input, + ) + await hass.async_block_till_done() + + assert result2["type"] is FlowResultType.ABORT + assert result2["reason"] == "already_configured" + + +@pytest.mark.parametrize("is_allowed", [False], ids=["not_allowed"]) +@pytest.mark.parametrize( + ("platform", "data"), + [("sensor", MOCK_CONFIG_SENSOR), ("notify", MOCK_CONFIG_NOTIFY)], +) +async def test_not_allowed( + hass: HomeAssistant, + mock_setup_entry: AsyncMock, + mock_is_allowed_path: bool, + platform: str, + data: dict[str, Any], +) -> None: + """Test aborting if the file path is not allowed.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {"next_step_id": platform}, + ) + await hass.async_block_till_done() + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == platform + + user_input = dict(data) + user_input.pop("platform") + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input=user_input, + ) + await hass.async_block_till_done() + + assert result2["type"] is FlowResultType.FORM + assert result2["errors"] == {"file_path": "not_allowed"} diff --git a/tests/components/file/test_notify.py b/tests/components/file/test_notify.py index 3077d71bdde6ab..f6d30c2f166196 100644 --- a/tests/components/file/test_notify.py +++ b/tests/components/file/test_notify.py @@ -1,18 +1,22 @@ """The tests for the notify file platform.""" import os -from unittest.mock import call, mock_open, patch +from typing import Any +from unittest.mock import MagicMock, call, mock_open, patch from freezegun.api import FrozenDateTimeFactory import pytest from homeassistant.components import notify +from homeassistant.components.file import DOMAIN from homeassistant.components.notify import ATTR_TITLE_DEFAULT from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ServiceValidationError +from homeassistant.helpers.typing import ConfigType from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util -from tests.common import assert_setup_component +from tests.common import MockConfigEntry, assert_setup_component async def test_bad_config(hass: HomeAssistant) -> None: @@ -25,33 +29,60 @@ async def test_bad_config(hass: HomeAssistant) -> None: @pytest.mark.parametrize( - "timestamp", + ("domain", "service", "params"), [ - False, - True, + (notify.DOMAIN, "test", {"message": "one, two, testing, testing"}), ], + ids=["legacy"], +) +@pytest.mark.parametrize( + ("timestamp", "config"), + [ + ( + False, + { + "notify": [ + { + "name": "test", + "platform": "file", + "filename": "mock_file", + } + ] + }, + ), + ( + True, + { + "notify": [ + { + "name": "test", + "platform": "file", + "filename": "mock_file", + "timestamp": True, + } + ] + }, + ), + ], + ids=["no_timestamp", "timestamp"], ) async def test_notify_file( - hass: HomeAssistant, freezer: FrozenDateTimeFactory, timestamp: bool + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + timestamp: bool, + mock_is_allowed_path: MagicMock, + config: ConfigType, + domain: str, + service: str, + params: dict[str, str], ) -> None: """Test the notify file output.""" filename = "mock_file" - message = "one, two, testing, testing" - with assert_setup_component(1) as handle_config: - assert await async_setup_component( - hass, - notify.DOMAIN, - { - "notify": { - "name": "test", - "platform": "file", - "filename": filename, - "timestamp": timestamp, - } - }, - ) - await hass.async_block_till_done() - assert handle_config[notify.DOMAIN] + message = params["message"] + assert await async_setup_component(hass, notify.DOMAIN, config) + await hass.async_block_till_done() + assert await async_setup_component(hass, DOMAIN, config) + await hass.async_block_till_done(wait_background_tasks=True) freezer.move_to(dt_util.utcnow()) @@ -66,9 +97,7 @@ async def test_notify_file( f"(Log started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n" ) - await hass.services.async_call( - "notify", "test", {"message": message}, blocking=True - ) + await hass.services.async_call(domain, service, params, blocking=True) full_filename = os.path.join(hass.config.path(), filename) assert m_open.call_count == 1 @@ -85,3 +114,210 @@ async def test_notify_file( call(title), call(f"{dt_util.utcnow().isoformat()} {message}\n"), ] + + +@pytest.mark.parametrize( + ("domain", "service", "params"), + [(notify.DOMAIN, "test", {"message": "one, two, testing, testing"})], + ids=["legacy"], +) +@pytest.mark.parametrize( + ("is_allowed", "config"), + [ + ( + True, + { + "notify": [ + { + "name": "test", + "platform": "file", + "filename": "mock_file", + } + ] + }, + ), + ], + ids=["allowed_but_access_failed"], +) +async def test_legacy_notify_file_exception( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_is_allowed_path: MagicMock, + config: ConfigType, + domain: str, + service: str, + params: dict[str, str], +) -> None: + """Test legacy notify file output has exception.""" + assert await async_setup_component(hass, notify.DOMAIN, config) + await hass.async_block_till_done() + assert await async_setup_component(hass, DOMAIN, config) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.move_to(dt_util.utcnow()) + + m_open = mock_open() + with ( + patch("homeassistant.components.file.notify.open", m_open, create=True), + patch("homeassistant.components.file.notify.os.stat") as mock_st, + ): + mock_st.side_effect = OSError("Access Failed") + with pytest.raises(ServiceValidationError) as exc: + await hass.services.async_call(domain, service, params, blocking=True) + assert f"{exc.value!r}" == "ServiceValidationError('write_access_failed')" + + +@pytest.mark.parametrize( + ("timestamp", "data"), + [ + ( + False, + { + "name": "test", + "platform": "notify", + "file_path": "mock_file", + "timestamp": False, + }, + ), + ( + True, + { + "name": "test", + "platform": "notify", + "file_path": "mock_file", + "timestamp": True, + }, + ), + ], + ids=["no_timestamp", "timestamp"], +) +async def test_legacy_notify_file_entry_only_setup( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + timestamp: bool, + mock_is_allowed_path: MagicMock, + data: dict[str, Any], +) -> None: + """Test the legacy notify file output in entry only setup.""" + filename = "mock_file" + + domain = notify.DOMAIN + service = "test" + params = {"message": "one, two, testing, testing"} + message = params["message"] + + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.move_to(dt_util.utcnow()) + + m_open = mock_open() + with ( + patch("homeassistant.components.file.notify.open", m_open, create=True), + patch("homeassistant.components.file.notify.os.stat") as mock_st, + ): + mock_st.return_value.st_size = 0 + title = ( + f"{ATTR_TITLE_DEFAULT} notifications " + f"(Log started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n" + ) + + await hass.services.async_call(domain, service, params, blocking=True) + + assert m_open.call_count == 1 + assert m_open.call_args == call(filename, "a", encoding="utf8") + + assert m_open.return_value.write.call_count == 2 + if not timestamp: + assert m_open.return_value.write.call_args_list == [ + call(title), + call(f"{message}\n"), + ] + else: + assert m_open.return_value.write.call_args_list == [ + call(title), + call(f"{dt_util.utcnow().isoformat()} {message}\n"), + ] + + +@pytest.mark.parametrize( + ("is_allowed", "config"), + [ + ( + False, + { + "name": "test", + "platform": "notify", + "file_path": "mock_file", + "timestamp": False, + }, + ), + ], + ids=["not_allowed"], +) +async def test_legacy_notify_file_not_allowed( + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, + mock_is_allowed_path: MagicMock, + config: dict[str, Any], +) -> None: + """Test legacy notify file output not allowed.""" + entry = MockConfigEntry( + domain=DOMAIN, data=config, title=f"test [{config['file_path']}]" + ) + entry.add_to_hass(hass) + assert not await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done(wait_background_tasks=True) + assert "is not allowed" in caplog.text + + +@pytest.mark.parametrize( + ("data", "is_allowed"), + [ + ( + { + "name": "test", + "platform": "notify", + "file_path": "mock_file", + "timestamp": False, + }, + True, + ), + ], + ids=["not_allowed"], +) +async def test_notify_file_write_access_failed( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_is_allowed_path: MagicMock, + data: dict[str, Any], +) -> None: + """Test the notify file fails.""" + domain = notify.DOMAIN + service = "test" + params = {"message": "one, two, testing, testing"} + + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.move_to(dt_util.utcnow()) + + m_open = mock_open() + with ( + patch("homeassistant.components.file.notify.open", m_open, create=True), + patch("homeassistant.components.file.notify.os.stat") as mock_st, + ): + mock_st.side_effect = OSError("Access Failed") + with pytest.raises(ServiceValidationError) as exc: + await hass.services.async_call(domain, service, params, blocking=True) + assert f"{exc.value!r}" == "ServiceValidationError('write_access_failed')" diff --git a/tests/components/file/test_sensor.py b/tests/components/file/test_sensor.py index 8acdc3242095c9..d2059f4d56405a 100644 --- a/tests/components/file/test_sensor.py +++ b/tests/components/file/test_sensor.py @@ -1,18 +1,23 @@ """The tests for local file sensor platform.""" -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch +import pytest + +from homeassistant.components.file import DOMAIN from homeassistant.const import STATE_UNKNOWN from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component -from tests.common import get_fixture_path +from tests.common import MockConfigEntry, get_fixture_path @patch("os.path.isfile", Mock(return_value=True)) @patch("os.access", Mock(return_value=True)) -async def test_file_value(hass: HomeAssistant) -> None: - """Test the File sensor.""" +async def test_file_value_yaml_setup( + hass: HomeAssistant, mock_is_allowed_path: MagicMock +) -> None: + """Test the File sensor from YAML setup.""" config = { "sensor": { "platform": "file", @@ -21,9 +26,30 @@ async def test_file_value(hass: HomeAssistant) -> None: } } - with patch.object(hass.config, "is_allowed_path", return_value=True): - assert await async_setup_component(hass, "sensor", config) - await hass.async_block_till_done() + assert await async_setup_component(hass, "sensor", config) + await hass.async_block_till_done() + + state = hass.states.get("sensor.file1") + assert state.state == "21" + + +@patch("os.path.isfile", Mock(return_value=True)) +@patch("os.access", Mock(return_value=True)) +async def test_file_value_entry_setup( + hass: HomeAssistant, mock_is_allowed_path: MagicMock +) -> None: + """Test the File sensor from an entry setup.""" + data = { + "platform": "sensor", + "name": "file1", + "file_path": get_fixture_path("file_value.txt", "file"), + } + + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) state = hass.states.get("sensor.file1") assert state.state == "21" @@ -31,20 +57,22 @@ async def test_file_value(hass: HomeAssistant) -> None: @patch("os.path.isfile", Mock(return_value=True)) @patch("os.access", Mock(return_value=True)) -async def test_file_value_template(hass: HomeAssistant) -> None: +async def test_file_value_template( + hass: HomeAssistant, mock_is_allowed_path: MagicMock +) -> None: """Test the File sensor with JSON entries.""" - config = { - "sensor": { - "platform": "file", - "name": "file2", - "file_path": get_fixture_path("file_value_template.txt", "file"), - "value_template": "{{ value_json.temperature }}", - } + data = { + "platform": "sensor", + "name": "file2", + "file_path": get_fixture_path("file_value_template.txt", "file"), + "value_template": "{{ value_json.temperature }}", } - with patch.object(hass.config, "is_allowed_path", return_value=True): - assert await async_setup_component(hass, "sensor", config) - await hass.async_block_till_done() + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) state = hass.states.get("sensor.file2") assert state.state == "26" @@ -52,19 +80,19 @@ async def test_file_value_template(hass: HomeAssistant) -> None: @patch("os.path.isfile", Mock(return_value=True)) @patch("os.access", Mock(return_value=True)) -async def test_file_empty(hass: HomeAssistant) -> None: +async def test_file_empty(hass: HomeAssistant, mock_is_allowed_path: MagicMock) -> None: """Test the File sensor with an empty file.""" - config = { - "sensor": { - "platform": "file", - "name": "file3", - "file_path": get_fixture_path("file_empty.txt", "file"), - } + data = { + "platform": "sensor", + "name": "file3", + "file_path": get_fixture_path("file_empty.txt", "file"), } - with patch.object(hass.config, "is_allowed_path", return_value=True): - assert await async_setup_component(hass, "sensor", config) - await hass.async_block_till_done() + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) state = hass.states.get("sensor.file3") assert state.state == STATE_UNKNOWN @@ -72,18 +100,21 @@ async def test_file_empty(hass: HomeAssistant) -> None: @patch("os.path.isfile", Mock(return_value=True)) @patch("os.access", Mock(return_value=True)) -async def test_file_path_invalid(hass: HomeAssistant) -> None: +@pytest.mark.parametrize("is_allowed", [False]) +async def test_file_path_invalid( + hass: HomeAssistant, mock_is_allowed_path: MagicMock +) -> None: """Test the File sensor with invalid path.""" - config = { - "sensor": { - "platform": "file", - "name": "file4", - "file_path": get_fixture_path("file_value.txt", "file"), - } + data = { + "platform": "sensor", + "name": "file4", + "file_path": get_fixture_path("file_value.txt", "file"), } - with patch.object(hass.config, "is_allowed_path", return_value=False): - assert await async_setup_component(hass, "sensor", config) - await hass.async_block_till_done() + entry = MockConfigEntry( + domain=DOMAIN, data=data, title=f"test [{data['file_path']}]" + ) + entry.add_to_hass(hass) + await hass.config_entries.async_setup(entry.entry_id) assert len(hass.states.async_entity_ids("sensor")) == 0