SwitchBotの温度湿度計をRaspberry PiのBluetoothを使って読み取る件です。
このキーワードで検索すると、pythonライブラリーのbluepyを使ったものが多くあり、参考となるスクリプトもあります。しかし最近のUbuntuはpipのインストールは仮想環境へせよとのこと。自分の使い方では電源ONからずっとcronで定期的にスクリプトを実行するので仮想環境を使うのはなんかひと手間なような気がします。
そこでUbuntuの流儀にしたがってaptでインストールできるBluetoothライブラリのbleakのほうを使うことにしました。
# apt install python3-bleak
でインストールするのみです。
bleakに対応したpython3スクリプトですが、こちらの記事を参考に、該当の温度計に当てはまるものをほぼ丸々使わせてもらいました。あとはpython3-psycopg2もaptでインストールして、他のマシンで動作しているPostgreSQLにデータを投げてデータ収集してWebページで表示するようにしています。参考までにうちの防水タイプの温度湿度計向けのデータ収集改造スクリプトを以下に貼っておきます。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # SwitchBotセンサーからBLEアドバタイズデータを取得してPostgreSQLに保存する # SwitchBotセンサーのMACアドレスを引数に指定して実行 # 使い方: python3 switchbot.py <MACアドレス> # 例: python3 switchbot.py aa:bb:cc:dd:ee:ff import sys import asyncio from datetime import datetime, timezone, timedelta import logging from typing import Optional import psycopg2 from bleak import BleakScanner from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s' ) # DB接続情報 DB_HOST = "192.168.1.10" DB_NAME = "temperature" DB_USER = "dbuser" DB_PASSWORD = "dbpassword" def usage(): logging.info("使い方: python switchbot.py <MACアドレス>") logging.info("例: python switchbot.py aa:bb:cc:dd:ee:ff") sys.exit(1) if len(sys.argv) < 2: logging.error("[エラー] MACアドレスを指定してください。\n") usage() macaddr: str = sys.argv[1].lower() async def run() -> None: """ SwitchBotのBLEアドバタイズデータをスキャンし、バッテリー・温度・湿度を出力 """ scan_complete = asyncio.get_event_loop().create_future() def parse_advertisement_data(advertisement_data) -> Optional[tuple[int, float, int]]: """ SwitchBotのBLEアドバタイズデータからバッテリー・温度・湿度を抽出する。 Args: advertisement_data: BLEアドバタイズデータオブジェクト Returns: (battery, temp, humid) のタプル、またはパース失敗時はNone """ # Service Data (AD Type 0x16) try: service_data = advertisement_data.service_data battery: Optional[int] = None if "0000fd3d-0000-1000-8000-00805f9b34fb" in service_data: servicedata = service_data["0000fd3d-0000-1000-8000-00805f9b34fb"] if len(servicedata) > 2: battery = servicedata[2] & 0b01111111 # Manufacturer Data (AD Type 0xFF) manufacturer_data = advertisement_data.manufacturer_data if not manufacturer_data: return None raw_data = next(iter(manufacturer_data.values())) if len(raw_data) < 11: return None s_payload = raw_data[6:] if len(s_payload) < 5: return None humid = s_payload[4] & 0b01111111 temp = (s_payload[2] & 0b00001111) / 10 + (s_payload[3] & 0b01111111) isOverZero = s_payload[3] & 0b10000000 if not isOverZero: temp = -temp if battery is not None: return (battery, temp, humid) else: return None except Exception as e: logging.error(f"[データパースエラー] {e}") return None def insert_db(result: tuple[int, float, int]) -> None: """ result: (battery, temp, humid) のタプルをDBへ保存 """ try: battery, temp, humid = result conn = psycopg2.connect( host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD ) cur = conn.cursor() JST = timezone(timedelta(hours=9), 'JST') now = datetime.now(JST) sql = """ INSERT INTO temperature (datetime, mac, tmpr, humid, battery) VALUES (%s, %s, %s, %s, %s) """ cur.execute(sql, (now, macaddr, temp, humid, battery)) conn.commit() cur.close() conn.close() logging.info("[DB] データを保存しました。") except Exception as db_e: logging.error(f"[DBエラー] {db_e}") def detection_callback(device: 'BLEDevice', advertisement_data: 'AdvertisementData') -> None: """ BLEスキャン時に呼ばれるコールバック関数。指定MACアドレスのデータを検出したらDB保存・出力を行う。 Args: device: 検出されたBLEデバイスオブジェクト advertisement_data: BLEアドバタイズデータオブジェクト Returns: なし """ try: if device.address.lower() == macaddr: result = parse_advertisement_data(advertisement_data) if result is not None and not scan_complete.done(): now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') logging.info(f"TIME: {now} | MAC: {macaddr} | [RESULT] {result}") insert_db(result) scan_complete.set_result(True) except Exception as e: logging.error(f"[コールバックエラー] {e}") scanner = BleakScanner(detection_callback) try: await scanner.start() try: await asyncio.wait_for(scan_complete, timeout=20.0) except asyncio.TimeoutError: logging.warning("[タイムアウト] デバイスが見つかりませんでした。") finally: await scanner.stop() except Exception as e: logging.error(f"[スキャンエラー] {e}") # 実行 if __name__ == "__main__": try: asyncio.run(run()) except KeyboardInterrupt: logging.info("[中断] ユーザーによって停止されました。")