# # Copyright (c) 2020 Michael Büsch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import bleak import asyncio def make_uuid16(uuid): return f"0000{uuid:04x}-0000-1000-8000-00805f9b34fb" class EnvsensorsError(Exception): pass class EnvsensorsClient(object): uuid_pres = make_uuid16(0x2A6D) uuid_temp = make_uuid16(0x2A6E) uuid_hum = make_uuid16(0x2A6F) uuid_str = make_uuid16(0x2A3D) @staticmethod async def discover(): try: for dev in await bleak.discover(): if dev.name == "envsensors": yield dev.address except bleak.BleakError as e: raise EnvsensorsError(str(e)) def __init__(self, address): try: self.__address = address self.__client = bleak.BleakClient(address) except bleak.BleakError as e: raise EnvsensorsError(str(e)) async def __aenter__(self): try: await self.__client.connect() return self except bleak.BleakError as e: raise EnvsensorsError(str(e)) async def __aexit__(self, exc_type, exc_value, traceback): try: await self.__client.disconnect() except bleak.BleakError as e: raise EnvsensorsError(str(e)) async def __getAttrInt(self, uuid): try: return int.from_bytes(await self.__client.read_gatt_char(uuid), "little") except bleak.BleakError as e: raise EnvsensorsError(str(e)) async def __getAttrStr(self, uuid): try: return (await self.__client.read_gatt_char(uuid)).decode("UTF-8", "ignore") except bleak.BleakError as e: raise EnvsensorsError(str(e)) async def dump(self): ret = [] ret.append(f"Device {self.__address}") ret.append(f" Temperature {await self.__getAttrInt(self.uuid_temp) * 1e-2 :.1f} °C") ret.append(f" Humidity {await self.__getAttrInt(self.uuid_hum) * 1e-2 :.1f} %") ret.append(f" {await self.__getAttrStr(self.uuid_str)}") ret.append(f" Pressure {await self.__getAttrInt(self.uuid_pres) * 1e-3 :.1f} mBar") return "\n".join(ret) # vim: ts=4 sw=4 expandtab