This commit is contained in:
root
2025-05-28 15:40:53 +02:00
parent 8d989de425
commit 6cd5191138

View File

@@ -1,11 +1,12 @@
''' #!/usr/bin/env python3
"""
__ __ ____ ____ _____ __ __ ____ ____ _____
| \/ | _ \| _ \_ _| | \/ | _ \| _ \_ _|
| |\/| | |_) | |_) || | | |\/| | |_) | |_) || |
| | | | __/| __/ | | | | | | __/| __/ | |
|_| |_|_| |_| |_| |_| |_|_| |_| |_|
Chargeur solaire Victron MPPT interface UART MPPT Chargeur solaire Victron interface UART
MPPT connections MPPT connections
5V / Rx / TX / GND 5V / Rx / TX / GND
@@ -13,113 +14,125 @@ RPI connection
-- / GPIO9 / GPIO8 / GND -- / GPIO9 / GPIO8 / GND
* pas besoin de connecter le 5V (le GND uniquement) * pas besoin de connecter le 5V (le GND uniquement)
typical response from uart: Fixed version - properly handles continuous data stream
"""
PID 0xA075 ->product ID
FW 164 ->firmware version
SER# HQ2249VJV9W ->serial num
V 13310 ->Battery voilatage in mV
I -130 ->Battery current in mA (negative means its discharging)
VPV 10 ->Solar Panel voltage
PPV 0 ->Solar Panel power (in W)
CS 0 ->Charger status:
0=off (no charging),
2=Bulk (Max current is being delivered to the battery),
3=Absorbtion (battery is nearly full,voltage is held constant.),
4=Float (Battery is fully charged, only maintaining charge)
MPPT 0 ->MPPT (Maximum Power Point Tracking) state: 0 = Off, 1 = Active, 2 = Not tracking
OR 0x00000001
ERR 0
LOAD ON
IL 100
H19 18 ->historical data (Total energy absorbed in kWh)
H20 0 -> Total energy discharged in kWh
H21 0
H22 9
H23 92
HSDS 19
Checksum u
sudo /usr/bin/python3 /var/www/nebuleair_pro_4g/MPPT/read.py
'''
import serial import serial
import time import time
import sqlite3 import sqlite3
import os
# ===== LOGGING CONFIGURATION =====
# Set to True to enable all print statements, False to run silently
DEBUG_MODE = False
# Alternative: Use environment variable (can be set in systemd service)
# DEBUG_MODE = os.environ.get('MPPT_DEBUG', 'false').lower() == 'true'
# Alternative: Check if running under systemd
# DEBUG_MODE = os.isatty(1) # True if running in terminal, False if systemd/cron
# Alternative: Use different log levels
# LOG_LEVEL = "ERROR" # Options: "DEBUG", "INFO", "ERROR", "NONE"
# =================================
# Connect to the SQLite database # Connect to the SQLite database
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor() cursor = conn.cursor()
# Logging function
def log(message, level="INFO"):
"""Print message only if DEBUG_MODE is True"""
if DEBUG_MODE:
print(message)
# Alternative: could write to a log file instead
# with open('/var/log/mppt.log', 'a') as f:
# f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{level}] {message}\n")
def read_vedirect(port='/dev/ttyAMA4', baudrate=19200, timeout=20, max_attempts=3):
def read_vedirect(port='/dev/ttyAMA4', baudrate=19200, timeout=10):
""" """
Read and parse data from Victron MPPT controller with retry logic Read and parse data from Victron MPPT controller
Returns parsed data as a dictionary or None if all attempts fail Returns parsed data as a dictionary
""" """
required_keys = ['V', 'I', 'VPV', 'PPV', 'CS'] # Essential keys we need required_keys = ['V', 'I', 'VPV', 'PPV', 'CS'] # Essential keys we need
for attempt in range(max_attempts):
try: try:
print(f"Attempt {attempt+1} of {max_attempts}...") log(f"Opening serial port {port} at {baudrate} baud...")
ser = serial.Serial(port, baudrate, timeout=1) ser = serial.Serial(port, baudrate, timeout=1)
# Initialize data dictionary and tracking variables # Clear any buffered data
ser.reset_input_buffer()
time.sleep(0.5)
# Initialize data dictionary
data = {} data = {}
start_time = time.time() start_time = time.time()
checksum_found = False lines_read = 0
blocks_seen = 0
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
try:
line = ser.readline().decode('utf-8', errors='ignore').strip() line = ser.readline().decode('utf-8', errors='ignore').strip()
if not line: if not line:
continue continue
# Check if line contains a key-value pair lines_read += 1
if '\t' in line:
key, value = line.split('\t', 1) # Check if this line contains tab-separated key-value pair
data[key] = value if '\t' in line:
print(f"{key}: {value}") parts = line.split('\t', 1)
else: if len(parts) == 2:
print(f"Info: {line}") key, value = parts
# Check if this is the checksum line (end of data block) data[key] = value
if line.lower().startswith('checksum') or 'checksum' in line.lower(): log(f"{key}: {value}")
checksum_found = True
# Check for checksum line (end of block)
elif line.startswith('Checksum'):
blocks_seen += 1
log(f"--- End of block {blocks_seen} ---")
# Check if we have a complete data block
if checksum_found:
# Check if we have all required keys # Check if we have all required keys
missing_keys = [key for key in required_keys if key not in data] missing_keys = [key for key in required_keys if key not in data]
if not missing_keys: if not missing_keys:
print("✓ Complete data block received!") log(f"✓ Complete data block received after {lines_read} lines!")
ser.close() ser.close()
return data return data
else: else:
print(f"Incomplete data, missing: {', '.join(missing_keys)}") log(f"Block {blocks_seen} incomplete, missing: {', '.join(missing_keys)}")
# Reset and continue reading # Don't clear data, maybe we missed the beginning of first block
if blocks_seen > 1:
# If we've seen multiple blocks and still missing data,
# something is wrong
log("Multiple incomplete blocks, clearing data...")
data = {} data = {}
checksum_found = False
# Timeout occurred
print(f"Timeout on attempt {attempt+1}: Could not get complete data")
ser.close()
# Add small delay between attempts
if attempt < max_attempts - 1:
print("Waiting before next attempt...")
time.sleep(2)
except UnicodeDecodeError as e:
log(f"Decode error: {e}", "ERROR")
continue
except Exception as e: except Exception as e:
print(f"Error on attempt {attempt+1}: {e}") log(f"Error reading line: {e}", "ERROR")
try: continue
ser.close()
except: # Timeout reached
pass log(f"Timeout after {timeout}s, read {lines_read} lines, saw {blocks_seen} blocks")
ser.close()
# If we have some data but not all required keys, return what we have
if data and len(data) >= len(required_keys) - 1:
log("Returning partial data...")
return data
except serial.SerialException as e:
log(f"Serial port error: {e}", "ERROR")
except Exception as e:
log(f"Unexpected error: {e}", "ERROR")
print("All attempts failed")
return None return None
def parse_values(data): def parse_values(data):
"""Convert string values to appropriate types""" """Convert string values to appropriate types"""
if not data: if not data:
@@ -141,13 +154,13 @@ def parse_values(data):
'OR': str, 'OR': str,
'ERR': int, 'ERR': int,
'LOAD': str, 'LOAD': str,
'IL': int, 'IL': lambda x: float(x)/1000, # Convert mA to A
'H19': int, # Total energy absorbed in kWh 'H19': float, # Total energy absorbed in kWh (already in kWh)
'H20': int, # Total energy discharged in kWh 'H20': float, # Total energy discharged in kWh
'H21': int, 'H21': int, # Maximum power today (W)
'H22': int, 'H22': float, # Energy generated today (kWh)
'H23': int, 'H23': int, # Maximum power yesterday (W)
'HSDS': int 'HSDS': int # Day sequence number
} }
# Convert values according to their type # Convert values according to their type
@@ -155,18 +168,19 @@ def parse_values(data):
if key in conversions: if key in conversions:
try: try:
parsed[key] = conversions[key](value) parsed[key] = conversions[key](value)
except (ValueError, TypeError): except (ValueError, TypeError) as e:
log(f"Conversion error for {key}={value}: {e}", "ERROR")
parsed[key] = value # Keep as string if conversion fails parsed[key] = value # Keep as string if conversion fails
else: else:
parsed[key] = value parsed[key] = value
return parsed return parsed
def get_charger_status(cs_value): def get_charger_status(cs_value):
"""Convert CS numeric value to human-readable status""" """Convert CS numeric value to human-readable status"""
status_map = { status_map = {
0: "Off", 0: "Off",
1: "Low power mode",
2: "Fault", 2: "Fault",
3: "Bulk", 3: "Bulk",
4: "Absorption", 4: "Absorption",
@@ -181,8 +195,22 @@ def get_charger_status(cs_value):
} }
return status_map.get(cs_value, f"Unknown ({cs_value})") return status_map.get(cs_value, f"Unknown ({cs_value})")
def get_mppt_status(mppt_value):
"""Convert MPPT value to human-readable status"""
mppt_map = {
0: "Off",
1: "Voltage or current limited",
2: "MPP Tracker active"
}
return mppt_map.get(mppt_value, f"Unknown ({mppt_value})")
if __name__ == "__main__": if __name__ == "__main__":
# Read data (with retry logic) log("=== Victron MPPT Reader ===")
log(f"Started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Read data
raw_data = read_vedirect() raw_data = read_vedirect()
if raw_data: if raw_data:
@@ -190,25 +218,37 @@ if __name__ == "__main__":
parsed_data = parse_values(raw_data) parsed_data = parse_values(raw_data)
if parsed_data: if parsed_data:
# Check if we have valid battery voltage # Display summary
if parsed_data.get('V', 0) > 0: log("\n===== MPPT Status Summary =====")
print("\n===== MPPT Summary =====") log(f"Product: {parsed_data.get('PID', 'Unknown')} (FW: {parsed_data.get('FW', '?')})")
print(f"Battery: {parsed_data.get('V', 0):.2f}V, {parsed_data.get('I', 0):.2f}A") log(f"Serial: {parsed_data.get('SER#', 'Unknown')}")
print(f"Solar: {parsed_data.get('VPV', 0):.2f}V, {parsed_data.get('PPV', 0)}W") log(f"\nBattery: {parsed_data.get('V', 0):.2f}V, {parsed_data.get('I', 0):.2f}A")
print(f"Charger status: {get_charger_status(parsed_data.get('CS', 0))}") log(f"Solar Panel: {parsed_data.get('VPV', 0):.2f}V, {parsed_data.get('PPV', 0)}W")
log(f"Charger Status: {get_charger_status(parsed_data.get('CS', 0))}")
log(f"MPPT Status: {get_mppt_status(parsed_data.get('MPPT', 0))}")
log(f"Load Output: {parsed_data.get('LOAD', 'Unknown')}, {parsed_data.get('IL', 0):.2f}A")
log(f"\nToday's Energy: {parsed_data.get('H22', 0)}kWh (Max: {parsed_data.get('H21', 0)}W)")
log(f"Total Energy: {parsed_data.get('H19', 0)}kWh")
# Save to SQLite # Validate critical values
battery_voltage = parsed_data.get('V', 0)
if battery_voltage > 0:
# Get timestamp
try:
cursor.execute("SELECT * FROM timestamp_table LIMIT 1") cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
rtc_time_str = row[1] rtc_time_str = row[1] if row else time.strftime('%Y-%m-%d %H:%M:%S')
except:
rtc_time_str = time.strftime('%Y-%m-%d %H:%M:%S')
# Extract values # Extract values for database
battery_voltage = parsed_data.get('V', 0)
battery_current = parsed_data.get('I', 0) battery_current = parsed_data.get('I', 0)
solar_voltage = parsed_data.get('VPV', 0) solar_voltage = parsed_data.get('VPV', 0)
solar_power = parsed_data.get('PPV', 0) solar_power = parsed_data.get('PPV', 0)
charger_status = parsed_data.get('CS', 0) charger_status = parsed_data.get('CS', 0)
# Save to database
try: try:
cursor.execute(''' cursor.execute('''
INSERT INTO data_MPPT (timestamp, battery_voltage, battery_current, solar_voltage, solar_power, charger_status) INSERT INTO data_MPPT (timestamp, battery_voltage, battery_current, solar_voltage, solar_power, charger_status)
@@ -216,16 +256,27 @@ if __name__ == "__main__":
(rtc_time_str, battery_voltage, battery_current, solar_voltage, solar_power, charger_status)) (rtc_time_str, battery_voltage, battery_current, solar_voltage, solar_power, charger_status))
conn.commit() conn.commit()
print("MPPT data saved successfully!") log(f"\n✓ Data saved to database at {rtc_time_str}")
except Exception as e: except sqlite3.Error as e:
# Always log database errors regardless of DEBUG_MODE
if not DEBUG_MODE:
print(f"Database error: {e}") print(f"Database error: {e}")
else: else:
print("Invalid data: Battery voltage is zero or missing") log(f"\n✗ Database error: {e}", "ERROR")
conn.rollback()
else: else:
print("Failed to parse data") log("\n✗ Invalid data: Battery voltage is zero or missing", "ERROR")
else: else:
print("No valid data received from MPPT controller") log("\n✗ Failed to parse data", "ERROR")
else:
log("\n✗ No valid data received from MPPT controller", "ERROR")
log("\nPossible issues:")
log("- Check serial connection (TX/RX/GND)")
log("- Verify port is /dev/ttyAMA4")
log("- Ensure MPPT is powered on")
log("- Check baudrate (should be 19200)")
# Always close the connection # Always close the connection
conn.close() conn.close()
log("\nDone.")