#!/usr/bin/env python3
r"""
 __  __ ____  ____ _____
|  \/  |  _ \|  _ \_   _|
| |\/| | |_) | |_) || |
| |  | |  __/|  __/ | |
|_|  |_|_|   |_|    |_|

MPPT
Victron solar charger, UART interface

MPPT connections
5V / Rx / TX / GND
RPI connection
-- / GPIO9 / GPIO8 / GND
* no need to connect the 5V line (GND only)

Fixed version - properly handles continuous data stream
"""

import os
import sqlite3
import time

import serial

# ===== LOGGING CONFIGURATION =====
# Set to True to enable all print statements, False to run silently
DEBUG_MODE = False

# Alternative: Use environment variable (can be set in systemd service)
# DEBUG_MODE = os.environ.get('MPPT_DEBUG', 'false').lower() == 'true'

# Alternative: Check if running under systemd
# DEBUG_MODE = os.isatty(1)  # True if running in terminal, False if systemd/cron

# Alternative: Use different log levels
# LOG_LEVEL = "ERROR"  # Options: "DEBUG", "INFO", "ERROR", "NONE"
# =================================

# Connect to the SQLite database.
# The connection is opened at import time and shared by the helpers below;
# it is closed at the end of the script entry point.
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()


def log(message, level="INFO"):
    """Print *message* only if DEBUG_MODE is True.

    Args:
        message: Text to print.
        level: Severity label. Accepted so call sites can tag errors, but it
            is not used by the current print-based implementation.
    """
    if DEBUG_MODE:
        print(message)
    # Alternative: could write to a log file instead
    # with open('/var/log/mppt.log', 'a') as f:
    #     f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} [{level}] {message}\n")


def read_vedirect(port='/dev/ttyAMA4', baudrate=19200, timeout=10):
    """Read one block of key/value data from the Victron MPPT controller.

    Lines of the form ``<key>\\t<value>`` are accumulated until a ``Checksum``
    line closes a block that contains every required key.

    Args:
        port: Serial device path.
        baudrate: Serial speed in baud.
        timeout: Overall time budget in seconds for collecting a full block.
            (Each individual ``readline`` uses its own 1 s timeout.)

    Returns:
        A dict mapping field names to their raw string values, or ``None``
        when the port cannot be used or too little data arrived in time.
        If the time budget expires with at least ``len(required) - 1`` fields
        collected, that partial dict is returned.
    """
    required_keys = ('V', 'I', 'VPV', 'PPV', 'CS')  # Essential keys we need

    try:
        log(f"Opening serial port {port} at {baudrate} baud...")
        # The context manager guarantees the port is closed on every exit
        # path, including exceptions raised while reading.
        with serial.Serial(port, baudrate, timeout=1) as ser:
            # Clear any buffered data
            ser.reset_input_buffer()
            time.sleep(0.5)

            data = {}
            lines_read = 0
            blocks_seen = 0
            # Monotonic clock: the deadline must not move if the system
            # clock is adjusted while we are reading.
            deadline = time.monotonic() + timeout

            while time.monotonic() < deadline:
                # errors='ignore' means decoding cannot raise; a failing
                # readline() propagates to the SerialException handler below
                # instead of spinning until the deadline.
                line = ser.readline().decode('utf-8', errors='ignore').strip()
                if not line:
                    continue
                lines_read += 1

                # The end-of-block line must be tested BEFORE the generic
                # tab split: it normally looks like "Checksum\t<byte>" and
                # would otherwise be stored as an ordinary field, so the
                # block end would never be detected.
                if line.startswith('Checksum'):
                    blocks_seen += 1
                    log(f"--- End of block {blocks_seen} ---")

                    missing_keys = [k for k in required_keys if k not in data]
                    if not missing_keys:
                        log(f"✓ Complete data block received after {lines_read} lines!")
                        return data

                    log(f"Block {blocks_seen} incomplete, missing: {', '.join(missing_keys)}")
                    # Don't clear data after the first block: we may simply
                    # have joined the stream in the middle of it.
                    if blocks_seen > 1:
                        # Several blocks and still missing data: start over.
                        log("Multiple incomplete blocks, clearing data...")
                        data = {}
                    continue

                # Regular tab-separated key/value pair
                key, sep, value = line.partition('\t')
                if sep:
                    data[key] = value
                    log(f"{key}: {value}")

            # Timeout reached
            log(f"Timeout after {timeout}s, read {lines_read} lines, saw {blocks_seen} blocks")

            # If we have some data but not all required keys, return what we have
            if data and len(data) >= len(required_keys) - 1:
                log("Returning partial data...")
                return data

    except serial.SerialException as e:
        log(f"Serial port error: {e}", "ERROR")
    except Exception as e:
        log(f"Unexpected error: {e}", "ERROR")

    return None


