This repository has been archived by the owner on Aug 4, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Encapsulate beewilight controller and add config_flow
- Loading branch information
Showing
8 changed files
with
398 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,54 @@ | ||
"""Beewi Light integration.""" | ||
"""Control Beewi bluetooth light.""" | ||
import logging | ||
|
||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry | ||
from homeassistant.core import HomeAssistant | ||
|
||
DOMAIN = "beewi_light" | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
async def async_setup(hass: HomeAssistant, config: dict): | ||
"""Set up beewi_light from configuration.yaml.""" | ||
_LOGGER.debug("async setup.") | ||
_LOGGER.debug(" List entries for domain:") | ||
_LOGGER.debug(hass.config_entries.async_entries(DOMAIN)) | ||
|
||
conf = config.get(DOMAIN) | ||
if conf: | ||
hass.async_create_task( | ||
hass.config_entries.flow.async_init( | ||
DOMAIN, data=conf, context={"source": SOURCE_IMPORT} | ||
) | ||
) | ||
|
||
return True | ||
|
||
|
||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): | ||
"""Set up beewi_light from a config entry.""" | ||
_LOGGER.debug(f"async setup entry: {entry.as_dict()}") | ||
hass.async_create_task( | ||
hass.config_entries.async_forward_entry_setup(entry, "light") | ||
) | ||
return True | ||
|
||
|
||
#async def async_migrate_entry(hass, entry): | ||
#"""Migrate old entry.""" | ||
#data = entry.data | ||
#version = entry.version | ||
|
||
#_LOGGER.debug(f"Migrating Yeelight_bt from Version {version}. it has data: {data}") | ||
# Migrate Version 1 -> Version 2: Stuff up... nothing changed. | ||
#if version == 1: | ||
# version = entry.version = 2 | ||
# hass.config_entries.async_update_entry(entry, data=data) | ||
#return True | ||
|
||
|
||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): | ||
"""Unload a config entry.""" | ||
_LOGGER.debug("async unload entry") | ||
return await hass.config_entries.async_forward_entry_unload(entry, "light") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
""" Beewi SmartLight used by Home Assistant """ | ||
import logging | ||
from bluepy import btle | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
class BeewiSmartLight: | ||
""" This class will interact with a Beewi SmartLight bulb """ | ||
|
||
def __init__(self, mac, iface: str = "hci0", address_type: str = "public"): | ||
""" Initialize the BeewiSmartLight """ | ||
self._mac = mac | ||
self._iface = iface | ||
self._address_type = address_type | ||
self.peripheral = btle.Peripheral() | ||
self._connection = None | ||
|
||
def turnOn(self): | ||
""" Turn on the light """ | ||
command = "1001" | ||
try: | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def turnOff(self): | ||
""" Turn off the light """ | ||
try: | ||
command = "1000" | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def setWhite(self): | ||
""" Switch the light in white mode """ | ||
try: | ||
command = "14808080" | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def setColor(self, r:int, g:int, b:int): | ||
""" Switch the light in color mode and set the RGB color """ | ||
try: | ||
hexR = str(hex(r)[2:]).zfill(2) | ||
hexG = str(hex(g)[2:]).zfill(2) | ||
hexB = str(hex(b)[2:]).zfill(2) | ||
command = "13" + hexR + hexG + hexB | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def setBrightness(self, brightness:int): | ||
""" Set the brightness of the light """ | ||
try: | ||
brightnessten = 0 if brightness == 0 else (round((brightness / 2.55 ) / 10) + 1) | ||
command = "120" + str(hex(2 if brightnessten < 2 else brightnessten)[2:]) | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def setWhiteWarm(self, warm:int): | ||
""" Set the tone of the light cold/hot """ | ||
try: | ||
warmten = 0 if warm == 0 else (round((warm / 2.55 ) / 10) + 1) | ||
command = "110" + str(hex(2 if warmten < 2 else warmten)[2:]) | ||
self.__writeCharacteristic(command) | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def getSettings(self, verbose = 0): | ||
""" Get current state of the light """ | ||
try: | ||
self.__readSettings() | ||
if verbose: | ||
print(" ON/OFF : {}".format(self.isOn)) | ||
print(" WHITE/COLOR : {}".format(self.isWhite)) | ||
print(" BRIGHTNESS : {}".format(self.brightness)) | ||
print(" TEMPERATURE : {}".format(self.temperature)) | ||
print("COLOR (R/G/B) : {} {} {}".format(self.red, self.green, self.blue)) | ||
|
||
return self.settings | ||
except Exception as e: | ||
_LOGGER.error(e) | ||
raise | ||
|
||
def __readSettings(self): | ||
""" Read settings of the light """ | ||
try: | ||
self.settings = self.__readCharacteristic(0x0024) | ||
self.isOn = self.settings[0] | ||
|
||
if(0x2 <= (self.settings[1] & 0x0F) <= 0xB): | ||
self.isWhite = 1 | ||
temp = (self.settings[1] & 0x0F) - 2 | ||
self.temperature = int(0 if temp == 0 else (((temp + 1) * 2.55) * 10)) | ||
elif(0x0 <= (self.settings[1] & 0x0F) < 0x2): | ||
self.isWhite = 0 | ||
self.temperature = 0 | ||
brightness = ((self.settings[1] & 0xF0) >> 4) - 2 | ||
self.brightness = int(0 if brightness == 0 else (((brightness + 1) * 2.55) * 10)) | ||
self.red = self.settings[2] | ||
self.green = self.settings[3] | ||
self.blue = self.settings[4] | ||
|
||
return self.settings | ||
except: | ||
raise | ||
|
||
def __writeCharacteristic(self,command): | ||
""" Send command to the light """ | ||
try: | ||
if not self.test_connection(): | ||
self.connect() | ||
|
||
self._connection.writeCharacteristic(0x0021,bytes.fromhex("55" + command + "0d0a")) | ||
except: | ||
raise | ||
|
||
def __readCharacteristic(self,characteristic): | ||
""" Read BTLE characteristic """ | ||
try: | ||
if not self.test_connection(): | ||
self.connect() | ||
|
||
resp = self._connection.readCharacteristic(characteristic) | ||
return resp | ||
except: | ||
raise | ||
|
||
def test_connection(self): | ||
""" | ||
Test if the connection is still alive | ||
:return: True if connected | ||
""" | ||
if not self.is_connected(): | ||
return False | ||
|
||
# send test message, read bulb name | ||
try: | ||
self._connection.readCharacteristic(0x0024) | ||
except btle.BTLEException: | ||
self.disconnect() | ||
return False | ||
except BrokenPipeError: | ||
# bluepy-helper died | ||
self._connection = None | ||
return False | ||
|
||
return True | ||
|
||
def is_connected(self): | ||
""" | ||
:return: True if connected | ||
""" | ||
return self._connection is not None # and self.test_connection() | ||
|
||
def connect(self): | ||
""" | ||
Connect to device | ||
:param bluetooth_adapter_nr: bluetooth adapter name as shown by | ||
"hciconfig" command. Default : 0 for (hci0) | ||
:return: True if connection succeed, False otherwise | ||
""" | ||
_LOGGER.debug("Connecting...") | ||
|
||
try: | ||
connection = btle.Peripheral(self._mac) | ||
self._connection = connection.withDelegate(self) | ||
#self._subscribe_to_recv_characteristic() | ||
except RuntimeError as e: | ||
_LOGGER.error('Connection failed : {}'.format(e)) | ||
return False | ||
|
||
return True | ||
|
||
def disconnect(self): | ||
""" | ||
Disconnect from device | ||
""" | ||
_LOGGER.debug("Disconnecting...") | ||
|
||
try: | ||
self._connection.disconnect() | ||
except btle.BTLEException: | ||
pass | ||
|
||
self._connection = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
"""Config flow for yeelight_bt""" | ||
import logging | ||
from homeassistant import config_entries | ||
from homeassistant.const import CONF_NAME, CONF_MAC | ||
import voluptuous as vol | ||
from homeassistant.helpers import device_registry as dr | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DOMAIN = "beewi_light" | ||
|
||
class Beewi_lightConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): # type: ignore | ||
"""Handle a config flow for yeelight_bt.""" | ||
|
||
VERSION = 1 | ||
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL | ||
|
||
@property | ||
def data_schema(self): | ||
"""Return the data schema for integration.""" | ||
return vol.Schema({vol.Required(CONF_NAME): str, vol.Required(CONF_MAC): str}) | ||
|
||
async def async_step_user(self, user_input=None): | ||
"""Handle a flow initialized by the user.""" | ||
self.devices = [] | ||
return await self.async_step_device() | ||
|
||
async def async_step_device(self, user_input=None): | ||
"""Handle setting up a device.""" | ||
# _LOGGER.debug(f"User_input: {user_input}") | ||
if not user_input: | ||
schema_mac = str | ||
if self.devices: | ||
schema_mac = vol.In(self.devices) | ||
schema = vol.Schema( | ||
{vol.Required(CONF_NAME): str, vol.Required(CONF_MAC): schema_mac} | ||
) | ||
return self.async_show_form(step_id="device", data_schema=schema) | ||
|
||
user_input[CONF_MAC] = user_input[CONF_MAC][:17] | ||
unique_id = dr.format_mac(user_input[CONF_MAC]) | ||
_LOGGER.debug(f"Yeelight UniqueID: {unique_id}") | ||
|
||
await self.async_set_unique_id(unique_id) | ||
self._abort_if_unique_id_configured() | ||
|
||
return self.async_create_entry(title=user_input[CONF_NAME], data=user_input) | ||
|
||
async def async_step_import(self, import_info): | ||
"""Handle import from config file.""" | ||
return await self.async_step_device(import_info) |
Oops, something went wrong.