Files
2026-03-27 17:15:24 +01:00

168 lines
5.4 KiB
Python

r'''
 ____    _    ____      _
/ ___|  / \  |  _ \    / \
\___ \ / _ \ | |_) |  / _ \
 ___) / ___ \|  _ <  / ___ \
|____/_/   \_\_| \_\/_/   \_\

Script run at Raspberry Pi boot (via cron):
    @reboot sleep 30 && /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py >> /var/www/nebuleair_pro_4g/logs/app.log 2>&1

To run it manually:
    /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py

Roles:
    1. Reset modem_config_mode to 0 (boot safety)
    2. Power on the SARA modem via GPIO 16
    3. Detect the modem model (SARA R4 or R5) and save it to SQLite

All other configuration (AirCarto URL, uSpot HTTPS, PDP setup, geolocation)
is handled by the main loop script: loop/SARA_send_data_v2.py
'''
import serial
import RPi.GPIO as GPIO
import time
import re
import sqlite3
import traceback
# GPIO: BCM pin 16 drives the SARA modem power line; it is configured as an
# output here and set HIGH later in the boot sequence of this script.
SARA_power_GPIO = 16
GPIO.setmode(GPIO.BCM) # Use BCM numbering
GPIO.setup(SARA_power_GPIO, GPIO.OUT)
# Database connection: module-level SQLite connection and cursor, shared by
# update_sqlite_config() and the top-level statements of this script.
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()
def update_sqlite_config(key, value):
    """
    Store ``value`` under ``key`` in the SQLite ``config_table``.

    The row's ``type`` column selects how the value is serialised before it
    is written to the ``value`` column:

    * ``'bool'``  -> ``'1'`` or ``'0'`` (real booleans are used as-is, anything
      else is truthy when its lowercase string form is ``true``, ``1``,
      ``yes`` or ``y``)
    * ``'int'``   -> ``str(int(value))``
    * ``'float'`` -> ``str(float(value))``
    * anything else -> ``str(value)``

    If the key does not exist, a message is printed and nothing is written.
    Any exception (failed conversion, database error) is caught and printed;
    the function always returns ``None``.
    """
    truthy_words = ('true', '1', 'yes', 'y')
    numeric_casts = {'int': int, 'float': float}

    try:
        type_row = cursor.execute(
            "SELECT type FROM config_table WHERE key = ?", (key,)
        ).fetchone()
        if type_row is None:
            print(f"Key '{key}' not found in the config_table.")
            return

        declared_type = type_row[0]
        if declared_type == 'bool':
            # Normalise to a real boolean first, then map to '1' / '0'.
            if isinstance(value, bool):
                flag = value
            else:
                flag = str(value).lower() in truthy_words
            serialised = '1' if flag else '0'
        elif declared_type in numeric_casts:
            serialised = str(numeric_casts[declared_type](value))
        else:
            serialised = str(value)

        cursor.execute(
            "UPDATE config_table SET value = ? WHERE key = ?",
            (serialised, key),
        )
        conn.commit()
        print(f"Updated '{key}' to '{value}' in database.")
    except Exception as e:
        print(f"Error updating the SQLite database: {e}")
# Load the modem UART baudrate from the config table.
# This runs at module level, outside the main try block, so a NULL or
# non-numeric stored value must not be allowed to raise: that would abort the
# script before the modem is powered on. Fall back to the same 115200 default
# that is used when the row is missing.
cursor.execute("SELECT value FROM config_table WHERE key = 'SaraR4_baudrate'")
row = cursor.fetchone()
try:
    baudrate = int(row[0]) if row else 115200
except (TypeError, ValueError):
    print(f"Invalid SaraR4_baudrate value {row[0]!r} in config_table, falling back to 115200")
    baudrate = 115200

