Files
nebuleair_pro_4g/SARA/reboot/start.py
Your Name 1c6af36313 update
2025-03-14 10:57:45 +01:00

343 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
____ _ ____ _
/ ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\
Script that starts at the boot of the RPI (with cron)
@reboot sleep 30 && /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py >> /var/www/nebuleair_pro_4g/logs/app.log 2>&1
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/reboot/start.py
'''
import serial
import time
import sys
import json
import re
#get data from config
def load_config(config_file):
try:
with open(config_file, 'r') as file:
config_data = json.load(file)
return config_data
except Exception as e:
print(f"Error loading config file: {e}")
return {}
#Fonction pour mettre à jour le JSON de configuration
def update_json_key(file_path, key, value):
"""
Updates a specific key in a JSON file with a new value.
:param file_path: Path to the JSON file.
:param key: The key to update in the JSON file.
:param value: The new value to assign to the key.
"""
try:
# Load the existing data
with open(file_path, "r") as file:
data = json.load(file)
# Check if the key exists in the JSON file
if key in data:
data[key] = value # Update the key with the new value
else:
print(f"Key '{key}' not found in the JSON file.")
return
# Write the updated data back to the file
with open(file_path, "w") as file:
json.dump(data, file, indent=2) # Use indent for pretty printing
print(f"💾 updating '{key}' to '{value}'.")
except Exception as e:
print(f"Error updating the JSON file: {e}")
# Define the config file path
config_file = '/var/www/nebuleair_pro_4g/config.json'
# Load the configuration data
config = load_config(config_file)
baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4
device_id = config.get('deviceID', '').upper() #device ID en maj
sara_r5_DPD_setup = False
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=baudrate, #115200 ou 9600
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 2
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
'''
Fonction très importante !!!
Reads the complete response from a serial connection and waits for specific lines.
'''
if wait_for_lines is None:
wait_for_lines = [] # Default to an empty list if not provided
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
elapsed_time = time.time() - start_time # Time since function start
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout # Reset timeout on new data
# Decode and check for any target line
decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines:
if target_line in decoded_response:
if debug:
print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)")
return decoded_response # Return response immediately if a target line is found
elif time.time() > end_time:
if debug:
print(f"[DEBUG] Timeout reached. No more data received.")
break
time.sleep(0.1) # Short sleep to prevent busy waiting
# Final response and debug output
total_elapsed_time = time.time() - start_time
if debug:
print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
# Check if the elapsed time exceeded 10 seconds
if total_elapsed_time > 10 and debug:
print(f"[ALERT] 🚨 The operation took too long 🚨")
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
return response.decode('utf-8', errors='replace') # Return the full response if no target line is found
try:
print('<h3>Start reboot python script</h3>')
#check modem status
#Attention:
# SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B
# SArA R5 response: SARA-R500S-01B-00
print("Check SARA Status")
command = f'ATI\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"])
print(response_SARA_ATI)
# Check for SARA model with more robust regex
model = "Unknown"
if "SARA-R410M" in response_SARA_ATI:
model = "SARA-R410M"
print("📱 Detected SARA R4 modem")
elif "SARA-R500" in response_SARA_ATI:
model = "SARA-R500"
print("📱 Detected SARA R5 modem")
sara_r5_DPD_setup = True
else:
# Fallback to regex match if direct string match fails
match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI)
if match:
model = match.group(1).strip()
else:
model = "Unknown"
print("⚠️ Could not identify modem model")
print(f"🔍 Model: {model}")
update_json_key(config_file, "modem_version", model)
time.sleep(1)
'''
AIRCARTO
'''
# 1. Set AIRCARTO URL (profile id = 0)
print('Set aircarto URL')
aircarto_profile_id = 0
aircarto_url="data.nebuleair.fr"
command = f'AT+UHTTP={aircarto_profile_id},1,"{aircarto_url}"\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_1)
time.sleep(1)
'''
uSpot
'''
print("Set uSpot URL with SSL")
security_profile_id = 1
uSpot_profile_id = 1
uSpot_url="api-prod.uspot.probesys.net"
#step 1: import the certificate
print("➡️ import certificate")
certificate_name = "e6"
with open("/var/www/nebuleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file:
certificate = cert_file.read()
size_of_string = len(certificate)
# AT+USECMNG=0,<type>,<internal_name>,<data_size>
# type-> 0 -> trusted root CA
command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara)
print(response_SARA_1)
time.sleep(0.5)
print("➡️ add certificate")
ser_sara.write(certificate)
response_SARA_2 = read_complete_response(ser_sara)
print(response_SARA_2)
time.sleep(0.5)
# op_code: 0 -> certificate validation level
# param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation
print("Set the security profile (params)")
certification_level=0
command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5b)
time.sleep(0.5)
# op_code: 1 -> minimum SSL/TLS version
# param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2;
print("Set the security profile (params)")
minimum_SSL_version = 0
command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5bb)
time.sleep(0.5)
#op_code: 2 -> legacy cipher suite selection
# 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list.
print("Set cipher")
cipher_suite = 0
command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cc)
time.sleep(0.5)
# op_code: 3 -> trusted root certificate internal name
print("Set the security profile (choose cert)")
command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5c)
time.sleep(0.5)
# op_code: 10 -> SNI (server name indication)
print("Set the SNI")
command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cf)
time.sleep(0.5)
#step 4: set url (op_code = 1)
print("SET URL")
command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5)
time.sleep(1)
#step 4: set PORT (op_code = 5)
print("SET PORT")
port = 443
command = f'AT+UHTTP={uSpot_profile_id},5,{port}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_55)
time.sleep(1)
#step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2)
print("SET SSL")
http_secure = 1
command = f'AT+UHTTP={uSpot_profile_id},6,{http_secure},{security_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5fg)
time.sleep(1)
'''
SARA R5
'''
if sara_r5_DPD_setup:
print("SARA R5 PDP SETUP")
# 2. Activate PDP context 1
print('Activate PDP context 1')
command = f'AT+CGACT=1,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_2, end="")
time.sleep(1)
# 2. Set the PDP type
print('Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command')
command = f'AT+UPSD=0,0,0\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_3, end="")
time.sleep(1)
# 2. Profile #0 is mapped on CID=1.
print('Profile #0 is mapped on CID=1.')
command = f'AT+UPSD=0,100,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_3, end="")
time.sleep(1)
# 2. Set the PDP type
print('Activate the PSD profile #0: the IPv4 address is already assigned by the network.')
command = f'AT+UPSDA=0,3\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"])
print(response_SARA_3, end="")
time.sleep(1)
#3. Get localisation (CellLocate)
mode = 2 #single shot position
sensor = 2 #use cellular CellLocate® location information
response_type = 0
timeout_s = 2
accuracy_m = 1
command = f'AT+ULOC={mode},{sensor},{response_type},{timeout_s},{accuracy_m}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["+UULOC"])
print(response_SARA_3)
match = re.search(r"\+UULOC: \d{2}/\d{2}/\d{4},\d{2}:\d{2}:\d{2}\.\d{3},([-+]?\d+\.\d+),([-+]?\d+\.\d+)", response_SARA_3)
if match:
latitude = match.group(1)
longitude = match.group(2)
print(f"📍 Latitude: {latitude}, Longitude: {longitude}")
else:
print("❌ Failed to extract coordinates.")
#update config.json
update_json_key(config_file, "latitude_raw", float(latitude))
update_json_key(config_file, "longitude_raw", float(longitude))
time.sleep(1)
except Exception as e:
print("An error occurred:", e)
traceback.print_exc() # This prints the full traceback