def _from_milli(raw):
    """Convert a raw milli-unit string (mV, mA) to the base unit (V, A)."""
    return float(raw) / 1000


def _panel_voltage(raw):
    """Convert panel voltage from mV to V; the placeholder '---' maps to 0."""
    return float(raw) / 1000 if raw != '---' else 0


# Converter applied to each known field's raw string value.
# NOTE(review): the VE.Direct specification appears to define H19/H20/H22 as
# yields in units of 0.01 kWh (H20 = today, H22 = yesterday) — confirm before
# relying on the labels below or on the energy lines of the debug summary.
_FIELD_CONVERSIONS = {
    'PID': str,
    'FW': int,
    'SER#': str,
    'V': _from_milli,       # Convert mV to V
    'I': _from_milli,       # Convert mA to A
    'VPV': _panel_voltage,  # Convert mV to V
    'PPV': int,
    'CS': int,
    'MPPT': int,
    'OR': str,
    'ERR': int,
    'LOAD': str,
    'IL': _from_milli,      # Convert mA to A
    'H19': float,           # Total energy absorbed in kWh
    'H20': float,           # Total energy discharged in kWh
    'H21': int,             # Maximum power today (W)
    'H22': float,           # Energy generated today (kWh)
    'H23': int,             # Maximum power yesterday (W)
    'HSDS': int,            # Day sequence number
}


def parse_values(data):
    """Convert raw string values to appropriate types.

    Args:
        data: Dict of raw field strings as returned by ``read_vedirect``.

    Returns:
        A new dict with known fields converted; unknown fields, and fields
        whose conversion fails, keep their original string value. Returns
        ``None`` when *data* is empty or ``None``.
    """
    if not data:
        return None

    parsed = {}
    for key, value in data.items():
        convert = _FIELD_CONVERSIONS.get(key)
        if convert is None:
            parsed[key] = value
            continue
        try:
            parsed[key] = convert(value)
        except (ValueError, TypeError) as e:
            log(f"Conversion error for {key}={value}: {e}", "ERROR")
            parsed[key] = value  # Keep as string if conversion fails

    return parsed


_CHARGER_STATUS = {
    0: "Off",
    2: "Fault",
    3: "Bulk",
    4: "Absorption",
    5: "Float",
    6: "Storage",
    7: "Equalize",
    9: "Inverting",
    11: "Power supply",
    245: "Starting-up",
    247: "Repeated absorption",
    252: "External control",
}

_MPPT_STATUS = {
    0: "Off",
    1: "Voltage or current limited",
    2: "MPP Tracker active",
}


def get_charger_status(cs_value):
    """Convert CS numeric value to human-readable status."""
    return _CHARGER_STATUS.get(cs_value, f"Unknown ({cs_value})")


def get_mppt_status(mppt_value):
    """Convert MPPT value to human-readable status."""
    return _MPPT_STATUS.get(mppt_value, f"Unknown ({mppt_value})")


def _number(parsed, key):
    """Return parsed[key] if it is numeric, else 0.

    parse_values keeps the raw string when a conversion fails; such a value
    would crash numeric formatting and comparisons, so it is treated like a
    missing field.
    """
    value = parsed.get(key, 0)
    return value if isinstance(value, (int, float)) else 0


