r"""Read the five channels of the NPM sensor over Modbus RTU and print them.

Usage (the only parameter is the serial device name, without ``/dev/``):

    /usr/bin/python3 /var/www/nebuleair_pro_4g/NPM/get_data_modbus.py ttyAMA5

Modbus RTU request layout:

    [Slave Address][Function Code][Starting Address][Quantity of Registers][CRC]

Request used to fetch the 5 channels (starting at register 0x80):

    \x01\x03\x00\x80\x00\x0A + CRC

    \x01      Slave address (slave device address)
    \x03      Function code (read multiple holding registers)
    \x00\x80  Starting address (reading starts at holding register 0x80 = 128)
    \x00\x0A  Quantity of registers (0x0A = 10 consecutive registers from 128)
    CRC       Modbus CRC-16, computed at runtime and appended low byte first
"""

import json
import sys
import time

import crcmod
import requests
import serial

# Request frame without CRC (see the module docstring for the field layout).
_REQUEST_BODY = b'\x01\x03\x00\x80\x00\x0A'

# The request asks for 10 registers of 2 bytes each.
_REGISTER_COUNT = 10
# Payload starts at byte 3 of the response; in a Modbus "read holding
# registers" reply the first three bytes are address, function code and
# byte count.
_HEADER_LENGTH = 3
_CRC_LENGTH = 2
_RESPONSE_LENGTH = _HEADER_LENGTH + 2 * _REGISTER_COUNT + _CRC_LENGTH

# Labels printed next to each channel value, in register order.
_CHANNEL_LABELS = ("0.2->0.5", "0.5->1.0", "1.0->2.5", "2.5->5.0", "5.0->10.")

# Pause before exiting after an error or interrupt, as the original script
# intended (it called time.sleep(3) without importing time).
_EXIT_DELAY_SECONDS = 3


def _build_request(crc16):
    """Return the full request frame: fixed body plus CRC, low byte first."""
    crc = crc16(_REQUEST_BODY)
    return _REQUEST_BODY + bytes([crc & 0xFF, (crc >> 8) & 0xFF])


def _check_response(frame, crc16):
    """Raise ValueError if *frame* is truncated or fails its CRC check."""
    if len(frame) != _RESPONSE_LENGTH:
        raise ValueError(
            f"expected {_RESPONSE_LENGTH} bytes, received {len(frame)}"
        )
    received_crc = int.from_bytes(frame[-_CRC_LENGTH:], byteorder='little')
    computed_crc = crc16(frame[:-_CRC_LENGTH])
    if received_crc != computed_crc:
        raise ValueError(
            f"CRC mismatch (received {received_crc:04x}, "
            f"computed {computed_crc:04x})"
        )


def _decode_channels(frame):
    """Return the channel values decoded from a validated response frame.

    Each channel occupies two consecutive registers: the least significant
    word first, then the most significant word.
    """
    values = []
    for index in range(len(_CHANNEL_LABELS)):
        offset = _HEADER_LENGTH + 4 * index
        # NOTE(review): the bytes inside each register are read little-endian,
        # exactly as the original script did. Modbus normally transmits each
        # register high byte first -- confirm against the sensor documentation.
        lsw = int.from_bytes(frame[offset:offset + 2], byteorder='little')
        msw = int.from_bytes(frame[offset + 2:offset + 4], byteorder='little')
        values.append((msw << 16) | lsw)
    return values


def _exit_after_delay(message, status):
    """Print *message*, wait a few seconds, then exit with *status*."""
    print(message)
    time.sleep(_EXIT_DELAY_SECONDS)
    sys.exit(status)


def main():
    """Query the sensor on the port named on the command line and print it."""
    parameter = sys.argv[1:]  # Exclude the script name
    if not parameter:
        sys.exit(f"Usage: {sys.argv[0]} <port>   (example: ttyAMA5)")
    port = '/dev/' + parameter[0]

    # Modbus CRC-16 function, used for both the request and the reply.
    crc16 = crcmod.predefined.mkPredefinedCrcFun('modbus')

    try:
        # The context manager closes the port even when an error occurs.
        with serial.Serial(
            port=port,
            baudrate=115200,
            parity=serial.PARITY_EVEN,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=0.5,
        ) as ser:
            request = _build_request(crc16)
            print(f"Request frame: {request.hex()}")
            ser.write(request)
            # Fixed-length read: the reply is binary, so readline() would
            # stop early on any 0x0A byte inside the payload.
            byte_data = ser.read(_RESPONSE_LENGTH)

        # Dump the raw reply before validating it, to help with debugging.
        print(''.join(f'\\x{byte:02x}' for byte in byte_data))

        _check_response(byte_data, crc16)
        channels = _decode_channels(byte_data)
    except KeyboardInterrupt:
        _exit_after_delay("User interrupt encountered. Exiting...", 0)
    except serial.SerialException as err:
        _exit_after_delay(f"Serial error: {err}", 1)
    except ValueError as err:
        _exit_after_delay(f"Invalid response: {err}", 1)

    for number, (label, value) in enumerate(
        zip(_CHANNEL_LABELS, channels), start=1
    ):
        print(f"Channel {number} ({label}): {value}")


if __name__ == "__main__":
    main()