Add Mikrotik hub and rework device tracker (#25664)

* Add const.py for Mikrotik hub

* Add Mikrotik hub component

* Rework device tracker to use hub

* Fix validation errors

* Fix line spacing

* Bump librouteros version to 2.3.0

* Bump librouteros version to 2.3.0

* Used black code formatter

* Fix validation errors

* Fix errors

* Fix errors

* Renamed MikrotikAPI to MikrotikClient

* Fix method

* Fix device_tracker and rename ssl to use_ssl

* Moved device tracker functions into device tracker

* Fix missing constants

* Fix device tracker host_name

* Fix errors

* Fix device tracker typo

* Adding device tracker attributes

* Change attributes order

* Change attributes order

* Add one more attribute

* Reformat black

* Exclude Mikrotik modules

* Remove async calls

* Remove unused import

* Adding scan interval to device tracker

* Fix errors and update code

* Fix error

* Fix missing period

* Update device tracker to use setup_scanner

* Fix hass.data HOSTS

* Fix errors

* Fix errors

* Fixes and updates

* Fixing and reworking

* Fixes

* Fix constant INFO

* get_hostname fix and return value
This commit is contained in:
Robert Dunmire III 2019-08-08 07:58:13 -04:00 committed by Martin Hjelmare
parent 0fa1e3ac92
commit 4bcef25486
6 changed files with 402 additions and 219 deletions

View file

@ -374,7 +374,7 @@ omit =
homeassistant/components/metoffice/weather.py
homeassistant/components/microsoft/tts.py
homeassistant/components/miflora/sensor.py
homeassistant/components/mikrotik/device_tracker.py
homeassistant/components/mikrotik/*
homeassistant/components/mill/climate.py
homeassistant/components/mitemp_bt/sensor.py
homeassistant/components/mjpeg/camera.py

View file

@ -1 +1,196 @@
"""The mikrotik component."""
import logging
import ssl
import voluptuous as vol
import librouteros
from librouteros.login import login_plain, login_token
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
CONF_PORT,
CONF_SSL,
CONF_METHOD,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
from .const import (
DOMAIN,
HOSTS,
MTK_LOGIN_PLAIN,
MTK_LOGIN_TOKEN,
DEFAULT_ENCODING,
IDENTITY,
CONF_TRACK_DEVICES,
CONF_ENCODING,
CONF_ARP_PING,
CONF_LOGIN_METHOD,
MIKROTIK_SERVICES,
)
_LOGGER = logging.getLogger(__name__)
MTK_DEFAULT_API_PORT = "8728"
MTK_DEFAULT_API_SSL_PORT = "8729"
MIKROTIK_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_METHOD): cv.string,
vol.Optional(CONF_LOGIN_METHOD): vol.Any(MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN),
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
vol.Optional(CONF_TRACK_DEVICES, default=True): cv.boolean,
vol.Optional(CONF_ARP_PING, default=False): cv.boolean,
}
)
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.All(cv.ensure_list, [MIKROTIK_SCHEMA])}, extra=vol.ALLOW_EXTRA
)
def setup(hass, config):
"""Set up the Mikrotik component."""
hass.data[DOMAIN] = {HOSTS: {}}
for device in config[DOMAIN]:
host = device[CONF_HOST]
use_ssl = device.get(CONF_SSL)
user = device.get(CONF_USERNAME)
password = device.get(CONF_PASSWORD, "")
login = device.get(CONF_LOGIN_METHOD)
encoding = device.get(CONF_ENCODING)
track_devices = device.get(CONF_TRACK_DEVICES)
if CONF_PORT in device:
port = device.get(CONF_PORT)
else:
if use_ssl:
port = MTK_DEFAULT_API_SSL_PORT
else:
port = MTK_DEFAULT_API_PORT
if login == MTK_LOGIN_PLAIN:
login_method = (login_plain,)
elif login == MTK_LOGIN_TOKEN:
login_method = (login_token,)
else:
login_method = (login_plain, login_token)
try:
api = MikrotikClient(
host, use_ssl, port, user, password, login_method, encoding
)
api.connect_to_device()
hass.data[DOMAIN][HOSTS][host] = {"config": device, "api": api}
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Mikrotik %s error %s", host, api_error)
continue
if track_devices:
hass.data[DOMAIN][HOSTS][host][DEVICE_TRACKER] = True
load_platform(hass, DEVICE_TRACKER, DOMAIN, None, config)
if not hass.data[DOMAIN][HOSTS]:
return False
return True
class MikrotikClient:
"""Handle all communication with the Mikrotik API."""
def __init__(self, host, use_ssl, port, user, password, login_method, encoding):
"""Initialize the Mikrotik Client."""
self._host = host
self._use_ssl = use_ssl
self._port = port
self._user = user
self._password = password
self._login_method = login_method
self._encoding = encoding
self.hostname = None
self._client = None
self._connected = False
def connect_to_device(self):
"""Connect to Mikrotik device."""
self._connected = False
_LOGGER.debug("[%s] Connecting to Mikrotik device", self._host)
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
if self._use_ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
kwargs["ssl_wrapper"] = ssl_context.wrap_socket
try:
self._client = librouteros.connect(
self._host, self._user, self._password, **kwargs
)
self._connected = True
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Mikrotik %s: %s", self._host, api_error)
self._client = None
return False
self.hostname = self.get_hostname()
_LOGGER.info("Mikrotik Connected to %s (%s)", self.hostname, self._host)
return self._connected
def get_hostname(self):
"""Return device host name."""
data = self.command(MIKROTIK_SERVICES[IDENTITY])
return data[0]["name"] if data else None
def connected(self):
"""Return connected boolean."""
return self._connected
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._client:
return None
try:
if params:
response = self._client(cmd=cmd, **params)
else:
response = self._client(cmd=cmd)
except (librouteros.exceptions.ConnectionError,) as api_error:
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
self.connect_to_device()
return None
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
) as api_error:
_LOGGER.error(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
)
return None
return response if response else None

View file

@ -0,0 +1,49 @@
"""Constants used in the Mikrotik components."""
DOMAIN = "mikrotik"
MIKROTIK = DOMAIN
HOSTS = "hosts"
MTK_LOGIN_PLAIN = "plain"
MTK_LOGIN_TOKEN = "token"
CONF_ARP_PING = "arp_ping"
CONF_TRACK_DEVICES = "track_devices"
CONF_LOGIN_METHOD = "login_method"
CONF_ENCODING = "encoding"
DEFAULT_ENCODING = "utf-8"
INFO = "info"
IDENTITY = "identity"
ARP = "arp"
DHCP = "dhcp"
WIRELESS = "wireless"
CAPSMAN = "capsman"
MIKROTIK_SERVICES = {
INFO: "/system/routerboard/getall",
IDENTITY: "/system/identity/getall",
ARP: "/ip/arp/getall",
DHCP: "/ip/dhcp-server/lease/getall",
WIRELESS: "/interface/wireless/registration-table/getall",
CAPSMAN: "/caps-man/registration-table/getall",
}
ATTR_DEVICE_TRACKER = [
"comment",
"mac-address",
"ssid",
"interface",
"host-name",
"last-seen",
"rx-signal",
"signal-strength",
"tx-ccq",
"signal-to-noise",
"wmm-enabled",
"authentication-type",
"encryption",
"tx-rate-set",
"rx-rate",
"tx-rate",
"uptime",
]

View file

@ -1,251 +1,190 @@
"""Support for Mikrotik routers as device tracker."""
import logging
import ssl
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN,
PLATFORM_SCHEMA,
DOMAIN as DEVICE_TRACKER,
DeviceScanner,
)
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
CONF_PORT,
CONF_SSL,
CONF_METHOD,
from homeassistant.util import slugify
from homeassistant.const import CONF_METHOD
from .const import (
HOSTS,
MIKROTIK,
CONF_ARP_PING,
MIKROTIK_SERVICES,
CAPSMAN,
WIRELESS,
DHCP,
ARP,
ATTR_DEVICE_TRACKER,
)
_LOGGER = logging.getLogger(__name__)
MTK_DEFAULT_API_PORT = "8728"
MTK_DEFAULT_API_SSL_PORT = "8729"
CONF_LOGIN_METHOD = "login_method"
MTK_LOGIN_PLAIN = "plain"
MTK_LOGIN_TOKEN = "token"
CONF_ENCODING = "encoding"
DEFAULT_ENCODING = "utf-8"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_METHOD): cv.string,
vol.Optional(CONF_LOGIN_METHOD): vol.Any(MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN),
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
}
)
def get_scanner(hass, config):
"""Validate the configuration and return MTikScanner."""
scanner = MikrotikScanner(config[DOMAIN])
"""Validate the configuration and return MikrotikScanner."""
for host in hass.data[MIKROTIK][HOSTS]:
if DEVICE_TRACKER not in hass.data[MIKROTIK][HOSTS][host]:
continue
hass.data[MIKROTIK][HOSTS][host].pop(DEVICE_TRACKER, None)
api = hass.data[MIKROTIK][HOSTS][host]["api"]
config = hass.data[MIKROTIK][HOSTS][host]["config"]
hostname = api.get_hostname()
scanner = MikrotikScanner(api, host, hostname, config)
return scanner if scanner.success_init else None
class MikrotikScanner(DeviceScanner):
"""This class queries a Mikrotik router."""
"""This class queries a Mikrotik device."""
def __init__(self, config):
def __init__(self, api, host, hostname, config):
"""Initialize the scanner."""
self.last_results = {}
self.host = config[CONF_HOST]
self.ssl = config[CONF_SSL]
try:
self.port = config[CONF_PORT]
except KeyError:
if self.ssl:
self.port = MTK_DEFAULT_API_SSL_PORT
else:
self.port = MTK_DEFAULT_API_PORT
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.login_method = config.get(CONF_LOGIN_METHOD)
self.api = api
self.config = config
self.host = host
self.hostname = hostname
self.method = config.get(CONF_METHOD)
self.encoding = config[CONF_ENCODING]
self.arp_ping = config.get(CONF_ARP_PING)
self.dhcp = None
self.devices_arp = {}
self.devices_dhcp = {}
self.device_tracker = None
self.success_init = self.api.connected()
self.connected = False
self.success_init = False
self.client = None
self.wireless_exist = None
self.success_init = self.connect_to_device()
def get_extra_attributes(self, device):
"""
Get extra attributes of a device.
if self.success_init:
_LOGGER.info("Start polling Mikrotik (%s) router...", self.host)
self._update_info()
else:
_LOGGER.error("Connection to Mikrotik (%s) failed", self.host)
Some known extra attributes that may be returned in the device tuple
include MAC address (mac), network device (dev), IP address
(ip), reachable status (reachable), associated router
(host), hostname if known (hostname) among others.
"""
return self.device_tracker.get(device) or {}
def connect_to_device(self):
"""Connect to Mikrotik method."""
import librouteros
from librouteros.login import login_plain, login_token
if self.login_method == MTK_LOGIN_PLAIN:
login_method = (login_plain,)
elif self.login_method == MTK_LOGIN_TOKEN:
login_method = (login_token,)
else:
login_method = (login_plain, login_token)
try:
kwargs = {
"port": self.port,
"encoding": self.encoding,
"login_methods": login_method,
}
if self.ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
kwargs["ssl_wrapper"] = ssl_context.wrap_socket
self.client = librouteros.connect(
self.host, self.username, self.password, **kwargs
)
try:
routerboard_info = self.client(cmd="/system/routerboard/getall")
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
):
routerboard_info = None
raise
if routerboard_info:
_LOGGER.info(
"Connected to Mikrotik %s with IP %s",
routerboard_info[0].get("model", "Router"),
self.host,
)
self.connected = True
try:
self.capsman_exist = self.client(cmd="/caps-man/interface/getall")
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
):
self.capsman_exist = False
if not self.capsman_exist:
_LOGGER.info(
"Mikrotik %s: Not a CAPSman controller. Trying "
"local interfaces",
self.host,
)
try:
self.wireless_exist = self.client(cmd="/interface/wireless/getall")
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
):
self.wireless_exist = False
if (
not self.wireless_exist
and not self.capsman_exist
or self.method == "ip"
):
_LOGGER.info(
"Mikrotik %s: Wireless adapters not found. Try to "
"use DHCP lease table as presence tracker source. "
"Please decrease lease time as much as possible",
self.host,
)
if self.method:
_LOGGER.info(
"Mikrotik %s: Manually selected polling method %s",
self.host,
self.method,
)
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Connection error: %s", api_error)
return self.connected
def get_device_name(self, device):
"""Get name for a device."""
host = self.device_tracker.get(device, {})
return host.get("host_name")
def scan_devices(self):
"""Scan for new devices and return a list with found device MACs."""
import librouteros
self.update_device_tracker()
return list(self.device_tracker)
try:
self._update_info()
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Connection error: %s", api_error)
self.connect_to_device()
return [device for device in self.last_results]
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
return self.last_results.get(device)
def _update_info(self):
"""Retrieve latest information from the Mikrotik box."""
def get_method(self):
"""Determine the device tracker polling method."""
if self.method:
devices_tracker = self.method
_LOGGER.debug(
"Mikrotik %s: Manually selected polling method %s",
self.host,
self.method,
)
return self.method
capsman = self.api.command(MIKROTIK_SERVICES[CAPSMAN])
if not capsman:
_LOGGER.debug(
"Mikrotik %s: Not a CAPsMAN controller. "
"Trying local wireless interfaces",
(self.host),
)
else:
if self.capsman_exist:
devices_tracker = "capsman"
elif self.wireless_exist:
devices_tracker = "wireless"
else:
devices_tracker = "ip"
return CAPSMAN
_LOGGER.debug(
"Loading %s devices from Mikrotik (%s) ...", devices_tracker, self.host
)
wireless = self.api.command(MIKROTIK_SERVICES[WIRELESS])
if not wireless:
_LOGGER.info(
"Mikrotik %s: Wireless adapters not found. Try to "
"use DHCP lease table as presence tracker source. "
"Please decrease lease time as much as possible",
self.host,
)
return DHCP
device_names = self.client(cmd="/ip/dhcp-server/lease/getall")
if devices_tracker == "capsman":
devices = self.client(cmd="/caps-man/registration-table/getall")
elif devices_tracker == "wireless":
devices = self.client(cmd="/interface/wireless/registration-table/getall")
else:
devices = device_names
return WIRELESS
if device_names is None and devices is None:
return False
def update_device_tracker(self):
"""Update device_tracker from Mikrotik API."""
self.device_tracker = {}
if not self.method:
self.method = self.get_method()
mac_names = {
device.get("mac-address"): device.get("host-name")
for device in device_names
if device.get("mac-address")
data = self.api.command(MIKROTIK_SERVICES[self.method])
if data is None:
return
if self.method != DHCP:
dhcp = self.api.command(MIKROTIK_SERVICES[DHCP])
if dhcp is not None:
self.devices_dhcp = load_mac(dhcp)
arp = self.api.command(MIKROTIK_SERVICES[ARP])
self.devices_arp = load_mac(arp)
for device in data:
mac = device.get("mac-address")
if self.method == DHCP:
if "active-address" not in device:
continue
if self.arp_ping and self.devices_arp:
if mac not in self.devices_arp:
continue
interface = self.devices_arp[mac]["interface"]
if not self.do_arp_ping(mac, interface):
continue
attrs = {}
if mac in self.devices_dhcp and "host-name" in self.devices_dhcp[mac]:
hostname = self.devices_dhcp[mac].get("host-name")
if hostname:
attrs["host_name"] = hostname
if self.devices_arp and mac in self.devices_arp:
attrs["ip_address"] = self.devices_arp[mac].get("address")
for attr in ATTR_DEVICE_TRACKER:
if attr in device and device[attr] is not None:
attrs[slugify(attr)] = device[attr]
attrs["scanner_type"] = self.method
attrs["scanner_host"] = self.host
attrs["scanner_hostname"] = self.hostname
self.device_tracker[mac] = attrs
def do_arp_ping(self, mac, interface):
"""Attempt to arp ping MAC address via interface."""
params = {
"arp-ping": "yes",
"interval": "100ms",
"count": 3,
"interface": interface,
"address": mac,
}
cmd = "/ping"
data = self.api.command(cmd, params)
if data is not None:
status = 0
for result in data:
if "status" in result:
_LOGGER.debug(
"Mikrotik %s arp_ping error: %s", self.host, result["status"]
)
status += 1
if status == len(data):
return None
return data
if devices_tracker in ("wireless", "capsman"):
self.last_results = {
device.get("mac-address"): mac_names.get(device.get("mac-address"))
for device in devices
}
else:
self.last_results = {
device.get("mac-address"): mac_names.get(device.get("mac-address"))
for device in device_names
if device.get("active-address")
}
return True
def load_mac(devices=None):
"""Load dictionary using MAC address as key."""
if not devices:
return None
mac_devices = {}
for device in devices:
if "mac-address" in device:
mac = device.pop("mac-address")
mac_devices[mac] = device
return mac_devices

View file

@ -3,7 +3,7 @@
"name": "Mikrotik",
"documentation": "https://www.home-assistant.io/components/mikrotik",
"requirements": [
"librouteros==2.2.0"
"librouteros==2.3.0"
],
"dependencies": [],
"codeowners": []

View file

@ -714,7 +714,7 @@ libpurecool==0.5.0
libpyfoscam==1.0
# homeassistant.components.mikrotik
librouteros==2.2.0
librouteros==2.3.0
# homeassistant.components.soundtouch
libsoundtouch==0.7.2