''' __ __ ____ ____ _____ | \/ | _ \| _ \_ _| | |\/| | |_) | |_) || | | | | | __/| __/ | | |_| |_|_| |_| |_| Chargeur solaire Victron MPPT interface UART MPPT connections 5V / Rx / TX / GND RPI connection -- / GPIO9 / GPIO8 / GND * pas besoin de connecter le 5V (le GND uniquement) typical response from uart: PID 0xA075 ->product ID FW 164 ->firmware version SER# HQ2249VJV9W ->serial num V 13310 ->Battery voilatage in mV I -130 ->Battery current in mA (negative means its discharging) VPV 10 ->Solar Panel voltage PPV 0 ->Solar Panel power (in W) CS 0 ->Charger status: 0=off (no charging), 2=Bulk (Max current is being delivered to the battery), 3=Absorbtion (battery is nearly full,voltage is held constant.), 4=Float (Battery is fully charged, only maintaining charge) MPPT 0 ->MPPT (Maximum Power Point Tracking) state: 0 = Off, 1 = Active, 2 = Not tracking OR 0x00000001 ERR 0 LOAD ON IL 100 H19 18 ->historical data (Total energy absorbed in kWh) H20 0 -> Total energy discharged in kWh H21 0 H22 9 H23 92 HSDS 19 Checksum u sudo /usr/bin/python3 /var/www/nebuleair_pro_4g/MPPT/read.py ''' import serial import time import sqlite3 # Connect to the SQLite database conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") cursor = conn.cursor() def read_vedirect(port='/dev/ttyAMA4', baudrate=19200, timeout=20, max_attempts=3): """ Read and parse data from Victron MPPT controller with retry logic Returns parsed data as a dictionary or None if all attempts fail """ required_keys = ['V', 'I', 'VPV', 'PPV', 'CS'] # Essential keys we need for attempt in range(max_attempts): try: print(f"Attempt {attempt+1} of {max_attempts}...") ser = serial.Serial(port, baudrate, timeout=1) # Initialize data dictionary and tracking variables data = {} start_time = time.time() checksum_found = False while time.time() - start_time < timeout: line = ser.readline().decode('utf-8', errors='ignore').strip() if not line: continue # Check if line contains a key-value pair if '\t' in line: key, value = line.split('\t', 1) data[key] = value print(f"{key}: {value}") else: print(f"Info: {line}") # Check if this is the checksum line (end of data block) if line.lower().startswith('checksum') or 'checksum' in line.lower(): checksum_found = True # Check if we have a complete data block if checksum_found: # Check if we have all required keys missing_keys = [key for key in required_keys if key not in data] if not missing_keys: print("✓ Complete data block received!") ser.close() return data else: print(f"Incomplete data, missing: {', '.join(missing_keys)}") # Reset and continue reading data = {} checksum_found = False # Timeout occurred print(f"Timeout on attempt {attempt+1}: Could not get complete data") ser.close() # Add small delay between attempts if attempt < max_attempts - 1: print("Waiting before next attempt...") time.sleep(2) except Exception as e: print(f"Error on attempt {attempt+1}: {e}") try: ser.close() except: pass print("All attempts failed") return None def parse_values(data): """Convert string values to appropriate types""" if not data: return None parsed = {} # Define conversions for each key conversions = { 'PID': str, 'FW': int, 'SER#': str, 'V': lambda x: float(x)/1000, # Convert mV to V 'I': lambda x: float(x)/1000, # Convert mA to A 'VPV': lambda x: float(x)/1000 if x != '---' else 0, # Convert mV to V 'PPV': int, 'CS': int, 'MPPT': int, 'OR': str, 'ERR': int, 'LOAD': str, 'IL': int, 'H19': int, # Total energy absorbed in kWh 'H20': int, # Total energy discharged in kWh 'H21': int, 'H22': int, 'H23': int, 'HSDS': int } # Convert values according to their type for key, value in data.items(): if key in conversions: try: parsed[key] = conversions[key](value) except (ValueError, TypeError): parsed[key] = value # Keep as string if conversion fails else: parsed[key] = value return parsed def get_charger_status(cs_value): """Convert CS numeric value to human-readable status""" status_map = { 0: "Off", 1: "Low power mode", 2: "Fault", 3: "Bulk", 4: "Absorption", 5: "Float", 6: "Storage", 7: "Equalize", 9: "Inverting", 11: "Power supply", 245: "Starting-up", 247: "Repeated absorption", 252: "External control" } return status_map.get(cs_value, f"Unknown ({cs_value})") if __name__ == "__main__": # Read data (with retry logic) raw_data = read_vedirect() if raw_data: # Parse data parsed_data = parse_values(raw_data) if parsed_data: # Check if we have valid battery voltage if parsed_data.get('V', 0) > 0: print("\n===== MPPT Summary =====") print(f"Battery: {parsed_data.get('V', 0):.2f}V, {parsed_data.get('I', 0):.2f}A") print(f"Solar: {parsed_data.get('VPV', 0):.2f}V, {parsed_data.get('PPV', 0)}W") print(f"Charger status: {get_charger_status(parsed_data.get('CS', 0))}") # Save to SQLite cursor.execute("SELECT * FROM timestamp_table LIMIT 1") row = cursor.fetchone() rtc_time_str = row[1] # Extract values battery_voltage = parsed_data.get('V', 0) battery_current = parsed_data.get('I', 0) solar_voltage = parsed_data.get('VPV', 0) solar_power = parsed_data.get('PPV', 0) charger_status = parsed_data.get('CS', 0) try: cursor.execute(''' INSERT INTO data_MPPT (timestamp, battery_voltage, battery_current, solar_voltage, solar_power, charger_status) VALUES (?, ?, ?, ?, ?, ?)''', (rtc_time_str, battery_voltage, battery_current, solar_voltage, solar_power, charger_status)) conn.commit() print("MPPT data saved successfully!") except Exception as e: print(f"Database error: {e}") else: print("Invalid data: Battery voltage is zero or missing") else: print("Failed to parse data") else: print("No valid data received from MPPT controller") # Always close the connection conn.close()