Files
haos_config/custom_components/alarmo/automations.py
2026-01-30 23:31:00 -06:00

381 lines
13 KiB
Python

"""Automations."""
import re
import copy
import logging
from homeassistant.core import (
HomeAssistant,
callback,
)
from homeassistant.const import (
CONF_TYPE,
ATTR_SERVICE,
ATTR_ENTITY_ID,
CONF_SERVICE_DATA,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.template import Template, is_template_string
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.translation import async_get_translations
from homeassistant.components.binary_sensor.device_condition import (
ENTITY_CONDITIONS,
)
from . import const
from .helpers import (
friendly_name_for_entity_id,
)
from .sensors import (
STATE_OPEN,
STATE_CLOSED,
STATE_UNAVAILABLE,
)
from .alarm_control_panel import AlarmoBaseEntity
_LOGGER = logging.getLogger(__name__)
EVENT_ARM_FAILURE = "arm_failure"
def validate_area(trigger, area_id, hass):
"""Validate area for trigger."""
if const.ATTR_AREA not in trigger:
return False
elif trigger[const.ATTR_AREA]:
return trigger[const.ATTR_AREA] == area_id
elif len(hass.data[const.DOMAIN]["areas"]) == 1:
return True
else:
return area_id is None
def validate_modes(trigger, mode):
"""Validate modes for trigger."""
if const.ATTR_MODES not in trigger:
return False
elif not trigger[const.ATTR_MODES]:
return True
else:
return mode in trigger[const.ATTR_MODES]
def validate_trigger(trigger, to_state, from_state=None):
"""Validate trigger condition."""
if const.ATTR_EVENT not in trigger:
return False
elif trigger[const.ATTR_EVENT] == "untriggered" and from_state == "triggered":
return True
elif trigger[const.ATTR_EVENT] == to_state:
return True
else:
return False
class AutomationHandler:
"""Handle automations."""
def __init__(self, hass: HomeAssistant):
"""Initialize automation handler."""
self.hass = hass
self._config = None
self._subscriptions = []
self._sensorTranslationCache = {}
self._alarmTranslationCache = {}
self._sensorTranslationLang = None
self._alarmTranslationLang = None
def async_update_config():
"""Automation config updated, reload the configuration."""
self._config = self.hass.data[const.DOMAIN][
"coordinator"
].store.async_get_automations()
self._subscriptions.append(
async_dispatcher_connect(
hass, "alarmo_automations_updated", async_update_config
)
)
async_update_config()
@callback
async def async_alarm_state_changed(
area_id: str, old_state: str, new_state: str
):
if not old_state:
# ignore automations at startup/restoring
return
if area_id:
alarm_entity = self.hass.data[const.DOMAIN]["areas"][area_id]
else:
alarm_entity = self.hass.data[const.DOMAIN]["master"]
if not alarm_entity:
return
_LOGGER.debug(
"state of %s is updated from %s to %s",
alarm_entity.entity_id,
old_state,
new_state,
)
if new_state in const.ARM_MODES:
# we don't distinguish between armed modes for automations
# they are handled separately
new_state = "armed"
for automation_id, config in self._config.items():
if not config[const.ATTR_ENABLED]:
continue
for trigger in config[const.ATTR_TRIGGERS]:
if (
validate_area(trigger, area_id, self.hass)
and validate_modes(trigger, alarm_entity._arm_mode)
and validate_trigger(trigger, new_state, old_state)
):
await self.async_execute_automation(automation_id, alarm_entity)
self._subscriptions.append(
async_dispatcher_connect(
self.hass, "alarmo_state_updated", async_alarm_state_changed
)
)
@callback
async def async_handle_event(event: str, area_id: str, args: dict = {}):
if event != const.EVENT_FAILED_TO_ARM:
return
if area_id:
alarm_entity = self.hass.data[const.DOMAIN]["areas"][area_id]
else:
alarm_entity = self.hass.data[const.DOMAIN]["master"]
_LOGGER.debug(
"%s has failed to arm",
alarm_entity.entity_id,
)
for automation_id, config in self._config.items():
if not config[const.ATTR_ENABLED]:
continue
for trigger in config[const.ATTR_TRIGGERS]:
if (
validate_area(trigger, area_id, self.hass)
and validate_modes(trigger, alarm_entity._arm_mode)
and validate_trigger(trigger, EVENT_ARM_FAILURE)
):
await self.async_execute_automation(automation_id, alarm_entity)
self._subscriptions.append(
async_dispatcher_connect(self.hass, "alarmo_event", async_handle_event)
)
def __del__(self):
"""Prepare for removal."""
while len(self._subscriptions):
self._subscriptions.pop()()
async def async_execute_automation(
self, automation_id: str, alarm_entity: AlarmoBaseEntity
):
"""Execute the specified automation."""
# automation is a dict of AutomationEntry
_LOGGER.debug(
"Executing automation %s",
automation_id,
)
actions = self._config[automation_id][const.ATTR_ACTIONS]
for action in actions:
try:
service_data = copy.copy(action[CONF_SERVICE_DATA])
if action.get(ATTR_ENTITY_ID):
service_data[ATTR_ENTITY_ID] = action[ATTR_ENTITY_ID]
if self._config[automation_id][CONF_TYPE] == const.ATTR_NOTIFICATION:
# replace wildcards within service_data struct
for key, val in service_data.items():
if type(val) is str:
service_data[key] = await self.replace_wildcards_in_string(
val, alarm_entity
)
elif type(val) is dict:
for subkey, subval in service_data[key].items():
if type(subval) is str:
service_data[key][
subkey
] = await self.replace_wildcards_in_string(
subval, alarm_entity
)
domain, service = action[ATTR_SERVICE].split(".")
await self.hass.async_create_task(
self.hass.services.async_call(
domain,
service,
service_data,
blocking=False,
context={},
)
)
except HomeAssistantError as e:
_LOGGER.error(
"Execution of action %s failed, reason: %s",
automation_id,
e,
)
def get_automations_by_area(self, area_id: str):
"""Get automations for specified area."""
result = []
for automation_id, config in self._config.items():
if any(
el[const.ATTR_AREA] == area_id for el in config[const.ATTR_TRIGGERS]
):
result.append(automation_id)
return result
async def replace_wildcards_in_string(
self, input: str, alarm_entity: AlarmoBaseEntity
):
"""Look for wildcards in string and replace them with content."""
# process wildcard '{{open_sensors}}'
res = re.search(r"{{open_sensors(\|lang=([^}]+))?(\|format=short)?}}", input)
if res:
lang = res.group(2) if res.group(2) else "en"
names_only = True if res.group(3) else False
open_sensors = ""
if alarm_entity.open_sensors:
parts = []
for entity_id, status in alarm_entity.open_sensors.items():
if names_only:
parts.append(friendly_name_for_entity_id(entity_id, self.hass))
else:
parts.append(
await self.async_get_open_sensor_string(
entity_id, status, lang
)
)
open_sensors = ", ".join(parts)
input = input.replace(res.group(0), open_sensors)
# process wildcard '{{bypassed_sensors}}'
if "{{bypassed_sensors}}" in input:
bypassed_sensors = ""
if alarm_entity.bypassed_sensors and len(alarm_entity.bypassed_sensors):
parts = []
for entity_id in alarm_entity.bypassed_sensors:
name = friendly_name_for_entity_id(entity_id, self.hass)
parts.append(name)
bypassed_sensors = ", ".join(parts)
input = input.replace("{{bypassed_sensors}}", bypassed_sensors)
# process wildcard '{{arm_mode}}'
res = re.search(r"{{arm_mode(\|lang=([^}]+))?}}", input)
if res:
lang = res.group(2) if res.group(2) else "en"
arm_mode = await self.async_get_arm_mode_string(alarm_entity.arm_mode, lang)
input = input.replace(res.group(0), arm_mode)
# process wildcard '{{changed_by}}'
if "{{changed_by}}" in input:
changed_by = alarm_entity.changed_by if alarm_entity.changed_by else ""
input = input.replace("{{changed_by}}", changed_by)
# process wildcard '{{delay}}'
if "{{delay}}" in input:
delay = str(alarm_entity.delay) if alarm_entity.delay else ""
input = input.replace("{{delay}}", delay)
# process HA templates
if is_template_string(input):
input = Template(input, self.hass).async_render()
return input
async def async_get_open_sensor_string(
self, entity_id: str, state: str, language: str
):
"""Get translation for sensor states."""
if self._sensorTranslationCache and self._sensorTranslationLang == language:
translations = self._sensorTranslationCache
else:
translations = await async_get_translations(
self.hass, language, "device_automation", ["binary_sensor"]
)
self._sensorTranslationCache = translations
self._sensorTranslationLang = language
entity = self.hass.states.get(entity_id)
device_type = (
entity.attributes["device_class"]
if entity and "device_class" in entity.attributes
else None
)
if state == STATE_OPEN:
translation_key = (
f"component.binary_sensor.device_automation.condition_type.{ENTITY_CONDITIONS[device_type][0]['type']}"
if device_type in ENTITY_CONDITIONS
else None
)
if translation_key and translation_key in translations:
string = translations[translation_key]
else:
string = "{entity_name} is open"
elif state == STATE_CLOSED:
translation_key = (
f"component.binary_sensor.device_automation.condition_type.{ENTITY_CONDITIONS[device_type][1]['type']}"
if device_type in ENTITY_CONDITIONS
else None
)
if translation_key and translation_key in translations:
string = translations[translation_key]
else:
string = "{entity_name} is closed"
elif state == STATE_UNAVAILABLE:
string = "{entity_name} is unavailable"
else:
string = "{entity_name} is unknown"
name = friendly_name_for_entity_id(entity_id, self.hass)
string = string.replace("{entity_name}", name)
return string
async def async_get_arm_mode_string(self, arm_mode: str, language: str):
"""Get translation for alarm arm mode."""
if self._alarmTranslationCache and self._alarmTranslationLang == language:
translations = self._alarmTranslationCache
else:
translations = await async_get_translations(
self.hass, language, "entity_component", ["alarm_control_panel"]
)
self._alarmTranslationCache = translations
self._alarmTranslationLang = language
translation_key = (
f"component.alarm_control_panel.entity_component._.state.{arm_mode}"
if arm_mode
else None
)
if translation_key and translation_key in translations:
return translations[translation_key]
elif arm_mode:
return " ".join(w.capitalize() for w in arm_mode.split("_"))
else:
return ""