v1.9.14: Senseair S88 - implémentation lecture Modbus RTU

read_co2() lit IR1..IR4 en une trame (status + CO2) à 9600 8N1,
adresse 0xFE 'any address', avec vérification CRC16-Modbus et rejet
de la mesure si status non-nul (warm-up ou erreur).

CRC requête/réponse validés contre les exemples du datasheet TDE14367.
Doc protocole consolidée dans S88/README.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
PaulVua
2026-06-01 16:24:44 +02:00
parent 239bdfea69
commit 7681578f22
5 changed files with 284 additions and 15 deletions

View File

@@ -2,6 +2,8 @@
Live read of the Senseair S88 CO2 sensor (used by the web "Get Data" button).
Prints a JSON object: {"CO2": <int_ppm>} or {"error": "<message>"}.
Modbus RTU 9600 8N1, reads IR1..IR4 in one frame.
Usage: /usr/bin/python3 /var/www/nebuleair_pro_4g/S88/get_data.py [port]
If no port is given, the script reads S88_port from config_table.
'''
@@ -16,6 +18,22 @@ DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
DEFAULT_PORT = "/dev/ttyAMA5"
BAUDRATE = 9600
SLAVE_ADDR = 0xFE
READ_REQUEST = bytes([SLAVE_ADDR, 0x04, 0x00, 0x00, 0x00, 0x04, 0xE5, 0xC6])
EXPECTED_RESPONSE_LEN = 13
def crc16_modbus(data):
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def get_port_from_config():
try:
@@ -30,9 +48,33 @@ def get_port_from_config():
def read_co2(ser):
# TODO: implement the Senseair S88 read protocol once the datasheet is provided.
# Expected return: integer CO2 concentration in ppm, or None on failure.
raise NotImplementedError("Senseair S88 read protocol not implemented yet")
ser.reset_input_buffer()
ser.write(READ_REQUEST)
response = ser.read(EXPECTED_RESPONSE_LEN)
if len(response) < 5:
return None, f"short response ({len(response)} bytes)"
if response[1] & 0x80:
return None, f"Modbus exception code {response[2]:#04x}"
if len(response) != EXPECTED_RESPONSE_LEN:
return None, f"unexpected response length {len(response)}"
received_crc = response[-2] | (response[-1] << 8)
if crc16_modbus(response[:-2]) != received_crc:
return None, "CRC mismatch"
if response[0] != SLAVE_ADDR or response[1] != 0x04 or response[2] != 0x08:
return None, f"unexpected header {response[:3].hex()}"
status = (response[3] << 8) | response[4]
co2 = (response[9] << 8) | response[10]
if status != 0:
return None, f"sensor not ready (status={status:#06x})"
return co2, None
def main():
@@ -52,13 +94,11 @@ def main():
return
try:
co2 = read_co2(ser)
co2, err = read_co2(ser)
if co2 is None:
print(json.dumps({"error": "No data from S88"}))
print(json.dumps({"error": err or "No data from S88"}))
return
print(json.dumps({"CO2": int(round(co2))}))
except NotImplementedError as e:
print(json.dumps({"error": str(e)}))
except Exception as e:
print(json.dumps({"error": f"S88 read error: {e}"}))
finally: