diff --git a/SARA/reboot/start.py b/SARA/reboot/start.py index bca10f6..a30a4c2 100644 --- a/SARA/reboot/start.py +++ b/SARA/reboot/start.py @@ -1,95 +1,57 @@ r''' - ____ _ ____ _ - / ___| / \ | _ \ / \ - \___ \ / _ \ | |_) | / _ \ - ___) / ___ \| _ < / ___ \ + ____ _ ____ _ + / ___| / \ | _ \ / \ + \___ \ / _ \ | |_) | / _ \ + ___) / ___ \| _ < / ___ \ |____/_/ \_\_| \_\/_/ \_\ - + Script that starts at the boot of the RPI (with cron) @reboot sleep 30 && /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py >> /var/www/nebuleair_pro_4g/logs/app.log 2>&1 /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py +Roles: + 1. Reset modem_config_mode to 0 (boot safety) + 2. Power on SARA modem via GPIO 16 + 3. Detect modem model (SARA R4 or R5) and save to SQLite + +All other configuration (AirCarto URL, uSpot HTTPS, PDP setup, geolocation) +is handled by the main loop script: loop/SARA_send_data_v2.py ''' import serial import RPi.GPIO as GPIO import time -import sys -import json import re import sqlite3 import traceback #GPIO -SARA_power_GPIO = 16 -SARA_ON_GPIO = 20 +SARA_power_GPIO = 16 GPIO.setmode(GPIO.BCM) # Use BCM numbering -GPIO.setup(SARA_power_GPIO, GPIO.OUT) # Set GPIO17 as an output +GPIO.setup(SARA_power_GPIO, GPIO.OUT) # database connection conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") cursor = conn.cursor() -#get config data from SQLite table -def load_config_sqlite(): - """ - Load configuration data from SQLite config table - - Returns: - dict: Configuration data with proper type conversion - """ - try: - - # Query the config table - cursor.execute("SELECT key, value, type FROM config_table") - rows = cursor.fetchall() - - # Create config dictionary - config_data = {} - for key, value, type_name in rows: - # Convert value based on its type - if type_name == 'bool': - config_data[key] = value == '1' or value == 'true' - elif type_name == 'int': - config_data[key] = int(value) - elif type_name == 'float': - config_data[key] = float(value) - else: - config_data[key] = value - - return config_data - - except Exception as e: - print(f"Error loading config from SQLite: {e}") - return {} - def update_sqlite_config(key, value): """ Updates a specific key in the SQLite config_table with a new value. - - :param key: The key to update in the config_table. - :param value: The new value to assign to the key. """ try: - - # Check if the key exists and get its type cursor.execute("SELECT type FROM config_table WHERE key = ?", (key,)) result = cursor.fetchone() - + if result is None: print(f"Key '{key}' not found in the config_table.") - conn.close() return - - # Get the type of the value from the database + value_type = result[0] - - # Convert the value to the appropriate string representation based on its type + if value_type == 'bool': - # Convert Python boolean or string 'true'/'false' to '1'/'0' if isinstance(value, bool): str_value = '1' if value else '0' else: @@ -100,29 +62,23 @@ def update_sqlite_config(key, value): str_value = str(float(value)) else: str_value = str(value) - - # Update the value in the database + cursor.execute("UPDATE config_table SET value = ? WHERE key = ?", (str_value, key)) - - # Commit the changes and close the connection conn.commit() - - print(f"💾 Updated '{key}' to '{value}' in database.") + + print(f"Updated '{key}' to '{value}' in database.") except Exception as e: print(f"Error updating the SQLite database: {e}") -#Load config -config = load_config_sqlite() -#config -baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4 -device_id = config.get('deviceID', '').upper() #device ID en maj - -sara_r5_DPD_setup = False +# Load baudrate from config +cursor.execute("SELECT value FROM config_table WHERE key = 'SaraR4_baudrate'") +row = cursor.fetchone() +baudrate = int(row[0]) if row else 115200 ser_sara = serial.Serial( port='/dev/ttyAMA2', - baudrate=baudrate, #115200 ou 9600 - parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD + baudrate=baudrate, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 2 @@ -130,11 +86,10 @@ ser_sara = serial.Serial( def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True): ''' - Fonction très importante !!! Reads the complete response from a serial connection and waits for specific lines. ''' if wait_for_lines is None: - wait_for_lines = [] # Default to an empty list if not provided + wait_for_lines = [] response = bytearray() serial_connection.timeout = timeout @@ -142,264 +97,72 @@ def read_complete_response(serial_connection, timeout=2, end_of_response_timeout start_time = time.time() while True: - elapsed_time = time.time() - start_time # Time since function start + elapsed_time = time.time() - start_time if serial_connection.in_waiting > 0: data = serial_connection.read(serial_connection.in_waiting) response.extend(data) - end_time = time.time() + end_of_response_timeout # Reset timeout on new data + end_time = time.time() + end_of_response_timeout - # Decode and check for any target line decoded_response = response.decode('utf-8', errors='replace') for target_line in wait_for_lines: if target_line in decoded_response: if debug: - print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)") - return decoded_response # Return response immediately if a target line is found + print(f"[DEBUG] Found target line: {target_line} (in {elapsed_time:.2f}s)") + return decoded_response elif time.time() > end_time: if debug: print(f"[DEBUG] Timeout reached. No more data received.") break - time.sleep(0.1) # Short sleep to prevent busy waiting + time.sleep(0.1) - # Final response and debug output total_elapsed_time = time.time() - start_time if debug: - print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️") - # Check if the elapsed time exceeded 10 seconds + print(f"[DEBUG] elapsed time: {total_elapsed_time:.2f}s.") if total_elapsed_time > 10 and debug: - print(f"[ALERT] 🚨 The operation took too long 🚨") - print(f'[ALERT] ⚠️{total_elapsed_time:.2f}s⚠️') + print(f"[ALERT] The operation took too long ({total_elapsed_time:.2f}s)") - return response.decode('utf-8', errors='replace') # Return the full response if no target line is found + return response.decode('utf-8', errors='replace') try: print('

