263 lines
8.8 KiB
Python
263 lines
8.8 KiB
Python
'''
|
||
____ _ ____ _
|
||
/ ___| / \ | _ \ / \
|
||
\___ \ / _ \ | |_) | / _ \
|
||
___) / ___ \| _ < / ___ \
|
||
|____/_/ \_\_| \_\/_/ \_\
|
||
|
||
Script that starts at the boot of the RPI (with cron)
|
||
|
||
@reboot sleep 30 && /usr/bin/python3 /var/www/moduleair_pro_4g/SARA/reboot/start.py >> /var/www/moduleair_pro_4g/logs/app.log 2>&1
|
||
|
||
/usr/bin/python3 /var/www/moduleair_pro_4g/SARA/reboot/start.py
|
||
|
||
'''
|
||
import serial
|
||
import time
|
||
import sys
|
||
import json
|
||
import re
|
||
import sqlite3
|
||
|
||
# database connection
|
||
conn = sqlite3.connect("/var/www/moduleair_pro_4g/sqlite/sensors.db")
|
||
cursor = conn.cursor()
|
||
|
||
#get config data from SQLite table
|
||
def load_config_sqlite():
|
||
"""
|
||
Load configuration data from SQLite config table
|
||
|
||
Returns:
|
||
dict: Configuration data with proper type conversion
|
||
"""
|
||
try:
|
||
|
||
# Query the config table
|
||
cursor.execute("SELECT key, value, type FROM config_table")
|
||
rows = cursor.fetchall()
|
||
|
||
# Create config dictionary
|
||
config_data = {}
|
||
for key, value, type_name in rows:
|
||
# Convert value based on its type
|
||
if type_name == 'bool':
|
||
config_data[key] = value == '1' or value == 'true'
|
||
elif type_name == 'int':
|
||
config_data[key] = int(value)
|
||
elif type_name == 'float':
|
||
config_data[key] = float(value)
|
||
else:
|
||
config_data[key] = value
|
||
|
||
return config_data
|
||
|
||
except Exception as e:
|
||
print(f"Error loading config from SQLite: {e}")
|
||
return {}
|
||
|
||
def update_sqlite_config(key, value):
|
||
"""
|
||
Updates a specific key in the SQLite config_table with a new value.
|
||
|
||
:param key: The key to update in the config_table.
|
||
:param value: The new value to assign to the key.
|
||
"""
|
||
try:
|
||
|
||
# Check if the key exists and get its type
|
||
cursor.execute("SELECT type FROM config_table WHERE key = ?", (key,))
|
||
result = cursor.fetchone()
|
||
|
||
if result is None:
|
||
print(f"Key '{key}' not found in the config_table.")
|
||
conn.close()
|
||
return
|
||
|
||
# Get the type of the value from the database
|
||
value_type = result[0]
|
||
|
||
# Convert the value to the appropriate string representation based on its type
|
||
if value_type == 'bool':
|
||
# Convert Python boolean or string 'true'/'false' to '1'/'0'
|
||
if isinstance(value, bool):
|
||
str_value = '1' if value else '0'
|
||
else:
|
||
str_value = '1' if str(value).lower() in ('true', '1', 'yes', 'y') else '0'
|
||
elif value_type == 'int':
|
||
str_value = str(int(value))
|
||
elif value_type == 'float':
|
||
str_value = str(float(value))
|
||
else:
|
||
str_value = str(value)
|
||
|
||
# Update the value in the database
|
||
cursor.execute("UPDATE config_table SET value = ? WHERE key = ?", (str_value, key))
|
||
|
||
# Commit the changes and close the connection
|
||
conn.commit()
|
||
|
||
print(f"💾 Updated '{key}' to '{value}' in database.")
|
||
except Exception as e:
|
||
print(f"Error updating the SQLite database: {e}")
|
||
|
||
#Load config
|
||
config = load_config_sqlite()
|
||
#config
|
||
baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4
|
||
device_id = config.get('deviceID', '').upper() #device ID en maj
|
||
sara_r5_DPD_setup = False
|
||
|
||
ser_sara = serial.Serial(
|
||
port='/dev/ttyAMA2',
|
||
baudrate=baudrate, #115200 ou 9600
|
||
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD
|
||
stopbits=serial.STOPBITS_ONE,
|
||
bytesize=serial.EIGHTBITS,
|
||
timeout = 2
|
||
)
|
||
|
||
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
|
||
'''
|
||
Fonction très importante !!!
|
||
Reads the complete response from a serial connection and waits for specific lines.
|
||
'''
|
||
if wait_for_lines is None:
|
||
wait_for_lines = [] # Default to an empty list if not provided
|
||
|
||
response = bytearray()
|
||
serial_connection.timeout = timeout
|
||
end_time = time.time() + end_of_response_timeout
|
||
start_time = time.time()
|
||
|
||
while True:
|
||
elapsed_time = time.time() - start_time # Time since function start
|
||
if serial_connection.in_waiting > 0:
|
||
data = serial_connection.read(serial_connection.in_waiting)
|
||
response.extend(data)
|
||
end_time = time.time() + end_of_response_timeout # Reset timeout on new data
|
||
|
||
# Decode and check for any target line
|
||
decoded_response = response.decode('utf-8', errors='replace')
|
||
for target_line in wait_for_lines:
|
||
if target_line in decoded_response:
|
||
if debug:
|
||
print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)")
|
||
return decoded_response # Return response immediately if a target line is found
|
||
elif time.time() > end_time:
|
||
if debug:
|
||
print(f"[DEBUG] Timeout reached. No more data received.")
|
||
break
|
||
time.sleep(0.1) # Short sleep to prevent busy waiting
|
||
|
||
# Final response and debug output
|
||
total_elapsed_time = time.time() - start_time
|
||
if debug:
|
||
print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
|
||
# Check if the elapsed time exceeded 10 seconds
|
||
if total_elapsed_time > 10 and debug:
|
||
print(f"[ALERT] 🚨 The operation took too long 🚨")
|
||
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠️</span>')
|
||
|
||
return response.decode('utf-8', errors='replace') # Return the full response if no target line is found
|
||
|
||
|
||
|
||
try:
|
||
print('<h3>Start reboot python script</h3>')
|
||
|
||
#check modem status
|
||
#Attention:
|
||
# SARA R4 response: Manufacturer: u-blox Model: SARA-R410M-02B
|
||
# SArA R5 response: SARA-R500S-01B-00
|
||
print("⚙️Check SARA Status")
|
||
command = f'ATI\r'
|
||
ser_sara.write(command.encode('utf-8'))
|
||
response_SARA_ATI = read_complete_response(ser_sara, wait_for_lines=["IMEI"])
|
||
print(response_SARA_ATI)
|
||
|
||
# Check for SARA model with more robust regex
|
||
model = "Unknown"
|
||
if "SARA-R410M" in response_SARA_ATI:
|
||
model = "SARA-R410M"
|
||
print("📱 Detected SARA R4 modem")
|
||
elif "SARA-R500" in response_SARA_ATI:
|
||
model = "SARA-R500"
|
||
print("📱 Detected SARA R5 modem")
|
||
sara_r5_DPD_setup = True
|
||
else:
|
||
# Fallback to regex match if direct string match fails
|
||
match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI)
|
||
if match:
|
||
model = match.group(1).strip()
|
||
else:
|
||
model = "Unknown"
|
||
print("⚠️ Could not identify modem model")
|
||
|
||
print(f"🔍 Model: {model}")
|
||
update_sqlite_config("modem_version", model)
|
||
time.sleep(1)
|
||
|
||
'''
|
||
AIRCARTO
|
||
'''
|
||
|
||
# 1. Set AIRCARTO URL
|
||
print('➡️Set aircarto URL')
|
||
aircarto_profile_id = 0
|
||
aircarto_url="data.moduleair.fr"
|
||
command = f'AT+UHTTP={aircarto_profile_id},1,"{aircarto_url}"\r'
|
||
ser_sara.write(command.encode('utf-8'))
|
||
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||
print(response_SARA_1)
|
||
time.sleep(1)
|
||
|
||
'''
|
||
uSpot
|
||
'''
|
||
|
||
#2. Set uSpot URL
|
||
print('➡️Set uSpot URL')
|
||
uSpot_profile_id = 1
|
||
uSpot_url="api-prod.uspot.probesys.net"
|
||
command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r'
|
||
ser_sara.write(command.encode('utf-8'))
|
||
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||
print(response_SARA_2)
|
||
time.sleep(1)
|
||
|
||
print("set port 81")
|
||
command = f'AT+UHTTP={uSpot_profile_id},5,81\r'
|
||
ser_sara.write((command + '\r').encode('utf-8'))
|
||
response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||
print(response_SARA_55)
|
||
time.sleep(1)
|
||
|
||
#3. Get localisation (CellLocate)
|
||
mode = 2
|
||
sensor = 2
|
||
response_type = 0
|
||
timeout_s = 2
|
||
accuracy_m = 1
|
||
command = f'AT+ULOC={mode},{sensor},{response_type},{timeout_s},{accuracy_m}\r'
|
||
ser_sara.write((command + '\r').encode('utf-8'))
|
||
response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["+UULOC"])
|
||
print(response_SARA_3)
|
||
|
||
match = re.search(r"\+UULOC: \d{2}/\d{2}/\d{4},\d{2}:\d{2}:\d{2}\.\d{3},([-+]?\d+\.\d+),([-+]?\d+\.\d+)", response_SARA_3)
|
||
if match:
|
||
latitude = match.group(1)
|
||
longitude = match.group(2)
|
||
print(f"📍 Latitude: {latitude}, Longitude: {longitude}")
|
||
else:
|
||
print("❌ Failed to extract coordinates.")
|
||
|
||
#update sqlite table
|
||
update_sqlite_config("latitude_raw", float(latitude))
|
||
update_sqlite_config("longitude_raw", float(longitude))
|
||
|
||
time.sleep(1)
|
||
|
||
except Exception as e:
|
||
print("An error occurred:", e)
|
||
traceback.print_exc() # This prints the full traceback |