Initial draft of statistics (#49852)
emontnemery committed May 16, 2021
1 parent 703456a commit 89dd329
Showing 15 changed files with 774 additions and 56 deletions.
1 change: 1 addition & 0 deletions .strict-typing
@@ -41,6 +41,7 @@ homeassistant.components.persistent_notification.*
homeassistant.components.proximity.*
homeassistant.components.recorder.purge
homeassistant.components.recorder.repack
homeassistant.components.recorder.statistics
homeassistant.components.remote.*
homeassistant.components.scene.*
homeassistant.components.sensor.*
2 changes: 1 addition & 1 deletion homeassistant/components/history/__init__.py
@@ -55,7 +55,7 @@

@deprecated_function("homeassistant.components.recorder.history.get_significant_states")
def get_significant_states(hass, *args, **kwargs):
"""Wrap _get_significant_states with a sql session."""
"""Wrap _get_significant_states with an sql session."""
return history.get_significant_states(hass, *args, **kwargs)


68 changes: 60 additions & 8 deletions homeassistant/components/recorder/__init__.py
@@ -35,12 +35,18 @@
INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER,
convert_include_exclude_filter,
)
from homeassistant.helpers.event import async_track_time_interval, track_time_change
from homeassistant.helpers.event import (
async_track_time_change,
async_track_time_interval,
)
from homeassistant.helpers.integration_platform import (
async_process_integration_platforms,
)
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import bind_hass
import homeassistant.util.dt as dt_util

from . import history, migration, purge
from . import history, migration, purge, statistics
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX
from .models import Base, Events, RecorderRuns, States
from .pool import RecorderPool
@@ -56,6 +62,7 @@
_LOGGER = logging.getLogger(__name__)

SERVICE_PURGE = "purge"
SERVICE_STATISTICS = "statistics"
SERVICE_ENABLE = "enable"
SERVICE_DISABLE = "disable"

@@ -194,6 +201,7 @@ def run_information_with_session(session, point_in_time: datetime | None = None)

async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the recorder."""
hass.data[DOMAIN] = {}
conf = config[DOMAIN]
entity_filter = convert_include_exclude_filter(conf)
auto_purge = conf[CONF_AUTO_PURGE]
@@ -221,10 +229,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
instance.start()
_async_register_services(hass, instance)
history.async_setup(hass)
statistics.async_setup(hass)
await async_process_integration_platforms(hass, DOMAIN, _process_recorder_platform)

return await instance.async_db_ready


async def _process_recorder_platform(hass, domain, platform):
"""Process a recorder platform."""
hass.data[DOMAIN][domain] = platform


@callback
def _async_register_services(hass, instance):
"""Register recorder services."""
@@ -263,6 +278,12 @@ class PurgeTask(NamedTuple):
apply_filter: bool


class StatisticsTask(NamedTuple):
"""An object to insert into the recorder queue to run a statistics task."""

start: datetime.datetime


class WaitTask:
"""An object to insert into the recorder queue to tell it set the _queue_watch event."""

@@ -389,6 +410,13 @@ def do_adhoc_purge(self, **kwargs):

self.queue.put(PurgeTask(keep_days, repack, apply_filter))

def do_adhoc_statistics(self, **kwargs):
"""Trigger an adhoc statistics run."""
start = kwargs.get("start")
if not start:
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))

@callback
def async_register(self, shutdown_task, hass_started):
"""Post connection initialize."""
@@ -451,14 +479,33 @@ def async_connection_success(self):

@callback
def _async_recorder_ready(self):
"""Mark recorder ready."""
"""Finish start and mark recorder ready."""
self._async_setup_periodic_tasks()
self.async_recorder_ready.set()

@callback
def async_purge(self, now):
"""Trigger the purge."""
self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False))

@callback
def async_hourly_statistics(self, now):
"""Trigger the hourly statistics run."""
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))

def _async_setup_periodic_tasks(self):
"""Prepare periodic tasks."""
if self.auto_purge:
# Purge every night at 4:12am
async_track_time_change(
self.hass, self.async_purge, hour=4, minute=12, second=0
)
# Compile hourly statistics every hour at *:12
async_track_time_change(
self.hass, self.async_hourly_statistics, minute=12, second=0
)

def run(self):
"""Start processing events to save."""
shutdown_task = object()
@@ -507,11 +554,6 @@ def run(self):
self._shutdown()
return

# Start periodic purge
if self.auto_purge:
# Purge every night at 4:12am
track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0)

_LOGGER.debug("Recorder processing the queue")
self.hass.add_job(self._async_recorder_ready)
self._run_event_loop()
@@ -608,11 +650,21 @@ def _run_purge(self, keep_days, repack, apply_filter):
# Schedule a new purge task if this one didn't finish
self.queue.put(PurgeTask(keep_days, repack, apply_filter))

def _run_statistics(self, start):
"""Run statistics task."""
if statistics.compile_statistics(self, start):
return
# Schedule a new statistics task if this one didn't finish
self.queue.put(StatisticsTask(start))

def _process_one_event(self, event):
"""Process one event."""
if isinstance(event, PurgeTask):
self._run_purge(event.keep_days, event.repack, event.apply_filter)
return
if isinstance(event, StatisticsTask):
self._run_statistics(event.start)
return
if isinstance(event, WaitTask):
self._queue_watch.set()
return
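
The additions above follow the recorder's existing queue pattern: lightweight NamedTuple task objects (PurgeTask, StatisticsTask) are put on the same queue as incoming events, and _process_one_event dispatches on their type, re-queueing the task when a run reports it did not finish. Below is a minimal standalone sketch of that pattern; the names SimpleRecorder and the compile_statistics method are illustrative stand-ins, not the actual recorder API.

import queue
from datetime import datetime
from typing import NamedTuple


class StatisticsTask(NamedTuple):
    """Ask the consumer to compile statistics for the period starting at `start`."""

    start: datetime


class SimpleRecorder:
    """Toy stand-in for the recorder thread's queue handling."""

    def __init__(self) -> None:
        self.queue: queue.Queue = queue.Queue()

    def do_adhoc_statistics(self, start: datetime) -> None:
        # Producer side: schedule a run by enqueueing a task object.
        self.queue.put(StatisticsTask(start))

    def compile_statistics(self, start: datetime) -> bool:
        # Stand-in for statistics.compile_statistics(); True means "finished".
        print(f"compiling statistics for the hour starting at {start}")
        return True

    def _process_one_event(self, event) -> None:
        # Consumer side: dispatch on task type, re-queue unfinished work.
        if isinstance(event, StatisticsTask):
            if not self.compile_statistics(event.start):
                self.queue.put(StatisticsTask(event.start))
            return
        # ...ordinary events would be written to the database here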
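statistics.get_start_time() comes from the new recorder/statistics.py module, which is among the 15 changed files but not reproduced in this excerpt. Given the hourly run at *:12 scheduled above, it presumably returns the start of the previous full hour, so the 12:12 run compiles statistics for 11:00–12:00. A hypothetical sketch only:

from datetime import timedelta

import homeassistant.util.dt as dt_util


def get_start_time():
    """Return the start of the previous hour (UTC) to compile statistics for."""
    last_hour = dt_util.utcnow() - timedelta(hours=1)
    return last_hour.replace(minute=0, second=0, microsecond=0)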
2 changes: 1 addition & 1 deletion homeassistant/components/recorder/history.py
@@ -52,7 +52,7 @@
States.last_updated,
]

HISTORY_BAKERY = "history_bakery"
HISTORY_BAKERY = "recorder_history_bakery"


def async_setup(hass):
44 changes: 43 additions & 1 deletion homeassistant/components/recorder/models.py
@@ -6,6 +6,7 @@
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Index,
Integer,
@@ -38,7 +39,15 @@
TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"

ALL_TABLES = [TABLE_STATES, TABLE_EVENTS, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES]
TABLE_STATISTICS = "statistics"

ALL_TABLES = [
TABLE_STATES,
TABLE_EVENTS,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
]

DATETIME_TYPE = DateTime(timezone=True).with_variant(
mysql.DATETIME(timezone=True, fsp=6), "mysql"
@@ -198,6 +207,39 @@ def to_native(self, validate_entity_id=True):
return None


class Statistics(Base): # type: ignore
"""Statistics."""

__table_args__ = {
"mysql_default_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
}
__tablename__ = TABLE_STATISTICS
id = Column(Integer, primary_key=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
source = Column(String(32))
statistic_id = Column(String(255))
start = Column(DATETIME_TYPE, index=True)
mean = Column(Float())
min = Column(Float())
max = Column(Float())

__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index("ix_statistics_statistic_id_start", "statistic_id", "start"),
)

@staticmethod
def from_stats(source, statistic_id, start, stats):
"""Create object from a statistics."""
return Statistics(
source=source,
statistic_id=statistic_id,
start=start,
**stats,
)


class RecorderRuns(Base): # type: ignore
"""Representation of recorder run."""

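For illustration, a hypothetical use of the new Statistics.from_stats() helper, assuming the per-entity stats dict produced by a recorder platform uses the column names mean/min/max defined above (the platform code that builds that dict is not part of the files shown here):

from datetime import datetime, timezone

stats = {"mean": 21.4, "min": 20.1, "max": 22.8}
row = Statistics.from_stats(
    source="sensor",  # illustrative source value
    statistic_id="sensor.outside_temperature",
    start=datetime(2021, 5, 16, 11, 0, tzinfo=timezone.utc),
    stats=stats,
)
# session.add(row)  # would be persisted inside the recorder's session scope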
58 changes: 20 additions & 38 deletions homeassistant/components/recorder/purge.py
@@ -3,10 +3,8 @@

from datetime import datetime, timedelta
import logging
import time
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct

@@ -15,20 +13,15 @@
from .const import MAX_ROWS_TO_PURGE
from .models import Events, RecorderRuns, States
from .repack import repack_database
from .util import session_scope
from .util import retryable_database_job, session_scope

if TYPE_CHECKING:
from . import Recorder

_LOGGER = logging.getLogger(__name__)

# Retry when one of the following MySQL errors occurred:
RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)
# 1205: Lock wait timeout exceeded; try restarting transaction
# 1206: The total number of locks exceeds the lock table size
# 1213: Deadlock found when trying to get lock; try restarting transaction


@retryable_database_job("purge")
def purge_old_data(
instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
) -> bool:
@@ -41,36 +34,25 @@ def purge_old_data(
"Purging states and events before target %s",
purge_before.isoformat(sep=" ", timespec="seconds"),
)
try:
with session_scope(session=instance.get_session()) as session: # type: ignore
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_purge_state_ids(session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
# If states or events purging isn't processing the purge_before yet,
# return false, as we are not done yet.
_LOGGER.debug("Purging hasn't fully completed yet")
return False
if apply_filter and _purge_filtered_data(instance, session) is False:
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
return False
_purge_old_recorder_runs(instance, session, purge_before)
if repack:
repack_database(instance)
except OperationalError as err:
if (
instance.engine.dialect.name == "mysql"
and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
):
_LOGGER.info("%s; purge not completed, retrying", err.orig.args[1])
time.sleep(instance.db_retry_wait)
return False

_LOGGER.warning("Error purging history: %s", err)

with session_scope(session=instance.get_session()) as session: # type: ignore
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_purge_state_ids(session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
# If states or events purging isn't processing the purge_before yet,
# return false, as we are not done yet.
_LOGGER.debug("Purging hasn't fully completed yet")
return False
if apply_filter and _purge_filtered_data(instance, session) is False:
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
return False
_purge_old_recorder_runs(instance, session, purge_before)
if repack:
repack_database(instance)
return True


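purge_old_data now delegates its retry handling to the retryable_database_job decorator imported from .util; that helper lives in recorder/util.py, which is not shown in this excerpt. A sketch of what it plausibly does, reconstructed from the inline retry logic removed above (the real implementation may differ):

import functools
import logging
import time

from sqlalchemy.exc import OperationalError

_LOGGER = logging.getLogger(__name__)

# MySQL errors worth retrying, as listed in the removed code:
# 1205 lock wait timeout, 1206 lock table size exceeded, 1213 deadlock.
RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)


def retryable_database_job(description: str):
    """Mark a database job as retryable: on a transient error, return False so the caller re-queues it."""

    def decorator(job):
        @functools.wraps(job)
        def wrapper(instance, *args, **kwargs):
            try:
                return job(instance, *args, **kwargs)
            except OperationalError as err:
                if (
                    instance.engine.dialect.name == "mysql"
                    and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
                ):
                    _LOGGER.info(
                        "%s; %s not completed, retrying", err.orig.args[1], description
                    )
                    time.sleep(instance.db_retry_wait)
                else:
                    _LOGGER.warning("Error executing %s: %s", description, err)
                return False

        return wrapper

    return decorator

With something like this in place, @retryable_database_job("purge") lets purge_old_data simply report "not done" on a lock timeout or deadlock, and the recorder puts another PurgeTask on the queue, which is the same re-queue behaviour the new StatisticsTask relies on.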
