Files
nebuleair_pro_4g/NPM/get_data_modbus.py
Your Name 4a5e0b3577 update
2025-01-29 10:04:01 +01:00

104 lines
3.2 KiB
Python
Executable File

'''
Script to get NPM data via Modbus
Required parameter: the serial port device name (e.g. ttyAMA5)
/usr/bin/python3 /var/www/nebuleair_pro_4g/NPM/get_data_modbus.py ttyAMA5
Modbus RTU
[Slave Address][Function Code][Starting Address][Quantity of Registers][CRC]
To read the 5 channels (starting at register 0x80)
Request
\x01\x03\x00\x80\x00\x0A\xC4\x25
\x01 Slave Address (slave device address)
\x03 Function code (read multiple holding registers)
\x00\x80 Starting Address (The request starts reading from holding register address 0x80 or 128)
\x00\x0A Quantity of Registers (Requests to read 0x0A or 10 consecutive registers starting from address 128)
\xC4\x25 Cyclic Redundancy Check (Modbus CRC-16 of the six bytes above, low byte first)
'''
import json
import sys
import time

import crcmod
import requests
import serial
# The serial device name (e.g. "ttyAMA5") is the only command-line argument;
# it is resolved relative to /dev.
parameter = sys.argv[1:]  # Exclude the script name
if not parameter:
    # Fail with a usage hint instead of an IndexError traceback.
    sys.exit(f"Usage: {sys.argv[0]} <serial device name, e.g. ttyAMA5>")
port = '/dev/' + parameter[0]

# Serial link to the sensor: 115200 baud, 8 data bits, even parity, 1 stop bit.
# The 0.5 s timeout bounds every blocking read on this port.
ser = serial.Serial(
    port=port,
    baudrate=115200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=0.5,
)
# Checksum function for Modbus RTU frames (CRC-16, 'modbus' preset of crcmod)
crc16 = crcmod.predefined.mkPredefinedCrcFun('modbus')
# Frame payload before the checksum: slave 0x01, function 0x03,
# start register 0x0080, quantity 0x000A
data = bytes((0x01, 0x03, 0x00, 0x80, 0x00, 0x0A))
# Checksum of the payload, split into its two bytes
crc = crc16(data)
crc_low, crc_high = ((crc >> shift) & 0xFF for shift in (0, 8))
# The checksum is appended low byte first, then the frame is sent
request = b''.join((data, bytes((crc_low, crc_high))))
print(f"Request frame: {request.hex()}")
ser.write(request)
# Expected response size: slave address (1) + function code (1) +
# byte count (1) + 10 registers * 2 bytes + CRC (2).
RESPONSE_LENGTH = 25
# Offset of the first register byte, after address, function and byte count.
PAYLOAD_OFFSET = 3
# One label per particle-size channel, in register order.
CHANNEL_LABELS = (
    "Channel 1 (0.2->0.5)",
    "Channel 2 (0.5->1.0)",
    "Channel 3 (1.0->2.5)",
    "Channel 4 (2.5->5.0)",
    "Channel 5 (5.0->10.)",
)


def _decode_channels(frame):
    """Return the five 32-bit channel values contained in a response frame.

    Each channel occupies two consecutive 16-bit registers, least
    significant word first. Every channel uses its own MSW; the previous
    code mistakenly combined channel 4's LSW with channel 1's MSW.

    NOTE(review): each register is decoded with byteorder='little', as the
    script always did. Modbus registers are normally transmitted big-endian;
    confirm against the sensor documentation before changing this.
    """
    values = []
    for index in range(len(CHANNEL_LABELS)):
        start = PAYLOAD_OFFSET + 4 * index
        lsw = int.from_bytes(frame[start:start + 2], byteorder='little')
        msw = int.from_bytes(frame[start + 2:start + 4], byteorder='little')
        values.append((msw << 16) | lsw)
    return values


try:
    # Read a fixed number of bytes: the frame is binary, so readline() would
    # stop early at any 0x0A byte inside the payload.
    byte_data = ser.read(RESPONSE_LENGTH)
    formatted = ''.join(f'\\x{byte:02x}' for byte in byte_data)
    print(formatted)
    if len(byte_data) < RESPONSE_LENGTH:
        # A short frame would otherwise be decoded as zeros without warning.
        raise ValueError(
            f"Incomplete response: expected {RESPONSE_LENGTH} bytes, "
            f"got {len(byte_data)}"
        )
    for label, value in zip(CHANNEL_LABELS, _decode_channels(byte_data)):
        print(f"{label}: {value}")
except KeyboardInterrupt:
    print("User interrupt encountered. Exiting...")
    time.sleep(3)
    sys.exit()
except Exception as error:
    # Report what actually failed and exit with a non-zero status.
    print("Unknown error...")
    print(f"{type(error).__name__}: {error}")
    time.sleep(3)
    sys.exit(1)
finally:
    # Release the serial port on every exit path.
    ser.close()