Découvert en vérif SSH sur nebuleair-pro100 : le timer CCS811 échouait en ModuleNotFoundError car l'OTA fait git pull mais ne réinstallait jamais les dépendances pip (installation_part1.sh ne tourne qu'à l'install neuve). - requirements.txt: source unique de vérité des deps Python - installation_part1.sh: install via requirements.txt (chemin relatif au script, le repo n'est pas encore cloné dans /var/www à cette étape) - update_firmware.sh: nouvelle étape 2a, pip install -r requirements.txt (idempotent) -> les capteurs déjà déployés récupèrent les libs manquantes à l'OTA - CCS811/write_data.py + get_data.py: skip des lectures eCO2 < 400 ppm (échantillon 0/0 parasite juste après init du driver, plancher physique = 400) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
92 lines
2.8 KiB
Python
92 lines
2.8 KiB
Python
'''
|
|
Live read of the AMS CCS811 air-quality sensor (used by the web "Get Data" button).
|
|
Prints a JSON object: {"eCO2": <int_ppm>, "TVOC": <int_ppb>} or {"error": "<message>"}.
|
|
|
|
CCS811 is a MOX gas sensor: it outputs an equivalent CO2 (eCO2, derived from VOCs)
|
|
and a Total VOC (TVOC). It is NOT an NDIR CO2 sensor like the S88. TVOC is the
|
|
primary measurement of interest here.
|
|
|
|
I2C, library adafruit-circuitpython-ccs811. Address read from config_table
|
|
(key CCS811_address, e.g. "0x5A" Adafruit / "0x5B" SparkFun), default 0x5A.
|
|
|
|
Usage: /usr/bin/python3 /var/www/nebuleair_pro_4g/CCS811/get_data.py [address]
|
|
'''
|
|
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
|
|
# SQLite database that holds config_table (queried for the CCS811_address key).
DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"

# I2C address used when none is given on the command line and none can be
# read from config_table.
DEFAULT_ADDRESS = 0x5A

# CCS811 produces a fresh sample every 1 s in drive mode 1. Poll data_ready a few
# times to cover the case where the driver was just (re)initialised.
# Upper bound on the time spent sleeping between polls:
# DATA_READY_RETRIES * DATA_READY_DELAY = 30 * 0.2 s = 6 s.
DATA_READY_RETRIES = 30
DATA_READY_DELAY = 0.2  # seconds
|
|
|
|
|
|
def get_address_from_config(db_path=None, default=None):
    """Return the CCS811 I2C address stored in the SQLite config table.

    Looks up the ``CCS811_address`` key in ``config_table`` and parses its
    value as a hexadecimal string (e.g. ``"0x5A"``). The lookup is
    best-effort: a database error, a missing or empty row, or a value that
    is not valid hexadecimal all yield the fallback address instead of
    raising.

    Args:
        db_path: SQLite database file to query. Defaults to ``DB_PATH``.
        default: Address returned when the lookup fails. Defaults to
            ``DEFAULT_ADDRESS``.

    Returns:
        The I2C address as an int.
    """
    if db_path is None:
        db_path = DB_PATH
    if default is None:
        default = DEFAULT_ADDRESS

    try:
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute(
                "SELECT value FROM config_table WHERE key = ?",
                ("CCS811_address",),
            ).fetchone()
        finally:
            # Release the connection even when the query raises (for example
            # when config_table does not exist yet).
            conn.close()
        if row and row[0]:
            return int(str(row[0]), 16)
    except (sqlite3.Error, ValueError):
        # Deliberate best-effort: an unreadable database or a malformed
        # value must not prevent the caller from using the default address.
        pass
    return default
|
|
|
|
|
|
def _emit(payload):
    """Print *payload* as the single JSON object written to stdout."""
    print(json.dumps(payload))


def _read_sample(sensor, retries, delay, eco2_floor=400):
    """Poll *sensor* until it yields a settled sample or the budget runs out.

    A sample is taken each time ``sensor.data_ready`` is true. A sample whose
    eCO2 is below *eco2_floor* is treated as not yet settled and polling
    continues, so a spurious first sample right after driver initialisation
    does not fail the whole read.

    Args:
        sensor: Object exposing ``data_ready``, ``eco2`` and ``tvoc``.
        retries: Maximum number of polls.
        delay: Seconds to sleep after each poll that did not succeed.
        eco2_floor: Lowest eCO2 value (ppm) accepted as a settled reading.

    Returns:
        ``{"eCO2": int, "TVOC": int}`` on success, otherwise an
        ``{"error": ...}`` dict telling whether no sample became ready at all
        or only unsettled samples were seen.
    """
    saw_sample = False
    for _ in range(retries):
        if sensor.data_ready:
            eco2 = int(sensor.eco2)
            tvoc = int(sensor.tvoc)
            if eco2 >= eco2_floor:
                return {"eCO2": eco2, "TVOC": tvoc}
            saw_sample = True
        time.sleep(delay)

    if saw_sample:
        return {"error": "CCS811 reading not settled (warming up?)"}
    return {"error": "CCS811 data not ready (warming up?)"}


def main():
    """Read the CCS811 once and print the result as JSON on stdout.

    The I2C address is taken from the first command-line argument (parsed as
    hexadecimal) when present, otherwise from ``get_address_from_config()``.
    Exactly one JSON object is printed: ``{"eCO2": ..., "TVOC": ...}`` on
    success or ``{"error": "<message>"}`` on any failure (invalid address,
    missing libraries, sensor init failure, no settled sample, read error).
    Nothing is raised to the caller and the function always returns ``None``.
    """
    if len(sys.argv) > 1:
        try:
            address = int(sys.argv[1], 16)
        except ValueError:
            _emit({"error": f"invalid address {sys.argv[1]}"})
            return
    else:
        address = get_address_from_config()

    # The hardware libraries are imported lazily so that a missing or broken
    # installation is reported as JSON instead of a traceback. Any exception
    # is caught on purpose, not only ImportError.
    try:
        import board
        import busio
        import adafruit_ccs811
    except Exception as e:
        _emit({"error": f"library import failed: {e}"})
        return

    try:
        i2c = busio.I2C(board.SCL, board.SDA)
        ccs811 = adafruit_ccs811.CCS811(i2c, address=address)
    except Exception as e:
        _emit({"error": f"cannot init CCS811 at {hex(address)}: {e}"})
        return

    try:
        payload = _read_sample(ccs811, DATA_READY_RETRIES, DATA_READY_DELAY)
    except Exception as e:
        payload = {"error": f"CCS811 read error: {e}"}
    _emit(payload)
|
|
|
|
|
|
# Script entry point: perform the live read only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|