Start reboot python script

') - # Reset modem_config_mode at boot to prevent capteur from staying stuck in config mode + # 1. Reset modem_config_mode at boot to prevent capteur from staying stuck in config mode cursor.execute("UPDATE config_table SET value = '0' WHERE key = 'modem_config_mode'") conn.commit() print("modem_config_mode reset to 0 (boot safety)") - #First we need to power on the module (if connected to mosfet via gpio16) + # 2. Power on the module (MOSFET via GPIO 16) GPIO.output(SARA_power_GPIO, GPIO.HIGH) time.sleep(5) - #check modem status - #Attention: - # SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B - # SArA R5 response: SARA-R500S-01B-00 - print("⚙️Check SARA Status") + # 3. Detect modem model + # SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B + # SARA R5 response: SARA-R500S-01B-00 + print("Check SARA Status") command = f'ATI\r' ser_sara.write(command.encode('utf-8')) response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"]) print(response_SARA_ATI) - # Check for SARA model with more robust regex model = "Unknown" if "SARA-R410M" in response_SARA_ATI: model = "SARA-R410M" - print("📱 Detected SARA R4 modem") + print("Detected SARA R4 modem") elif "SARA-R500" in response_SARA_ATI: model = "SARA-R500" - print("📱 Detected SARA R5 modem") - sara_r5_DPD_setup = True + print("Detected SARA R5 modem") else: - # Fallback to regex match if direct string match fails match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI) if match: model = match.group(1).strip() else: - model = "Unknown" - print("⚠️ Could not identify modem model") + print("Could not identify modem model") - print(f"🔍 Model: {model}") + print(f"Model: {model}") update_sqlite_config("modem_version", model) - time.sleep(1) - - ''' - AIRCARTO - ''' - # 1. Set AIRCARTO URL (profile id = 0) - print('➡️Set aircarto URL') - aircarto_profile_id = 0 - aircarto_url="data.nebuleair.fr" - command = f'AT+UHTTP={aircarto_profile_id},1,"{aircarto_url}"\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_1) - time.sleep(1) - ''' - uSpot - ''' - print("➡️➡️Set uSpot URL with SSL") - - security_profile_id = 1 - uSpot_profile_id = 1 - uSpot_url="api-prod.uspot.probesys.net" - - - #step 1: import the certificate - print("➡️ import certificate") - certificate_name = "e6" - with open("/var/www/nebuleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file: - certificate = cert_file.read() - size_of_string = len(certificate) - - # AT+USECMNG=0,,, - # type-> 0 -> trusted root CA - command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_1 = read_complete_response(ser_sara) - print(response_SARA_1) - - time.sleep(0.5) - - print("➡️ add certificate") - ser_sara.write(certificate) - response_SARA_2 = read_complete_response(ser_sara) - print(response_SARA_2) - - time.sleep(0.5) - - # op_code: 0 -> certificate validation level - # param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation - print("➡️Set the security profile (params)") - certification_level=0 - command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5b) - time.sleep(0.5) - - # op_code: 1 -> minimum SSL/TLS version - # param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2; - print("➡️Set the security profile (params)") - minimum_SSL_version = 0 - command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5bb) - time.sleep(0.5) - - #op_code: 2 -> legacy cipher suite selection - # 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list. - print("➡️Set cipher") - cipher_suite = 0 - command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5cc) - time.sleep(0.5) - - # op_code: 3 -> trusted root certificate internal name - print("➡️Set the security profile (choose cert)") - command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5c) - time.sleep(0.5) - - # op_code: 10 -> SNI (server name indication) - print("➡️Set the SNI") - command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5cf) - time.sleep(0.5) - - #step 4: set url (op_code = 1) - print("➡️SET URL") - command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5) - time.sleep(1) - - #step 4: set PORT (op_code = 5) - print("➡️SET PORT") - port = 443 - command = f'AT+UHTTP={uSpot_profile_id},5,{port}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_55) - time.sleep(1) - - #step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2) - print("➡️SET SSL") - http_secure = 1 - command = f'AT+UHTTP={uSpot_profile_id},6,{http_secure},{security_profile_id}\r' - - ser_sara.write(command.encode('utf-8')) - response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5fg) - time.sleep(1) - - - ''' - SARA R5 - ''' - - if sara_r5_DPD_setup: - print("➡️➡️SARA R5 PDP SETUP") - # 2. Activate PDP context 1 - print('➡️Activate PDP context 1') - command = f'AT+CGACT=1,1\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_2, end="") - time.sleep(1) - - # 2. Set the PDP type - print('➡️Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command') - command = f'AT+UPSD=0,0,0\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_3, end="") - time.sleep(1) - - # 2. Profile #0 is mapped on CID=1. - print('➡️Profile #0 is mapped on CID=1.') - command = f'AT+UPSD=0,100,1\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_3, end="") - time.sleep(1) - - # 2. Set the PDP type - print('➡️Activate the PSD profile #0: the IPv4 address is already assigned by the network.') - command = f'AT+UPSDA=0,3\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"]) - print(response_SARA_3, end="") - time.sleep(1) - - - #3. Get localisation (CellLocate) - mode = 2 #single shot position - sensor = 2 #use cellular CellLocate® location information - response_type = 0 - timeout_s = 2 - accuracy_m = 1 - command = f'AT+ULOC={mode},{sensor},{response_type},{timeout_s},{accuracy_m}\r' - ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["+UULOC"]) - print(response_SARA_3) - - match = re.search(r"\+UULOC: \d{2}/\d{2}/\d{4},\d{2}:\d{2}:\d{2}\.\d{3},([-+]?\d+\.\d+),([-+]?\d+\.\d+)", response_SARA_3) - if match: - latitude = match.group(1) - longitude = match.group(2) - print(f"📍 Latitude: {latitude}, Longitude: {longitude}") - #update sqlite table - update_sqlite_config("latitude_raw", float(latitude)) - update_sqlite_config("longitude_raw", float(longitude)) - else: - print("❌ Failed to extract coordinates.") - - - time.sleep(1) + print('

Boot script complete. Modem ready for main loop.

') except Exception as e: print("An error occurred:", e) - traceback.print_exc() # This prints the full traceback \ No newline at end of file + traceback.print_exc() \ No newline at end of file diff --git a/VERSION b/VERSION index 9c6d629..fdd3be6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.6.1 +1.6.2 diff --git a/changelog.json b/changelog.json index 590e3b6..74ff5b8 100644 --- a/changelog.json +++ b/changelog.json @@ -1,5 +1,19 @@ { "versions": [ + { + "version": "1.6.2", + "date": "2026-03-27", + "changes": { + "features": [], + "improvements": [ + "Simplification du script de boot SARA (start.py): suppression config AirCarto, uSpot/SSL, PDP et geolocalisation", + "La configuration modem est desormais entierement geree par le script principal (SARA_send_data_v2.py)" + ], + "fixes": [], + "compatibility": [] + }, + "notes": "Le script de boot ne fait plus que 3 choses: reset modem_config_mode, alimentation modem GPIO 16, detection modele R4/R5. Toute la configuration (URLs, certificats, PDP, geolocalisation) est deja geree par le script principal qui tourne chaque minute avec gestion d'erreur et retry." + }, { "version": "1.6.1", "date": "2026-03-19",