v1.6.2: Simplification script boot SARA — config modem deleguee au script principal

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
PaulVua
2026-03-27 17:15:24 +01:00
parent 0ed18dd5c1
commit 9f76e3b2de
3 changed files with 62 additions and 285 deletions

View File

@@ -1,95 +1,57 @@
r''' r'''
____ _ ____ _ ____ _ ____ _
/ ___| / \ | _ \ / \ / ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \ \___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \ ___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\ |____/_/ \_\_| \_\/_/ \_\
Script that starts at the boot of the RPI (with cron) Script that starts at the boot of the RPI (with cron)
@reboot sleep 30 && /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py >> /var/www/nebuleair_pro_4g/logs/app.log 2>&1 @reboot sleep 30 && /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py >> /var/www/nebuleair_pro_4g/logs/app.log 2>&1
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py
Roles:
1. Reset modem_config_mode to 0 (boot safety)
2. Power on SARA modem via GPIO 16
3. Detect modem model (SARA R4 or R5) and save to SQLite
All other configuration (AirCarto URL, uSpot HTTPS, PDP setup, geolocation)
is handled by the main loop script: loop/SARA_send_data_v2.py
''' '''
import serial import serial
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
import time import time
import sys
import json
import re import re
import sqlite3 import sqlite3
import traceback import traceback
#GPIO #GPIO
SARA_power_GPIO = 16 SARA_power_GPIO = 16
SARA_ON_GPIO = 20
GPIO.setmode(GPIO.BCM) # Use BCM numbering GPIO.setmode(GPIO.BCM) # Use BCM numbering
GPIO.setup(SARA_power_GPIO, GPIO.OUT) # Set GPIO17 as an output GPIO.setup(SARA_power_GPIO, GPIO.OUT)
# database connection # database connection
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor() cursor = conn.cursor()
#get config data from SQLite table
def load_config_sqlite():
"""
Load configuration data from SQLite config table
Returns:
dict: Configuration data with proper type conversion
"""
try:
# Query the config table
cursor.execute("SELECT key, value, type FROM config_table")
rows = cursor.fetchall()
# Create config dictionary
config_data = {}
for key, value, type_name in rows:
# Convert value based on its type
if type_name == 'bool':
config_data[key] = value == '1' or value == 'true'
elif type_name == 'int':
config_data[key] = int(value)
elif type_name == 'float':
config_data[key] = float(value)
else:
config_data[key] = value
return config_data
except Exception as e:
print(f"Error loading config from SQLite: {e}")
return {}
def update_sqlite_config(key, value): def update_sqlite_config(key, value):
""" """
Updates a specific key in the SQLite config_table with a new value. Updates a specific key in the SQLite config_table with a new value.
:param key: The key to update in the config_table.
:param value: The new value to assign to the key.
""" """
try: try:
# Check if the key exists and get its type
cursor.execute("SELECT type FROM config_table WHERE key = ?", (key,)) cursor.execute("SELECT type FROM config_table WHERE key = ?", (key,))
result = cursor.fetchone() result = cursor.fetchone()
if result is None: if result is None:
print(f"Key '{key}' not found in the config_table.") print(f"Key '{key}' not found in the config_table.")
conn.close()
return return
# Get the type of the value from the database
value_type = result[0] value_type = result[0]
# Convert the value to the appropriate string representation based on its type
if value_type == 'bool': if value_type == 'bool':
# Convert Python boolean or string 'true'/'false' to '1'/'0'
if isinstance(value, bool): if isinstance(value, bool):
str_value = '1' if value else '0' str_value = '1' if value else '0'
else: else:
@@ -100,29 +62,23 @@ def update_sqlite_config(key, value):
str_value = str(float(value)) str_value = str(float(value))
else: else:
str_value = str(value) str_value = str(value)
# Update the value in the database
cursor.execute("UPDATE config_table SET value = ? WHERE key = ?", (str_value, key)) cursor.execute("UPDATE config_table SET value = ? WHERE key = ?", (str_value, key))
# Commit the changes and close the connection
conn.commit() conn.commit()
print(f"💾 Updated '{key}' to '{value}' in database.") print(f"Updated '{key}' to '{value}' in database.")
except Exception as e: except Exception as e:
print(f"Error updating the SQLite database: {e}") print(f"Error updating the SQLite database: {e}")
#Load config # Load baudrate from config
config = load_config_sqlite() cursor.execute("SELECT value FROM config_table WHERE key = 'SaraR4_baudrate'")
#config row = cursor.fetchone()
baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4 baudrate = int(row[0]) if row else 115200
device_id = config.get('deviceID', '').upper() #device ID en maj
sara_r5_DPD_setup = False
ser_sara = serial.Serial( ser_sara = serial.Serial(
port='/dev/ttyAMA2', port='/dev/ttyAMA2',
baudrate=baudrate, #115200 ou 9600 baudrate=baudrate,
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS, bytesize=serial.EIGHTBITS,
timeout = 2 timeout = 2
@@ -130,11 +86,10 @@ ser_sara = serial.Serial(
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True): def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
''' '''
Fonction très importante !!!
Reads the complete response from a serial connection and waits for specific lines. Reads the complete response from a serial connection and waits for specific lines.
''' '''
if wait_for_lines is None: if wait_for_lines is None:
wait_for_lines = [] # Default to an empty list if not provided wait_for_lines = []
response = bytearray() response = bytearray()
serial_connection.timeout = timeout serial_connection.timeout = timeout
@@ -142,264 +97,72 @@ def read_complete_response(serial_connection, timeout=2, end_of_response_timeout
start_time = time.time() start_time = time.time()
while True: while True:
elapsed_time = time.time() - start_time # Time since function start elapsed_time = time.time() - start_time
if serial_connection.in_waiting > 0: if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting) data = serial_connection.read(serial_connection.in_waiting)
response.extend(data) response.extend(data)
end_time = time.time() + end_of_response_timeout # Reset timeout on new data end_time = time.time() + end_of_response_timeout
# Decode and check for any target line
decoded_response = response.decode('utf-8', errors='replace') decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines: for target_line in wait_for_lines:
if target_line in decoded_response: if target_line in decoded_response:
if debug: if debug:
print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)") print(f"[DEBUG] Found target line: {target_line} (in {elapsed_time:.2f}s)")
return decoded_response # Return response immediately if a target line is found return decoded_response
elif time.time() > end_time: elif time.time() > end_time:
if debug: if debug:
print(f"[DEBUG] Timeout reached. No more data received.") print(f"[DEBUG] Timeout reached. No more data received.")
break break
time.sleep(0.1) # Short sleep to prevent busy waiting time.sleep(0.1)
# Final response and debug output
total_elapsed_time = time.time() - start_time total_elapsed_time = time.time() - start_time
if debug: if debug:
print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️") print(f"[DEBUG] elapsed time: {total_elapsed_time:.2f}s.")
# Check if the elapsed time exceeded 10 seconds
if total_elapsed_time > 10 and debug: if total_elapsed_time > 10 and debug:
print(f"[ALERT] 🚨 The operation took too long 🚨") print(f"[ALERT] The operation took too long ({total_elapsed_time:.2f}s)")
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
return response.decode('utf-8', errors='replace') # Return the full response if no target line is found return response.decode('utf-8', errors='replace')
try: try:
print('<h3>Start reboot python script</h3>') print('<h3>Start reboot python script</h3>')
# Reset modem_config_mode at boot to prevent capteur from staying stuck in config mode # 1. Reset modem_config_mode at boot to prevent capteur from staying stuck in config mode
cursor.execute("UPDATE config_table SET value = '0' WHERE key = 'modem_config_mode'") cursor.execute("UPDATE config_table SET value = '0' WHERE key = 'modem_config_mode'")
conn.commit() conn.commit()
print("modem_config_mode reset to 0 (boot safety)") print("modem_config_mode reset to 0 (boot safety)")
#First we need to power on the module (if connected to mosfet via gpio16) # 2. Power on the module (MOSFET via GPIO 16)
GPIO.output(SARA_power_GPIO, GPIO.HIGH) GPIO.output(SARA_power_GPIO, GPIO.HIGH)
time.sleep(5) time.sleep(5)
#check modem status # 3. Detect modem model
#Attention: # SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B
# SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B # SARA R5 response: SARA-R500S-01B-00
# SArA R5 response: SARA-R500S-01B-00 print("Check SARA Status")
print("Check SARA Status")
command = f'ATI\r' command = f'ATI\r'
ser_sara.write(command.encode('utf-8')) ser_sara.write(command.encode('utf-8'))
response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"]) response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"])
print(response_SARA_ATI) print(response_SARA_ATI)
# Check for SARA model with more robust regex
model = "Unknown" model = "Unknown"
if "SARA-R410M" in response_SARA_ATI: if "SARA-R410M" in response_SARA_ATI:
model = "SARA-R410M" model = "SARA-R410M"
print("📱 Detected SARA R4 modem") print("Detected SARA R4 modem")
elif "SARA-R500" in response_SARA_ATI: elif "SARA-R500" in response_SARA_ATI:
model = "SARA-R500" model = "SARA-R500"
print("📱 Detected SARA R5 modem") print("Detected SARA R5 modem")
sara_r5_DPD_setup = True
else: else:
# Fallback to regex match if direct string match fails
match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI) match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI)
if match: if match:
model = match.group(1).strip() model = match.group(1).strip()
else: else:
model = "Unknown" print("Could not identify modem model")
print("⚠️ Could not identify modem model")
print(f"🔍 Model: {model}") print(f"Model: {model}")
update_sqlite_config("modem_version", model) update_sqlite_config("modem_version", model)
time.sleep(1)
'''
AIRCARTO
'''
# 1. Set AIRCARTO URL (profile id = 0)
print('Set aircarto URL')
aircarto_profile_id = 0
aircarto_url="data.nebuleair.fr"
command = f'AT+UHTTP={aircarto_profile_id},1,"{aircarto_url}"\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_1)
time.sleep(1)
''' print('<h3>Boot script complete. Modem ready for main loop.</h3>')
uSpot
'''
print("Set uSpot URL with SSL")
security_profile_id = 1
uSpot_profile_id = 1
uSpot_url="api-prod.uspot.probesys.net"
#step 1: import the certificate
print("➡️ import certificate")
certificate_name = "e6"
with open("/var/www/nebuleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file:
certificate = cert_file.read()
size_of_string = len(certificate)
# AT+USECMNG=0,<type>,<internal_name>,<data_size>
# type-> 0 -> trusted root CA
command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara)
print(response_SARA_1)
time.sleep(0.5)
print("➡️ add certificate")
ser_sara.write(certificate)
response_SARA_2 = read_complete_response(ser_sara)
print(response_SARA_2)
time.sleep(0.5)
# op_code: 0 -> certificate validation level
# param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation
print("Set the security profile (params)")
certification_level=0
command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5b)
time.sleep(0.5)
# op_code: 1 -> minimum SSL/TLS version
# param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2;
print("Set the security profile (params)")
minimum_SSL_version = 0
command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5bb)
time.sleep(0.5)
#op_code: 2 -> legacy cipher suite selection
# 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list.
print("Set cipher")
cipher_suite = 0
command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cc)
time.sleep(0.5)
# op_code: 3 -> trusted root certificate internal name
print("Set the security profile (choose cert)")
command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5c)
time.sleep(0.5)
# op_code: 10 -> SNI (server name indication)
print("Set the SNI")
command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cf)
time.sleep(0.5)
#step 4: set url (op_code = 1)
print("SET URL")
command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5)
time.sleep(1)
#step 4: set PORT (op_code = 5)
print("SET PORT")
port = 443
command = f'AT+UHTTP={uSpot_profile_id},5,{port}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_55)
time.sleep(1)
#step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2)
print("SET SSL")
http_secure = 1
command = f'AT+UHTTP={uSpot_profile_id},6,{http_secure},{security_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5fg)
time.sleep(1)
'''
SARA R5
'''
if sara_r5_DPD_setup:
print("SARA R5 PDP SETUP")
# 2. Activate PDP context 1
print('Activate PDP context 1')
command = f'AT+CGACT=1,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_2, end="")
time.sleep(1)
# 2. Set the PDP type
print('Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command')
command = f'AT+UPSD=0,0,0\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_3, end="")
time.sleep(1)
# 2. Profile #0 is mapped on CID=1.
print('Profile #0 is mapped on CID=1.')
command = f'AT+UPSD=0,100,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_3, end="")
time.sleep(1)
# 2. Set the PDP type
print('Activate the PSD profile #0: the IPv4 address is already assigned by the network.')
command = f'AT+UPSDA=0,3\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"])
print(response_SARA_3, end="")
time.sleep(1)
#3. Get localisation (CellLocate)
mode = 2 #single shot position
sensor = 2 #use cellular CellLocate® location information
response_type = 0
timeout_s = 2
accuracy_m = 1
command = f'AT+ULOC={mode},{sensor},{response_type},{timeout_s},{accuracy_m}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["+UULOC"])
print(response_SARA_3)
match = re.search(r"\+UULOC: \d{2}/\d{2}/\d{4},\d{2}:\d{2}:\d{2}\.\d{3},([-+]?\d+\.\d+),([-+]?\d+\.\d+)", response_SARA_3)
if match:
latitude = match.group(1)
longitude = match.group(2)
print(f"📍 Latitude: {latitude}, Longitude: {longitude}")
#update sqlite table
update_sqlite_config("latitude_raw", float(latitude))
update_sqlite_config("longitude_raw", float(longitude))
else:
print("❌ Failed to extract coordinates.")
time.sleep(1)
except Exception as e: except Exception as e:
print("An error occurred:", e) print("An error occurred:", e)
traceback.print_exc() # This prints the full traceback traceback.print_exc()

View File

@@ -1 +1 @@
1.6.1 1.6.2

View File

@@ -1,5 +1,19 @@
{ {
"versions": [ "versions": [
{
"version": "1.6.2",
"date": "2026-03-27",
"changes": {
"features": [],
"improvements": [
"Simplification du script de boot SARA (start.py): suppression config AirCarto, uSpot/SSL, PDP et geolocalisation",
"La configuration modem est desormais entierement geree par le script principal (SARA_send_data_v2.py)"
],
"fixes": [],
"compatibility": []
},
"notes": "Le script de boot ne fait plus que 3 choses: reset modem_config_mode, alimentation modem GPIO 16, detection modele R4/R5. Toute la configuration (URLs, certificats, PDP, geolocalisation) est deja geree par le script principal qui tourne chaque minute avec gestion d'erreur et retry."
},
{ {
"version": "1.6.1", "version": "1.6.1",
"date": "2026-03-19", "date": "2026-03-19",