# Serial link to the SARA modem (8 data bits, no parity, 1 stop bit).
ser_sara = serial.Serial(
    port='/dev/ttyAMA2',
    baudrate=baudrate,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=2,
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
    '''
    Collect a reply from a serial connection until a marker shows up or the line goes quiet.

    The port is polled every 0.1 s. Each time bytes are available they are
    appended to a buffer and the "quiet" deadline is pushed back by
    ``end_of_response_timeout`` seconds.

    Parameters:
        serial_connection: object exposing ``in_waiting``, ``read(n)`` and a
            writable ``timeout`` attribute (e.g. a pyserial ``Serial``).
        timeout: value assigned to ``serial_connection.timeout``.
        end_of_response_timeout: seconds without new data after which the
            reply is considered complete.
        wait_for_lines: optional list of substrings; as soon as one of them
            appears in the decoded buffer, the decoded buffer is returned.
        debug: print timing / diagnostic messages when true.

    Returns:
        Everything received so far, decoded as UTF-8 with invalid bytes
        replaced.
    '''
    markers = [] if wait_for_lines is None else wait_for_lines
    buffer = bytearray()
    serial_connection.timeout = timeout
    quiet_deadline = time.time() + end_of_response_timeout
    began = time.time()

    while True:
        waited = time.time() - began
        if serial_connection.in_waiting > 0:
            buffer.extend(serial_connection.read(serial_connection.in_waiting))
            # Fresh data arrived: restart the silence countdown.
            quiet_deadline = time.time() + end_of_response_timeout
            text_so_far = buffer.decode('utf-8', errors='replace')
            hit = next((m for m in markers if m in text_so_far), None)
            if hit is not None:
                if debug:
                    print(f"[DEBUG] Found target line: {hit} (in {waited:.2f}s)")
                return text_so_far
        elif time.time() > quiet_deadline:
            if debug:
                print("[DEBUG] Timeout reached. No more data received.")
            break
        time.sleep(0.1)

    total = time.time() - began
    if debug:
        print(f"[DEBUG] elapsed time: {total:.2f}s.")
        if total > 10:
            print(f"[ALERT] The operation took too long ({total:.2f}s)")
    return buffer.decode('utf-8', errors='replace')
# Boot sequence: reset the config-mode flag, power the modem, identify its
# model and store it in SQLite. Any failure is logged with a traceback; the
# serial port and the database connection are always released at the end.
try:
    print('<h3>Start reboot python script</h3>')

    # 1. Reset modem_config_mode at boot to prevent the sensor from staying stuck in config mode
    cursor.execute("UPDATE config_table SET value = '0' WHERE key = 'modem_config_mode'")
    conn.commit()
    print("modem_config_mode reset to 0 (boot safety)")

    # 2. Power on the module (MOSFET via GPIO 16)
    GPIO.output(SARA_power_GPIO, GPIO.HIGH)
    # Wait before sending AT commands (presumably the modem start-up time).
    time.sleep(5)

    # 3. Detect modem model
    # SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B
    # SARA R5 response: SARA-R500S-01B-00
    print("Check SARA Status")
    command = 'ATI\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"])
    print(response_SARA_ATI)

    model = "Unknown"
    if "SARA-R410M" in response_SARA_ATI:
        model = "SARA-R410M"
        print("Detected SARA R4 modem")
    elif "SARA-R500" in response_SARA_ATI:
        model = "SARA-R500"
        print("Detected SARA R5 modem")
    else:
        # Neither known family matched: fall back to whatever follows "Model:".
        match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI)
        if match:
            model = match.group(1).strip()
        else:
            print("Could not identify modem model")

    print(f"Model: {model}")
    update_sqlite_config("modem_version", model)
    print('<h3>Boot script complete. Modem ready for main loop.</h3>')
except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()
finally:
    # Release the UART and the SQLite handle on every exit path.
    # The GPIO pin is deliberately left untouched (no GPIO.cleanup()) so the
    # modem power line stays HIGH after this script exits.
    try:
        ser_sara.close()
    finally:
        conn.close()