v1.9.13: Capteur CO2 Senseair S88 - scaffolding

Table data_S88, flag config S88 + port configurable S88_port
(default /dev/ttyAMA5), service/timer systemd 10s, carte
sensors.html, endpoint launcher.php, toggle admin.html.

read_co2() est un stub NotImplementedError en attente du datasheet.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
PaulVua
2026-06-01 16:15:37 +02:00
parent 05734715a7
commit 239bdfea69
10 changed files with 304 additions and 5 deletions

72
S88/get_data.py Normal file
View File

@@ -0,0 +1,72 @@
'''
Live read of the Senseair S88 CO2 sensor (used by the web "Get Data" button).
Prints a JSON object: {"CO2": <int_ppm>} or {"error": "<message>"}.
Usage: /usr/bin/python3 /var/www/nebuleair_pro_4g/S88/get_data.py [port]
If no port is given, the script reads S88_port from config_table.
'''
import json
import sqlite3
import sys
import serial
DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
DEFAULT_PORT = "/dev/ttyAMA5"
BAUDRATE = 9600
def get_port_from_config():
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT value FROM config_table WHERE key = ?", ("S88_port",))
row = cursor.fetchone()
conn.close()
return row[0] if row else DEFAULT_PORT
except Exception:
return DEFAULT_PORT
def read_co2(ser):
# TODO: implement the Senseair S88 read protocol once the datasheet is provided.
# Expected return: integer CO2 concentration in ppm, or None on failure.
raise NotImplementedError("Senseair S88 read protocol not implemented yet")
def main():
port = sys.argv[1] if len(sys.argv) > 1 else get_port_from_config()
try:
ser = serial.Serial(
port=port,
baudrate=BAUDRATE,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1,
)
except Exception as e:
print(json.dumps({"error": f"Cannot open {port}: {e}"}))
return
try:
co2 = read_co2(ser)
if co2 is None:
print(json.dumps({"error": "No data from S88"}))
return
print(json.dumps({"CO2": int(round(co2))}))
except NotImplementedError as e:
print(json.dumps({"error": str(e)}))
except Exception as e:
print(json.dumps({"error": f"S88 read error: {e}"}))
finally:
try:
ser.close()
except Exception:
pass
if __name__ == "__main__":
main()