This commit is contained in:
Your Name
2025-03-13 11:39:40 +01:00
parent 14044a8856
commit 1037207df3
6 changed files with 272 additions and 14 deletions

View File

@@ -1,12 +1,215 @@
import serial
'''
__ __ ____ ____ _____
| \/ | _ \| _ \_ _|
| |\/| | |_) | |_) || |
| | | | __/| __/ | |
|_| |_|_| |_| |_|
Chargeur solaire Victron MPPT interface UART
def read_vedirect(port='/dev/serial0', baudrate=19200):
ser = serial.Serial(port, baudrate, timeout=1)
typical response from uart:
PID 0xA075 ->product ID
FW 164 ->firmware version
SER# HQ2249VJV9W ->serial num
V 13310 ->Battery voilatage in mV
I -130 ->Battery current in mA (negative means its discharging)
VPV 10 ->Solar Panel voltage
PPV 0 ->Solar Panel power (in W)
CS 0 ->Charger status (0=off, 2=Bulk, 3=Absorbtion, 4=Float)
MPPT 0 ->MPPT (Maximum Power Point Tracking) state: 0 = Off, 1 = Active, 2 = Not tracking
OR 0x00000001
ERR 0
LOAD ON
IL 100
H19 18 ->historical data (Total energy absorbed in kWh)
H20 0 -> Total energy discharged in kWh
H21 0
H22 9
H23 92
HSDS 19
Checksum u
sudo /usr/bin/python3 /var/www/nebuleair_pro_4g/MPPT/read.py
'''
import serial
import time
import sqlite3
# Connect to the SQLite database
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()
def read_vedirect(port='/dev/ttyAMA4', baudrate=19200, timeout=20, max_attempts=3):
"""
Read and parse data from Victron MPPT controller with retry logic
Returns parsed data as a dictionary or None if all attempts fail
"""
required_keys = ['V', 'I', 'VPV', 'PPV', 'CS'] # Essential keys we need
while True:
line = ser.readline().decode('utf-8', errors='ignore').strip()
if line:
print(line) # Raw data from Victron
for attempt in range(max_attempts):
try:
print(f"Attempt {attempt+1} of {max_attempts}...")
ser = serial.Serial(port, baudrate, timeout=1)
# Initialize data dictionary and tracking variables
data = {}
start_time = time.time()
while time.time() - start_time < timeout:
line = ser.readline().decode('utf-8', errors='ignore').strip()
if not line:
continue
# Check if line contains a key-value pair
if '\t' in line:
key, value = line.split('\t', 1)
data[key] = value
print(f"{key}: {value}")
else:
print(f"Info: {line}")
# Check if we have a complete data block
if 'Checksum' in data:
# Check if we have all required keys
missing_keys = [key for key in required_keys if key not in data]
if not missing_keys:
ser.close()
return data
else:
print(f"Incomplete data, missing: {', '.join(missing_keys)}")
# Clear data and continue reading
data = {}
# Timeout occurred
print(f"Timeout on attempt {attempt+1}: Could not get complete data")
ser.close()
# Add small delay between attempts
if attempt < max_attempts - 1:
print("Waiting before next attempt...")
time.sleep(2)
except Exception as e:
print(f"Error on attempt {attempt+1}: {e}")
try:
ser.close()
except:
pass
print("All attempts failed")
return None
def parse_values(data):
"""Convert string values to appropriate types"""
if not data:
return None
parsed = {}
# Define conversions for each key
conversions = {
'PID': str,
'FW': int,
'SER#': str,
'V': lambda x: float(x)/1000, # Convert mV to V
'I': lambda x: float(x)/1000, # Convert mA to A
'VPV': lambda x: float(x)/1000 if x != '---' else 0, # Convert mV to V
'PPV': int,
'CS': int,
'MPPT': int,
'OR': str,
'ERR': int,
'LOAD': str,
'IL': int,
'H19': int, # Total energy absorbed in kWh
'H20': int, # Total energy discharged in kWh
'H21': int,
'H22': int,
'H23': int,
'HSDS': int
}
# Convert values according to their type
for key, value in data.items():
if key in conversions:
try:
parsed[key] = conversions[key](value)
except (ValueError, TypeError):
parsed[key] = value # Keep as string if conversion fails
else:
parsed[key] = value
return parsed
def get_charger_status(cs_value):
"""Convert CS numeric value to human-readable status"""
status_map = {
0: "Off",
1: "Low power mode",
2: "Fault",
3: "Bulk",
4: "Absorption",
5: "Float",
6: "Storage",
7: "Equalize",
9: "Inverting",
11: "Power supply",
245: "Starting-up",
247: "Repeated absorption",
252: "External control"
}
return status_map.get(cs_value, f"Unknown ({cs_value})")
if __name__ == "__main__":
read_vedirect()
# Read data (with retry logic)
raw_data = read_vedirect()
if raw_data:
# Parse data
parsed_data = parse_values(raw_data)
if parsed_data:
# Check if we have valid battery voltage
if parsed_data.get('V', 0) > 0:
print("\n===== MPPT Summary =====")
print(f"Battery: {parsed_data.get('V', 0):.2f}V, {parsed_data.get('I', 0):.2f}A")
print(f"Solar: {parsed_data.get('VPV', 0):.2f}V, {parsed_data.get('PPV', 0)}W")
print(f"Charger status: {get_charger_status(parsed_data.get('CS', 0))}")
# Save to SQLite
cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone()
rtc_time_str = row[1]
# Extract values
battery_voltage = parsed_data.get('V', 0)
battery_current = parsed_data.get('I', 0)
solar_voltage = parsed_data.get('VPV', 0)
solar_power = parsed_data.get('PPV', 0)
charger_status = parsed_data.get('CS', 0)
try:
cursor.execute('''
INSERT INTO data_MPPT (timestamp, battery_voltage, battery_current, solar_voltage, solar_power, charger_status)
VALUES (?, ?, ?, ?, ?, ?)''',
(rtc_time_str, battery_voltage, battery_current, solar_voltage, solar_power, charger_status))
conn.commit()
print("MPPT data saved successfully!")
except Exception as e:
print(f"Database error: {e}")
else:
print("Invalid data: Battery voltage is zero or missing")
else:
print("Failed to parse data")
else:
print("No valid data received from MPPT controller")
# Always close the connection
conn.close()