read_co2() lit IR1..IR4 en une trame (status + CO2) à 9600 8N1, adresse 0xFE 'any address', avec vérification CRC16-Modbus et rejet de la mesure si status non-nul (warm-up ou erreur). CRC requête/réponse validés contre les exemples du datasheet TDE14367. Doc protocole consolidée dans S88/README.md. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
113 lines
2.9 KiB
Python
113 lines
2.9 KiB
Python
'''
|
|
Live read of the Senseair S88 CO2 sensor (used by the web "Get Data" button).
|
|
Prints a JSON object: {"CO2": <int_ppm>} or {"error": "<message>"}.
|
|
|
|
Modbus RTU 9600 8N1, reads IR1..IR4 in one frame.
|
|
|
|
Usage: /usr/bin/python3 /var/www/nebuleair_pro_4g/S88/get_data.py [port]
|
|
If no port is given, the script reads S88_port from config_table.
|
|
'''
|
|
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
|
|
import serial
|
|
|
|
DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
|
|
DEFAULT_PORT = "/dev/ttyAMA5"
|
|
BAUDRATE = 9600
|
|
|
|
SLAVE_ADDR = 0xFE
|
|
READ_REQUEST = bytes([SLAVE_ADDR, 0x04, 0x00, 0x00, 0x00, 0x04, 0xE5, 0xC6])
|
|
EXPECTED_RESPONSE_LEN = 13
|
|
|
|
|
|
def crc16_modbus(data):
|
|
crc = 0xFFFF
|
|
for byte in data:
|
|
crc ^= byte
|
|
for _ in range(8):
|
|
if crc & 1:
|
|
crc = (crc >> 1) ^ 0xA001
|
|
else:
|
|
crc >>= 1
|
|
return crc
|
|
|
|
|
|
def get_port_from_config():
|
|
try:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT value FROM config_table WHERE key = ?", ("S88_port",))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
return row[0] if row else DEFAULT_PORT
|
|
except Exception:
|
|
return DEFAULT_PORT
|
|
|
|
|
|
def read_co2(ser):
|
|
ser.reset_input_buffer()
|
|
ser.write(READ_REQUEST)
|
|
response = ser.read(EXPECTED_RESPONSE_LEN)
|
|
|
|
if len(response) < 5:
|
|
return None, f"short response ({len(response)} bytes)"
|
|
|
|
if response[1] & 0x80:
|
|
return None, f"Modbus exception code {response[2]:#04x}"
|
|
|
|
if len(response) != EXPECTED_RESPONSE_LEN:
|
|
return None, f"unexpected response length {len(response)}"
|
|
|
|
received_crc = response[-2] | (response[-1] << 8)
|
|
if crc16_modbus(response[:-2]) != received_crc:
|
|
return None, "CRC mismatch"
|
|
|
|
if response[0] != SLAVE_ADDR or response[1] != 0x04 or response[2] != 0x08:
|
|
return None, f"unexpected header {response[:3].hex()}"
|
|
|
|
status = (response[3] << 8) | response[4]
|
|
co2 = (response[9] << 8) | response[10]
|
|
|
|
if status != 0:
|
|
return None, f"sensor not ready (status={status:#06x})"
|
|
|
|
return co2, None
|
|
|
|
|
|
def main():
|
|
port = sys.argv[1] if len(sys.argv) > 1 else get_port_from_config()
|
|
|
|
try:
|
|
ser = serial.Serial(
|
|
port=port,
|
|
baudrate=BAUDRATE,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
bytesize=serial.EIGHTBITS,
|
|
timeout=1,
|
|
)
|
|
except Exception as e:
|
|
print(json.dumps({"error": f"Cannot open {port}: {e}"}))
|
|
return
|
|
|
|
try:
|
|
co2, err = read_co2(ser)
|
|
if co2 is None:
|
|
print(json.dumps({"error": err or "No data from S88"}))
|
|
return
|
|
print(json.dumps({"CO2": int(round(co2))}))
|
|
except Exception as e:
|
|
print(json.dumps({"error": f"S88 read error: {e}"}))
|
|
finally:
|
|
try:
|
|
ser.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|