"""Live read of the Senseair S88 CO2 sensor (used by the web "Get Data" button).

Prints a single JSON object on stdout:
    {"CO2": <value>}          on success
    {"error": "<message>"}    on any failure

Modbus RTU, 9600 baud, 8N1. IR1..IR4 are read in one frame.

Usage:
    /usr/bin/python3 /var/www/nebuleair_pro_4g/S88/get_data.py [port]

If no port is given, the script reads S88_port from config_table.
"""

import contextlib
import json
import sqlite3
import sys

try:
    import serial
except ImportError:
    # Keep the "always print JSON" contract: main() reports the missing
    # dependency as {"error": ...} instead of dying with a traceback.
    serial = None

DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
DEFAULT_PORT = "/dev/ttyAMA5"
BAUDRATE = 9600
SLAVE_ADDR = 0xFE

# Function 0x04 (read input registers), start register 0, 4 registers,
# followed by the precomputed CRC-16/MODBUS (low byte first).
READ_REQUEST = bytes([SLAVE_ADDR, 0x04, 0x00, 0x00, 0x00, 0x04, 0xE5, 0xC6])

# addr + func + byte count + 4 registers * 2 bytes + 2 CRC bytes.
EXPECTED_RESPONSE_LEN = 13
# addr + (func | 0x80) + exception code + 2 CRC bytes.
EXCEPTION_RESPONSE_LEN = 5


def crc16_modbus(data: bytes) -> int:
    """Return the CRC-16/MODBUS checksum of *data* as a 16-bit integer.

    Bitwise implementation: initial value 0xFFFF, reflected polynomial 0xA001.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def _crc_ok(frame: bytes) -> bool:
    """Check that the last two bytes of *frame* (low byte first) match its CRC."""
    received_crc = frame[-2] | (frame[-1] << 8)
    return crc16_modbus(frame[:-2]) == received_crc


def get_port_from_config() -> str:
    """Return the serial port stored under ``S88_port`` in ``config_table``.

    Falls back to ``DEFAULT_PORT`` when the database cannot be read, the key
    is missing, or the stored value is empty.
    """
    try:
        # closing() guarantees the connection is released even if the query
        # itself raises (e.g. config_table does not exist).
        with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
            row = conn.execute(
                "SELECT value FROM config_table WHERE key = ?",
                ("S88_port",),
            ).fetchone()
    except Exception:
        # Deliberate best effort: any configuration problem means "use the
        # default port" rather than aborting the read.
        return DEFAULT_PORT
    return row[0] if row and row[0] else DEFAULT_PORT


def read_co2(ser):
    """Request IR1..IR4 from the sensor and decode the CO2 register.

    Args:
        ser: An open serial port object providing ``reset_input_buffer()``,
            ``write(bytes)`` and ``read(size)``.

    Returns:
        A ``(co2, error)`` tuple. On success ``co2`` is the integer value of
        IR4 and ``error`` is ``None``; on failure ``co2`` is ``None`` and
        ``error`` is a human-readable message.
    """
    # Drop stale bytes so the reply we parse belongs to this request.
    ser.reset_input_buffer()
    ser.write(READ_REQUEST)
    response = ser.read(EXPECTED_RESPONSE_LEN)

    if len(response) < EXCEPTION_RESPONSE_LEN:
        return None, f"short response ({len(response)} bytes)"

    # High bit of the function code marks a Modbus exception reply.
    if response[1] & 0x80:
        # Validate the 5-byte exception frame before trusting its code, so
        # line noise is not reported as a specific sensor exception.
        if not _crc_ok(response[:EXCEPTION_RESPONSE_LEN]):
            return None, "CRC mismatch"
        return None, f"Modbus exception code {response[2]:#04x}"

    if len(response) != EXPECTED_RESPONSE_LEN:
        return None, f"unexpected response length {len(response)}"

    if not _crc_ok(response):
        return None, "CRC mismatch"

    if response[0] != SLAVE_ADDR or response[1] != 0x04 or response[2] != 0x08:
        return None, f"unexpected header {response[:3].hex()}"

    # Registers are big-endian: IR1 (status) at bytes 3-4, IR4 (CO2) at 9-10.
    status = (response[3] << 8) | response[4]
    co2 = (response[9] << 8) | response[10]

    if status != 0:
        return None, f"sensor not ready (status={status:#06x})"

    return co2, None


def main() -> None:
    """Read the sensor once and print the result as a JSON object on stdout.

    The port is taken from the first command-line argument, otherwise from the
    configuration database. Every failure is reported as ``{"error": ...}``;
    the function always returns normally.
    """
    if serial is None:
        print(json.dumps({"error": "pyserial is not installed"}))
        return

    port = sys.argv[1] if len(sys.argv) > 1 else get_port_from_config()

    try:
        ser = serial.Serial(
            port=port,
            baudrate=BAUDRATE,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=1,
        )
    except Exception as e:
        print(json.dumps({"error": f"Cannot open {port}: {e}"}))
        return

    try:
        co2, err = read_co2(ser)
        if co2 is None:
            print(json.dumps({"error": err or "No data from S88"}))
            return
        print(json.dumps({"CO2": int(co2)}))
    except Exception as e:
        print(json.dumps({"error": f"S88 read error: {e}"}))
    finally:
        # Closing is best effort: a failure here must not add a traceback
        # after the JSON result has already been printed.
        with contextlib.suppress(Exception):
            ser.close()


if __name__ == "__main__":
    main()