python bleakを使ってSwitchBotの温度計データを取得する

 SwitchBotの温度湿度計Raspberry PiBluetoothを使って読み取る件です。
 このキーワードで検索すると、pythonライブラリーのbluepyを使ったものが多くあり、参考となるスクリプトもあります。しかし最近のUbuntuはpipのインストールは仮想環境へせよとのこと。自分の使い方では電源ONからずっとcronで定期的にスクリプトを実行するので仮想環境を使うのはなんかひと手間なような気がします。
 そこでUbuntuの流儀にしたがってaptでインストールできるBluetoothライブラリのbleakのほうを使うことにしました。

# apt install python3-bleak

でインストールするのみです。
 bleakに対応したpython3スクリプトですが、こちらの記事を参考に、該当の温度計に当てはまるものをほぼ丸々使わせてもらいました。あとはpython3-psycopg2もaptでインストールして、他のマシンで動作しているPostgreSQLにデータを投げてデータ収集してWebページで表示するようにしています。参考までにうちの防水タイプの温度湿度計向けのデータ収集改造スクリプトを以下に貼っておきます。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SwitchBotセンサーからBLEアドバタイズデータを取得してPostgreSQLに保存する
# SwitchBotセンサーのMACアドレスを引数に指定して実行
# 使い方: python3 switchbot.py <MACアドレス>
# 例: python3 switchbot.py aa:bb:cc:dd:ee:ff

import sys
import asyncio
from datetime import datetime, timezone, timedelta
import logging

from typing import Optional

import psycopg2
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s'
)

# DB接続情報
DB_HOST = "192.168.1.10"
DB_NAME = "temperature"
DB_USER = "dbuser"
DB_PASSWORD = "dbpassword"


def usage():
    logging.info("使い方: python switchbot.py <MACアドレス>")
    logging.info("例: python switchbot.py aa:bb:cc:dd:ee:ff")
    sys.exit(1)


if len(sys.argv) < 2:
    logging.error("[エラー] MACアドレスを指定してください。\n")
    usage()


macaddr: str = sys.argv[1].lower()


async def run() -> None:
    """
    SwitchBotのBLEアドバタイズデータをスキャンし、バッテリー・温度・湿度を出力
    """
    scan_complete = asyncio.get_event_loop().create_future()


    def parse_advertisement_data(advertisement_data) -> Optional[tuple[int, float, int]]:
        """
        SwitchBotのBLEアドバタイズデータからバッテリー・温度・湿度を抽出する。

        Args:
            advertisement_data: BLEアドバタイズデータオブジェクト

        Returns:
            (battery, temp, humid) のタプル、またはパース失敗時はNone
        """
        # Service Data (AD Type 0x16)
        try:
            service_data = advertisement_data.service_data
            battery: Optional[int] = None
            if "0000fd3d-0000-1000-8000-00805f9b34fb" in service_data:
                servicedata = service_data["0000fd3d-0000-1000-8000-00805f9b34fb"]
                if len(servicedata) > 2:
                    battery = servicedata[2] & 0b01111111

            # Manufacturer Data (AD Type 0xFF)
            manufacturer_data = advertisement_data.manufacturer_data
            if not manufacturer_data:
                return None
            raw_data = next(iter(manufacturer_data.values()))
            if len(raw_data) < 11:
                return None
            s_payload = raw_data[6:]
            if len(s_payload) < 5:
                return None

            humid = s_payload[4] & 0b01111111
            temp = (s_payload[2] & 0b00001111) / 10 + (s_payload[3] & 0b01111111)
            isOverZero = s_payload[3] & 0b10000000
            if not isOverZero:
                temp = -temp

            if battery is not None:
                return (battery, temp, humid)
            else:
                return None
        except Exception as e:
            logging.error(f"[データパースエラー] {e}")
            return None


    def insert_db(result: tuple[int, float, int]) -> None:
        """
        result: (battery, temp, humid) のタプルをDBへ保存
        """
        try:
            battery, temp, humid = result
            conn = psycopg2.connect(
                host=DB_HOST,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD
            )
            cur = conn.cursor()
            JST = timezone(timedelta(hours=9), 'JST')
            now = datetime.now(JST)
            sql = """
                INSERT INTO temperature (datetime, mac, tmpr, humid, battery)
                VALUES (%s, %s, %s, %s, %s)
            """
            cur.execute(sql, (now, macaddr, temp, humid, battery))
            conn.commit()
            cur.close()
            conn.close()
            logging.info("[DB] データを保存しました。")
        except Exception as db_e:
            logging.error(f"[DBエラー] {db_e}")


    def detection_callback(device: 'BLEDevice', advertisement_data: 'AdvertisementData') -> None:
        """
        BLEスキャン時に呼ばれるコールバック関数。指定MACアドレスのデータを検出したらDB保存・出力を行う。

        Args:
            device: 検出されたBLEデバイスオブジェクト
            advertisement_data: BLEアドバタイズデータオブジェクト

        Returns:
            なし
        """
        try:
            if device.address.lower() == macaddr:
                result = parse_advertisement_data(advertisement_data)
                if result is not None and not scan_complete.done():
                    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    logging.info(f"TIME: {now} | MAC: {macaddr} | [RESULT] {result}")
                    insert_db(result)
                    scan_complete.set_result(True)
        except Exception as e:
            logging.error(f"[コールバックエラー] {e}")


    scanner = BleakScanner(detection_callback)

    try:
        await scanner.start()
        try:
            await asyncio.wait_for(scan_complete, timeout=20.0)
        except asyncio.TimeoutError:
            logging.warning("[タイムアウト] デバイスが見つかりませんでした。")
        finally:
            await scanner.stop()
    except Exception as e:
        logging.error(f"[スキャンエラー] {e}")


# 実行
if __name__ == "__main__":

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        logging.info("[中断] ユーザーによって停止されました。")