r"""
  _   _ ____  __  __
 | \ | |  _ \|  \/  |
 |  \| | |_) | |\/| |
 | |\  |  __/| |  | |
 |_| \_|_|   |_|  |_|

Script to get NPM data via Modbus.

Required parameter: the serial port, read from the "NPM_solo_port" key of
/var/www/nebuleair_pro_4g/config.json.

Usage:
    /usr/bin/python3 /var/www/nebuleair_pro_4g/NPM/old/test_modbus.py

Modbus RTU request layout:
    [Slave Address][Function Code][Starting Address][Quantity of Registers][CRC]

Example request to retrieve the PM1, PM10 and PM2.5 concentrations of the
5 channels (starting at register 0x38); data is refreshed every 10 seconds:

    \x01\x03\x00\x38\x00\x55\...\...

    \x01      Slave address (slave device address)
    \x03      Function code (read multiple holding registers)
    \x00\x38  Starting address (reading starts at holding register 0x38, i.e. 56)
    \x00\x55  Quantity of registers (0x55, i.e. 85 consecutive registers from 56)
    \...\...  Cyclic Redundancy Check (checksum)

Request actually sent by this script:

    \x01\x03\x00\x44\x00\x06 + CRC  (6 registers starting at 0x44)

The 12 data bytes of the reply are decoded as three 32-bit values
(PM1, PM2.5, PM10), each stored as two 16-bit registers, least significant
word first, and divided by 1000 before printing.
"""

import serial
import requests
import json
import sys
import time
import crcmod
import sqlite3

# Number of holding registers requested (must match the request frame below).
REGISTER_COUNT = 6

# Length of a well-formed reply to that request:
# slave address (1) + function code (1) + byte count (1)
# + 2 bytes per register + CRC (2).
RESPONSE_LENGTH = 3 + 2 * REGISTER_COUNT + 2


def load_config(config_file):
    """Load the JSON configuration file.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or an empty dict when the file cannot be
        read or parsed (the error is printed, not raised).
    """
    try:
        with open(config_file, 'r') as file:
            return json.load(file)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file.
        # ValueError: invalid JSON (json.JSONDecodeError) or undecodable text.
        print(f"Error loading config file: {e}")
        return {}


def _decode_concentration(frame, offset):
    """Decode one 32-bit value stored as two 16-bit registers at *offset*.

    Each register is big-endian; the least significant word comes first.
    The raw integer is divided by 1000, as the original script did
    (unit not stated in this file -- confirm against the NPM documentation).
    """
    lsw = int.from_bytes(frame[offset:offset + 2], byteorder='big')
    msw = int.from_bytes(frame[offset + 2:offset + 4], byteorder='big')
    return ((msw << 16) | lsw) / 1000


# Connect to the SQLite database
# NOTE(review): the connection is opened but never used in this script; it is
# kept (and now closed on every exit path) to preserve the original behavior.
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()

# Load the configuration data
config_file = '/var/www/nebuleair_pro_4g/config.json'
config = load_config(config_file)
npm_solo_port = config.get('NPM_solo_port', '')  # serial port of the NPM solo

ser = serial.Serial(
    port=npm_solo_port,
    baudrate=115200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=0.5
)

# Define Modbus CRC-16 function
crc16 = crcmod.predefined.mkPredefinedCrcFun('modbus')

# Request frame without CRC
data = b'\x01\x03\x00\x44\x00\x06'

# Calculate CRC
crc = crc16(data)
crc_low = crc & 0xFF
crc_high = (crc >> 8) & 0xFF

# Append CRC to the frame (Modbus RTU sends the low byte first)
request = data + bytes([crc_low, crc_high])
#print(f"Request frame: {request.hex()}")

try:
    while True:
        # Re-send the request on every attempt: a device answers once per
        # request, so retrying the read alone would spin forever. Stale bytes
        # from a previous partial reply are discarded first.
        ser.reset_input_buffer()
        ser.write(request)

        # Read a fixed number of bytes instead of readline(): the frame is
        # binary, and readline() would stop at any 0x0A byte in the payload.
        byte_data = ser.read(RESPONSE_LENGTH)
        formatted = ''.join(f'\\x{byte:02x}' for byte in byte_data)
        print(formatted)

        if len(byte_data) < RESPONSE_LENGTH:
            print(f"Error: Received {len(byte_data)} bytes, expected {RESPONSE_LENGTH}!")
            continue

        # Verify the trailing CRC (transmitted low byte first) before trusting
        # the payload.
        received_crc = int.from_bytes(byte_data[-2:], byteorder='little')
        if crc16(byte_data[:-2]) != received_crc:
            print("Error: CRC mismatch in response!")
            continue

        # Data bytes start at index 3 (after address, function code, byte count).
        # NOTE(review): the original comment said "10 secs concentration" while
        # the printed label says "1 min" -- confirm which averaging period
        # register 0x44 holds.
        raw_value_pm1 = _decode_concentration(byte_data, 3)
        raw_value_pm25 = _decode_concentration(byte_data, 7)
        raw_value_pm10 = _decode_concentration(byte_data, 11)

        print("1 min")
        print(f"PM1: {raw_value_pm1}")
        print(f"PM2.5: {raw_value_pm25}")
        print(f"PM10: {raw_value_pm10}")
        break

except KeyboardInterrupt:
    print("User interrupt encountered. Exiting...")
    time.sleep(3)
    sys.exit()

except Exception as e:
    # Catch-all boundary for the script; report what went wrong before exiting.
    print(f"Unknown error... {e!r}")
    time.sleep(3)
    sys.exit()

finally:
    # Runs on success and on both exit paths above.
    ser.close()
    conn.close()