diff --git a/test/io_tests/knxip_interface_test.py b/test/io_tests/knxip_interface_test.py index fda25e65e..3c16de79b 100644 --- a/test/io_tests/knxip_interface_test.py +++ b/test/io_tests/knxip_interface_test.py @@ -1,11 +1,23 @@ """Unit test for KNX/IP Interface.""" +import os import threading from unittest.mock import DEFAULT, Mock, patch +import pytest + from xknx import XKNX -from xknx.io import ConnectionConfig, ConnectionType, knx_interface_factory +from xknx.exceptions.exception import ( + InterfaceWithUserIdNotFound, + InvalidSecureConfiguration, +) +from xknx.io import ( + ConnectionConfig, + ConnectionType, + SecureConfig, + knx_interface_factory, +) from xknx.io.routing import Routing -from xknx.io.tunnel import TCPTunnel, UDPTunnel +from xknx.io.tunnel import SecureTunnel, TCPTunnel, UDPTunnel class TestKNXIPInterface: @@ -171,3 +183,174 @@ def assert_thread(*args, **kwargs): await interface.stop() disconnect_routing_mock.assert_called_once_with() assert interface._interface is None + + async def test_start_secure_connection_knx_keys(self): + """Test starting a secure connection from a knxkeys file.""" + gateway_ip = "192.168.1.1" + knxkeys_file = os.path.join( + os.path.dirname(__file__), "resources/testcase.knxkeys" + ) + connection_config = ConnectionConfig( + connection_type=ConnectionType.TUNNELING_TCP_SECURE, + gateway_ip=gateway_ip, + secure_config=SecureConfig( + user_id=4, knxkeys_file_path=knxkeys_file, knxkeys_password="password" + ), + ) + with patch( + "xknx.io.KNXIPInterface._start_secure_tunnelling_tcp" + ) as start_secure_tunnel: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + start_secure_tunnel.assert_called_once_with( + gateway_ip="192.168.1.1", + gateway_port=3671, + auto_reconnect=True, + auto_reconnect_wait=3, + user_id=4, + user_password="user2", + device_authentication_password="authenticationcode", + ) + with patch("xknx.io.tunnel.SecureTunnel.connect") as connect_secure: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + assert isinstance(interface._interface, SecureTunnel) + assert interface._interface.gateway_ip == gateway_ip + assert interface._interface.gateway_port == 3671 + assert interface._interface.auto_reconnect is True + assert interface._interface.auto_reconnect_wait == 3 + assert interface._interface._user_id == 4 + assert interface._interface._user_password == "user2" + assert ( + interface._interface._device_authentication_password + == "authenticationcode" + ) + assert ( # pylint: disable=comparison-with-callable + interface._interface.telegram_received_callback + == interface.telegram_received + ) + connect_secure.assert_called_once_with() + + async def test_start_secure_connection_knx_keys_first_interface(self): + """Test starting a secure connection from a knxkeys file.""" + gateway_ip = "192.168.1.1" + knxkeys_file = os.path.join( + os.path.dirname(__file__), "resources/testcase.knxkeys" + ) + connection_config = ConnectionConfig( + connection_type=ConnectionType.TUNNELING_TCP_SECURE, + gateway_ip=gateway_ip, + secure_config=SecureConfig( + knxkeys_file_path=knxkeys_file, knxkeys_password="password" + ), + ) + with patch( + "xknx.io.KNXIPInterface._start_secure_tunnelling_tcp" + ) as start_secure_tunnel: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + start_secure_tunnel.assert_called_once_with( + gateway_ip="192.168.1.1", + gateway_port=3671, + auto_reconnect=True, + auto_reconnect_wait=3, + user_id=3, + user_password="user1", + device_authentication_password="authenticationcode", + ) + with patch("xknx.io.tunnel.SecureTunnel.connect") as connect_secure: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + assert isinstance(interface._interface, SecureTunnel) + assert interface._interface.gateway_ip == gateway_ip + assert interface._interface.gateway_port == 3671 + assert interface._interface.auto_reconnect is True + assert interface._interface.auto_reconnect_wait == 3 + assert interface._interface._user_id == 3 + assert interface._interface._user_password == "user1" + assert ( + interface._interface._device_authentication_password + == "authenticationcode" + ) + assert ( # pylint: disable=comparison-with-callable + interface._interface.telegram_received_callback + == interface.telegram_received + ) + connect_secure.assert_called_once_with() + + async def test_start_secure_with_manual_passwords(self): + """Test starting a secure connection from manual passwords.""" + gateway_ip = "192.168.1.1" + connection_config = ConnectionConfig( + connection_type=ConnectionType.TUNNELING_TCP_SECURE, + gateway_ip=gateway_ip, + secure_config=SecureConfig( + user_id=3, + device_authentication_password="authenticationcode", + user_password="user1", + ), + ) + with patch( + "xknx.io.KNXIPInterface._start_secure_tunnelling_tcp" + ) as start_secure_tunnel: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + start_secure_tunnel.assert_called_once_with( + gateway_ip="192.168.1.1", + gateway_port=3671, + auto_reconnect=True, + auto_reconnect_wait=3, + user_id=3, + user_password="user1", + device_authentication_password="authenticationcode", + ) + with patch("xknx.io.tunnel.SecureTunnel.connect") as connect_secure: + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + assert isinstance(interface._interface, SecureTunnel) + assert interface._interface.gateway_ip == gateway_ip + assert interface._interface.gateway_port == 3671 + assert interface._interface.auto_reconnect is True + assert interface._interface.auto_reconnect_wait == 3 + assert interface._interface._user_id == 3 + assert interface._interface._user_password == "user1" + assert ( + interface._interface._device_authentication_password + == "authenticationcode" + ) + assert ( # pylint: disable=comparison-with-callable + interface._interface.telegram_received_callback + == interface.telegram_received + ) + connect_secure.assert_called_once_with() + + async def test_invalid_user_id_secure_error(self): + """Test ip secure.""" + gateway_ip = "192.168.1.1" + knxkeys_file = os.path.join( + os.path.dirname(__file__), "resources/testcase.knxkeys" + ) + connection_config = ConnectionConfig( + connection_type=ConnectionType.TUNNELING_TCP_SECURE, + gateway_ip=gateway_ip, + secure_config=SecureConfig( + user_id=12, knxkeys_file_path=knxkeys_file, knxkeys_password="password" + ), + ) + with pytest.raises(InterfaceWithUserIdNotFound): + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() + + async def test_invalid_user_password(self): + """Test ip secure.""" + gateway_ip = "192.168.1.1" + connection_config = ConnectionConfig( + connection_type=ConnectionType.TUNNELING_TCP_SECURE, + gateway_ip=gateway_ip, + secure_config=SecureConfig( + user_id=1, + ), + ) + with pytest.raises(InvalidSecureConfiguration): + interface = knx_interface_factory(self.xknx, connection_config) + await interface.start() diff --git a/test/io_tests/resources/testcase.knxkeys b/test/io_tests/resources/testcase.knxkeys new file mode 100644 index 000000000..070f2307c --- /dev/null +++ b/test/io_tests/resources/testcase.knxkeys @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/test/secure_tests/keyring_test.py b/test/secure_tests/keyring_test.py index 8d96d8aa8..1f7242ba2 100644 --- a/test/secure_tests/keyring_test.py +++ b/test/secure_tests/keyring_test.py @@ -1,6 +1,9 @@ """Unit test for keyring reader.""" import os +import pytest + +from xknx.exceptions.exception import InvalidSignature from xknx.secure import Keyring, load_key_ring from xknx.secure.keyring import XMLDevice, XMLInterface, verify_keyring_signature @@ -42,6 +45,11 @@ def test_verify_signature(self): assert verify_keyring_signature(self.keyring_test_file, "pwd") assert verify_keyring_signature(self.testcase_file, "password") + def test_invalid_signature(self): + """Test invalid signature throws error.""" + with pytest.raises(InvalidSignature): + load_key_ring(self.testcase_file, "wrong_password") + @staticmethod def assert_interface(keyring: Keyring, password: str, user: str) -> None: """Verify password for given user.""" diff --git a/xknx/io/knxip_interface.py b/xknx/io/knxip_interface.py index defd877e1..6465e60d5 100644 --- a/xknx/io/knxip_interface.py +++ b/xknx/io/knxip_interface.py @@ -214,6 +214,7 @@ async def _start_secure_tunnelling_tcp( user_id=user_id, user_password=user_password, device_authentication_password=device_authentication_password, + telegram_received_callback=self.telegram_received, ) await self._interface.connect()