def _log_summary(parsed_data):
    """Log a human-readable summary of the parsed reading (debug only)."""
    log("\n===== MPPT Status Summary =====")
    log(f"Product: {parsed_data.get('PID', 'Unknown')} (FW: {parsed_data.get('FW', '?')})")
    log(f"Serial: {parsed_data.get('SER#', 'Unknown')}")
    log(f"\nBattery: {_number(parsed_data, 'V'):.2f}V, {_number(parsed_data, 'I'):.2f}A")
    log(f"Solar Panel: {_number(parsed_data, 'VPV'):.2f}V, {parsed_data.get('PPV', 0)}W")
    log(f"Charger Status: {get_charger_status(parsed_data.get('CS', 0))}")
    log(f"MPPT Status: {get_mppt_status(parsed_data.get('MPPT', 0))}")
    log(f"Load Output: {parsed_data.get('LOAD', 'Unknown')}, {_number(parsed_data, 'IL'):.2f}A")
    log(f"\nToday's Energy: {parsed_data.get('H22', 0)}kWh (Max: {parsed_data.get('H21', 0)}W)")
    log(f"Total Energy: {parsed_data.get('H19', 0)}kWh")


def _current_timestamp():
    """Return the timestamp from timestamp_table, or system time as fallback."""
    fallback = time.strftime('%Y-%m-%d %H:%M:%S')
    try:
        cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
        row = cursor.fetchone()
    except sqlite3.Error as e:
        log(f"Timestamp lookup failed, using system time: {e}", "ERROR")
        return fallback
    # The timestamp is read from the second column of the first row.
    if row and len(row) > 1:
        return row[1]
    return fallback


def _save_reading(rtc_time_str, battery_voltage, battery_current,
                  solar_voltage, solar_power, charger_status):
    """Insert one reading into data_MPPT, rolling back on database errors."""
    try:
        cursor.execute('''
            INSERT INTO data_MPPT (timestamp, battery_voltage, battery_current, solar_voltage, solar_power, charger_status)
            VALUES (?, ?, ?, ?, ?, ?)''',
            (rtc_time_str, battery_voltage, battery_current, solar_voltage, solar_power, charger_status))
        conn.commit()
        log(f"\n✓ Data saved to database at {rtc_time_str}")
    except sqlite3.Error as e:
        # Always log database errors regardless of DEBUG_MODE
        if not DEBUG_MODE:
            print(f"Database error: {e}")
        else:
            log(f"\n✗ Database error: {e}", "ERROR")
        conn.rollback()


def main():
    """Read one MPPT data block, log a summary and store it in the database."""
    log("=== Victron MPPT Reader ===")
    log(f"Started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")

    raw_data = read_vedirect()
    if not raw_data:
        log("\n✗ No valid data received from MPPT controller", "ERROR")
        log("\nPossible issues:")
        log("- Check serial connection (TX/RX/GND)")
        log("- Verify port is /dev/ttyAMA4")
        log("- Ensure MPPT is powered on")
        log("- Check baudrate (should be 19200)")
        return

    parsed_data = parse_values(raw_data)
    if not parsed_data:
        log("\n✗ Failed to parse data", "ERROR")
        return

    _log_summary(parsed_data)

    # Validate critical values: a missing or unparseable voltage counts as 0.
    battery_voltage = _number(parsed_data, 'V')
    if battery_voltage <= 0:
        log("\n✗ Invalid data: Battery voltage is zero or missing", "ERROR")
        return

    _save_reading(
        _current_timestamp(),
        battery_voltage,
        _number(parsed_data, 'I'),
        _number(parsed_data, 'VPV'),
        _number(parsed_data, 'PPV'),
        _number(parsed_data, 'CS'),
    )


if __name__ == "__main__":
    try:
        main()
    finally:
        # Always close the connection, even if main() raised.
        conn.close()
    log("\nDone.")