Files
nebuleair_pro_4g/loop/SARA_send_data_v2.py
PaulVua dfba956685 update
2025-03-18 18:01:33 +01:00

1090 lines
47 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
____ _ ____ _ ____ _ ____ _
/ ___| / \ | _ \ / \ / ___| ___ _ __ __| | | _ \ __ _| |_ __ _
\___ \ / _ \ | |_) | / _ \ \___ \ / _ \ '_ \ / _` | | | | |/ _` | __/ _` |
___) / ___ \| _ < / ___ \ ___) | __/ | | | (_| | | |_| | (_| | || (_| |
|____/_/ \_\_| \_\/_/ \_\ |____/ \___|_| |_|\__,_| |____/ \__,_|\__\__,_|
Main loop to gather data from sensor inside SQLite database:
* NPM
* Envea
* I2C BME280
* Noise sensor
and send it to AirCarto servers via SARA R4 HTTP post requests
also send the timestamp (already stored inside the DB) !
/usr/bin/python3 /var/www/nebuleair_pro_4g/loop/SARA_send_data_v2.py
ATTENTION:
# This script is triggered every minute by /var/www/nebuleair_pro_4g/master.py (as a service)
CSV PAYLOAD (AirCarto Servers)
Endpoint:
data.nebuleair.fr
/pro_4G/data.php?sensor_id={device_id}&timestamp={rtc_module_time}
ATTENTION : do not change order !
CSV size: 30 fields (indices 0 to 24 are currently assigned, see the list below)
{PM1},{PM25},{PM10},{temp},{hum},{press},{avg_noise},{max_noise},{min_noise},{envea_no2},{envea_h2s},{envea_nh3},{4g_signal_quality},...
0 -> PM1 (μg/m3)
1 -> PM25 (μg/m3)
2 -> PM10 (μg/m3)
3 -> temp
4 -> hum
5 -> press
6 -> avg_noise
7 -> max_noise
8 -> min_noise
9 -> envea_no2
10 -> envea_h2s
11 -> envea_nh3
12 -> 4G signal quality,
13 -> PM 0.2μm to 0.5μm quantity (Nb/L)
14 -> PM 0.5μm to 1.0μm quantity (Nb/L)
15 -> PM 1.0μm to 2.5μm quantity (Nb/L)
16 -> PM 2.5μm to 5.0μm quantity (Nb/L)
17 -> PM 5.0μm to 10μm quantity (Nb/L)
18 -> NPM temp inside
19 -> NPM hum inside
20 -> battery_voltage
21 -> battery_current
22 -> solar_voltage
23 -> solar_power
24 -> charger_status
JSON PAYLOAD (Micro-Spot Servers)
Same as NebuleAir wifi
Endpoint:
api-prod.uspot.probesys.net
nebuleair?token=2AFF6dQk68daFZ
port 443
{"nebuleairid": "82D25549434",
"software_version": "ModuleAirV2-V1-042022",
"sensordatavalues":
[
{"value_type":"NPM_P0","value":"1.54"},
{"value_type":"NPM_P1","value":"1.54"},
{"value_type":"NPM_P2","value":"1.54"},
{"value_type":"NPM_N1","value":"0.02"},
{"value_type":"NPM_N10","value":"0.02"},
{"value_type":"NPM_N25","value":"0.02"},
{"value_type":"MHZ16_CO2","value":"793.00"},
{"value_type":"SGP40_VOC","value":"29915.00"},
{"value_type":"samples","value":"134400"},
{"value_type":"min_micro","value":"137"},
{"value_type":"max_micro","value":"155030"},
{"value_type":"interval","value":"145000"},
{"value_type":"signal","value":"-80"},
{"value_type":"latitude","value":"43.2964"},
{"value_type":"longitude","value":"5.36978"},
{"value_type":"state_npm","value":"State: 00000000"},
{"value_type":"BME280_temperature","value":"28.47"},
{"value_type":"BME280_humidity","value":"28.47"},
{"value_type":"BME280_pressure","value":"28.47"},
{"value_type":"CAIRSENS_NO2","value":"54"},
{"value_type":"CAIRSENS_H2S","value":"54"},
{"value_type":"CAIRSENS_O3","value":"54"}
]
}
"""
import board
import json
import serial
import time
import busio
import re
import os
import requests
import traceback
import threading
import sys
import sqlite3
import RPi.GPIO as GPIO
from threading import Thread
from datetime import datetime
# Moment at which this run of the script began (wall clock).
start_time_script = time.time()

# Read the system uptime: the first whitespace-separated field of
# /proc/uptime is parsed as a number of seconds.
with open('/proc/uptime', 'r') as f:
    uptime_seconds = float(f.readline().split()[0])

# Do nothing while the system has been up for less than 2 minutes (120 s).
if uptime_seconds < 120:
    print(f"System just booted ({uptime_seconds:.2f} seconds uptime), skipping execution.")
    sys.exit()

# CSV payload to be sent to data.nebuleair.fr: 30 slots, all empty for now.
payload_csv = [None for _ in range(30)]

# JSON payload to be sent to uSpot; the list of sensor values starts empty.
payload_json = dict(
    nebuleairid="XXX",
    software_version="ModuleAirV2-V1-042022",
    sensordatavalues=[],
)

# SARA R4 UHTTPC profile IDs
aircarto_profile_id, uSpot_profile_id = 0, 1

# Database connection and the cursor used for the queries of this script.
conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()

# Global lock for GPIO access
_gpio_lock = threading.Lock()
def blink_led(pin, blink_count, delay=1):
    """
    Blink the LED wired to a GPIO pin a given number of times.

    The whole sequence runs while holding the module-level GPIO lock.

    Args:
        pin (int): GPIO pin number (BCM numbering) the LED is connected to.
        blink_count (int): How many ON/OFF cycles to perform.
        delay (float): Seconds spent in each ON and each OFF state
            (default is 1 second).
    """
    with _gpio_lock:
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)     # BCM numbering
        GPIO.setup(pin, GPIO.OUT)  # the pin must be an output before driving it
        try:
            for _ in range(blink_count):
                # One cycle: LED on for `delay`, then off for `delay`.
                for level in (GPIO.HIGH, GPIO.LOW):
                    GPIO.output(pin, level)
                    time.sleep(delay)
        finally:
            # Whatever happened above, leave the LED switched off.
            GPIO.output(pin, GPIO.LOW)
            print(f"LED on GPIO {pin} turned OFF (cleanup avoided)")
#get data from config
def load_config(config_file):
    """
    Load the JSON configuration file and return it as a dictionary.

    This is best-effort: any failure (missing file, unreadable file, invalid
    JSON) is reported on stdout and an empty dictionary is returned, so that
    callers can rely on ``config.get(key, default)``.

    Args:
        config_file (str): Path to the JSON configuration file.

    Returns:
        dict: The parsed configuration, or ``{}`` when the file could not be
        loaded or its top-level JSON value is not an object.
    """
    try:
        # Explicit UTF-8: do not depend on the locale of the calling service.
        with open(config_file, 'r', encoding='utf-8') as file:
            config_data = json.load(file)
    except Exception as e:
        print(f"Error loading config file: {e}")
        return {}

    # Callers use ``.get`` on the result, so a JSON list/string/number at the
    # top level must not be handed back as if it were a configuration.
    if not isinstance(config_data, dict):
        print(
            "Error loading config file: expected a JSON object, "
            f"got {type(config_data).__name__}"
        )
        return {}
    return config_data
#Fonction pour mettre à jour le JSON de configuration
def update_json_key(file_path, key, value):
    """
    Update one existing key of a JSON file with a new value.

    The key must already exist in the file; otherwise a message is printed
    and the file is left untouched. Any error (unreadable file, invalid JSON,
    value that cannot be serialized, write failure) is reported on stdout and
    never raised to the caller.

    :param file_path: Path to the JSON file.
    :param key: The key to update in the JSON file.
    :param value: The new value to assign to the key.
    :return: None
    """
    try:
        # Load the existing data
        with open(file_path, "r", encoding="utf-8") as file:
            data = json.load(file)

        # Only existing keys may be updated
        if key not in data:
            print(f"Key '{key}' not found in the JSON file.")
            return
        data[key] = value

        # Serialize BEFORE opening the file for writing: opening with "w"
        # truncates the file, so a serialization error raised afterwards
        # would leave a truncated, invalid configuration file behind.
        serialized = json.dumps(data, indent=2)  # indent for pretty printing

        with open(file_path, "w", encoding="utf-8") as file:
            file.write(serialized)
        print(f"💾updating '{key}' to '{value}'.")
    except Exception as e:
        print(f"Error updating the JSON file: {e}")
# Location of the JSON configuration file
config_file = '/var/www/nebuleair_pro_4g/config.json'

# Read the configuration once; each setting below has its own fallback value
config = load_config(config_file)

# Device identity and position
device_id = config.get('deviceID', '').upper()  # device ID, upper-cased
device_latitude_raw = config.get('latitude_raw', 0)
device_longitude_raw = config.get('longitude_raw', 0)

# Modem settings
baudrate = config.get('SaraR4_baudrate', 115200)  # baud rate of the SARA R4
selected_networkID = int(config.get('SARA_R4_neworkID', 0))
modem_version = config.get('modem_version', "")
modem_config_mode = config.get('modem_config_mode', False)  # 4G modem is in configuration mode

# Optional sensors / peripherals (keyed by the path of their reader script)
bme_280_config = config.get('BME280/get_data_v2.py', False)  # whether a BME280 is present
envea_cairsens = config.get('envea/read_value_v2.py', False)
mppt_charger = config.get('MPPT/read.py', False)
wind_meter = config.get('windMeter/read.py', False)
npm_5channel = config.get('NextPM_5channels', False)  # 5 channels of the NPM

# Upload targets
send_aircarto = config.get('send_aircarto', True)  # send to AirCarto (data.nebuleair.fr)
send_uSpot = config.get('send_uSpot', False)  # send to MicroSpot
reset_uSpot_url = False

# Put the device id into the JSON payload
payload_json["nebuleairid"] = device_id

# Stop here when the modem is flagged as being in config mode
if modem_config_mode:
    print("Modem 4G (SARA R4) is in config mode -> EXIT")
    sys.exit()

# Serial link to the SARA modem: 8 data bits, no parity, 1 stop bit
ser_sara = serial.Serial(
    port='/dev/ttyAMA2',
    baudrate=baudrate,  # 115200 or 9600
    parity=serial.PARITY_NONE,  # PARITY_NONE, PARITY_EVEN or PARITY_ODD
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=2,
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
    """
    Read a complete response from a serial connection (very important helper).

    Bytes are accumulated for as long as data keeps arriving. The function
    returns early as soon as one of the ``wait_for_lines`` strings appears
    anywhere in the decoded data received so far; otherwise it returns once
    no new data has arrived for ``end_of_response_timeout`` seconds.

    All timing uses ``time.monotonic()`` so that a change of the system clock
    while waiting can neither cut the wait short nor extend it.

    Args:
        serial_connection: Object exposing ``in_waiting``, ``read(size)`` and
            a writable ``timeout`` attribute.
        timeout (float): Value assigned to ``serial_connection.timeout``.
        end_of_response_timeout (float): Seconds of silence after which the
            response is considered finished. The countdown restarts each
            time new data is received.
        wait_for_lines (list[str] | None): Substrings that end the read as
            soon as one of them is found. ``None`` means no early return.
        debug (bool): Print timing / diagnostic messages when true.

    Returns:
        str: Everything received, decoded as UTF-8 with undecodable bytes
        replaced. May be an empty string when nothing was received.
    """
    targets = [] if wait_for_lines is None else wait_for_lines

    response = bytearray()
    serial_connection.timeout = timeout
    started = time.monotonic()
    silence_deadline = started + end_of_response_timeout

    while True:
        # Read the pending byte count once so the test and the read agree.
        pending = serial_connection.in_waiting
        if pending > 0:
            response.extend(serial_connection.read(pending))
            # New data arrived: restart the silence countdown.
            silence_deadline = time.monotonic() + end_of_response_timeout

            # Decode everything received so far and look for a target string.
            decoded_response = response.decode('utf-8', errors='replace')
            for target_line in targets:
                if target_line in decoded_response:
                    if debug:
                        elapsed_time = time.monotonic() - started
                        print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)")
                    return decoded_response
        elif time.monotonic() > silence_deadline:
            if debug:
                print("[DEBUG] Timeout reached. No more data received.")
            break
        time.sleep(0.1)  # short sleep to avoid busy waiting

    # No target string was found: report timing and return what was received.
    total_elapsed_time = time.monotonic() - started
    if debug:
        print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
        # Flag operations that took more than 10 seconds
        if total_elapsed_time > 10:
            print("[ALERT] 🚨 The operation took too long 🚨")
            print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
    return response.decode('utf-8', errors='replace')
def extract_error_code(response):
    """
    Extract just the error code from an AT+UHTTPER response.

    Each line containing ``+UHTTPER`` is inspected: the text between the
    first and second ``:`` is split on commas and the third field is parsed
    as an integer. Lines that do not have that shape (for instance the echo
    of the command itself, which has no ``:``) are skipped.

    Args:
        response (str): Raw text returned by the modem.

    Returns:
        int | None: The error code, or ``None`` when no line yields one
        (including when ``response`` is empty or ``None``).
    """
    if not response:
        return None

    for line in response.split('\n'):
        if '+UHTTPER' not in line:
            continue
        segments = line.split(':')
        if len(segments) < 2:
            # No ':' on this line, so there are no values to parse.
            continue
        fields = segments[1].strip().split(',')
        if len(fields) < 3:
            continue
        try:
            # The third value is the error code.
            return int(fields[2])
        except ValueError:
            # Not a number: keep looking at the remaining lines.
            continue
    # No error code could be found
    return None
try:
'''
_ ___ ___ ____
| | / _ \ / _ \| _ \
| | | | | | | | | |_) |
| |__| |_| | |_| | __/
|_____\___/ \___/|_|
'''
print('<h3>START LOOP</h3>')
print(f'Modem version: {modem_version}')
#Local timestamp
#ATTENTION:
# -> RTC module can be deconnected ""
# -> RTC module can be out of time like "2000-01-01T00:55:21Z"
print("Getting local timestamp")
cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone() # Get the first (and only) row
rtc_time_str = row[1] # '2025-02-07 12:30:45' ou '2000-01-01 00:55:21' ou 'not connected'
print(rtc_time_str)
if rtc_time_str == 'not connected':
print("⛔ Atttention RTC module not connected⛔")
rtc_status = "disconnected"
influx_timestamp="rtc_disconnected"
else :
# Convert to a datetime object
dt_object = datetime.strptime(rtc_time_str, '%Y-%m-%d %H:%M:%S')
# Check if timestamp is reset (year 2000)
if dt_object.year == 2000:
print("⛔ Attention: RTC has been reset to default date ⛔")
rtc_status = "reset"
else:
print("✅ RTC timestamp is valid")
rtc_status = "valid"
# Always convert to InfluxDB format
# Convert to InfluxDB RFC3339 format with UTC 'Z' suffix
influx_timestamp = dt_object.strftime('%Y-%m-%dT%H:%M:%SZ')
rtc_status = "valid"
print(influx_timestamp)
#NEXTPM
# We take the last measures (order by rowid and not by timestamp)
print("Getting NPM values (last 6 measures)")
#cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 1")
#cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 6")
cursor.execute("SELECT rowid, * FROM data_NPM ORDER BY rowid DESC LIMIT 6")
rows = cursor.fetchall()
# Exclude the timestamp column (assuming first column is timestamp)
data_values = [row[2:] for row in rows] # Exclude timestamp
# Compute column-wise average
num_columns = len(data_values[0])
averages = [round(sum(col) / len(col),1) for col in zip(*data_values)]
PM1 = averages[0]
PM25 = averages[1]
PM10 = averages[2]
npm_temp = averages[3]
npm_hum = averages[4]
#Add data to payload CSV
payload_csv[0] = PM1
payload_csv[1] = PM25
payload_csv[2] = PM10
payload_csv[18] = npm_temp
payload_csv[19] = npm_hum
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "NPM_P0", "value": str(PM1)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P1", "value": str(PM10)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P2", "value": str(PM25)})
#NextPM 5 channels
if npm_5channel:
print("Getting NextPM 5 channels values (last 6 measures)")
cursor.execute("SELECT * FROM data_NPM_5channels ORDER BY rowid DESC LIMIT 6")
rows = cursor.fetchall()
# Exclude the timestamp column (assuming first column is timestamp)
data_values = [row[1:] for row in rows] # Exclude timestamp
# Compute column-wise average
num_columns = len(data_values[0])
averages = [round(sum(col) / len(col)) for col in zip(*data_values)]
# Store averages in specific indices
payload_csv[13] = averages[0] # Channel 1
payload_csv[14] = averages[1] # Channel 2
payload_csv[15] = averages[2] # Channel 3
payload_csv[16] = averages[3] # Channel 4
payload_csv[17] = averages[4] # Channel 5
#BME280
if bme_280_config:
print("Getting BME280 values")
cursor.execute("SELECT * FROM data_BME280 ORDER BY rowid DESC LIMIT 1")
last_row = cursor.fetchone()
if last_row:
print("SQLite DB last available row:", last_row)
BME280_temperature = last_row[1]
BME280_humidity = last_row[2]
BME280_pressure = last_row[3]
#Add data to payload CSV
payload_csv[3] = BME280_temperature
payload_csv[4] = BME280_humidity
payload_csv[5] = BME280_pressure
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "BME280_temperature", "value": str(BME280_temperature)})
payload_json["sensordatavalues"].append({"value_type": "BME280_humidity", "value": str(BME280_humidity)})
payload_json["sensordatavalues"].append({"value_type": "BME280_pressure", "value": str(BME280_pressure)})
else:
print("No data available in the database.")
#envea
if envea_cairsens:
print("Getting envea cairsens values")
cursor.execute("SELECT * FROM data_envea ORDER BY rowid DESC LIMIT 6")
rows = cursor.fetchall()
# Exclude the timestamp column (assuming first column is timestamp)
data_values = [row[1:] for row in rows] # Exclude timestamp
# Compute column-wise average, ignoring 0 values
averages = []
for col in zip(*data_values): # Iterate column-wise
filtered_values = [val for val in col if val != 0] # Remove zeros
if filtered_values:
avg = round(sum(filtered_values) / len(filtered_values)) # Compute average
else:
avg = 0 # If all values were zero, store 0
averages.append(avg)
# Store averages in specific indices
payload_csv[9] = averages[0] # envea_no2
payload_csv[10] = averages[1] # envea_h2s
payload_csv[11] = averages[2] # envea_nh3
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NO2", "value": str(averages[0])})
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NO2", "value": str(averages[1])})
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NH3", "value": str(averages[2])})
#Wind meter
if wind_meter:
print("Getting wind meter values")
#MPPT charger
if mppt_charger:
print("Getting MPPT charger values")
cursor.execute("SELECT * FROM data_MPPT ORDER BY rowid DESC LIMIT 1")
last_row = cursor.fetchone()
if last_row:
print("SQLite DB last available row:", last_row)
battery_voltage = last_row[1]
battery_current = last_row[2]
solar_voltage = last_row[3]
solar_power = last_row[4]
charger_status = last_row[5]
#Add data to payload CSV
payload_csv[20] = battery_voltage
payload_csv[21] = battery_current
payload_csv[22] = solar_voltage
payload_csv[23] = solar_power
payload_csv[24] = charger_status
else:
print("No data available in the database.")
print("Verify SARA R4 connection")
# Getting the LTE Signal
print("Getting LTE signal")
ser_sara.write(b'AT+CSQ\r')
response2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
#Here it's possible that the SARA do not repond at all
#-> TO DO : harware reboot
#-> send notification
#-> end loop, no need to continue
if response2 is None or response2 == "" or not any(expected in response2 for expected in ["OK", "ERROR", "+", "AT"]):
print("No answer from SARA module")
print('🛑STOP LOOP🛑')
print("<hr>")
# Send notification
try:
# Format the URL with the device_id
alert_url = f'http://data.nebuleair.fr/pro_4G/alert.php?capteur_id={device_id}&error_type=serial_error'
# Send POST request with short timeout
response = requests.post(alert_url, timeout=3)
if response.status_code == 200:
print(f"Alert notification sent successfully")
else:
print(f"Alert notification failed with status code: {response.status_code}")
except Exception as e:
# Catch any exception and continue
print(f"Alert notification failed: {e}")
#end loop
sys.exit()
else :
print("✅SARA is connected over serial")
print('<p class="text-danger-emphasis">')
print(response2)
print("</p>", end="")
match = re.search(r'\+CSQ:\s*(\d+),', response2)
if match:
signal_quality = int(match.group(1))
payload_csv[12]=signal_quality
time.sleep(0.1)
# On vérifie si le signal n'est pas à 99 pour déconnexion
# si c'est le cas on essaie de se reconnecter
if signal_quality == 99:
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: Signal Quality indicates no signal (99)⚠️</span>')
print("TRY TO RECONNECT:")
command = f'AT+COPS=1,2,{selected_networkID}\r'
#command = f'AT+COPS=0\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara, timeout=20, end_of_response_timeout=20)
print('<p class="text-danger-emphasis">')
print(responseReconnect)
print("</p>", end="")
print('🛑STOP LOOP🛑')
print("<hr>")
#on arrete le script pas besoin de continuer
sys.exit()
else:
print("Signal Quality:", signal_quality)
'''
SEND TO AIRCARTO
'''
print('➡️<p class="fw-bold">SEND TO AIRCARTO SERVERS</p>')
# Write Data to saraR4
# 1. Open sensordata_csv.json (with correct data size)
csv_string = ','.join(str(value) if value is not None else '' for value in payload_csv)
size_of_string = len(csv_string)
print("Open JSON:")
command = f'AT+UDWNFILE="sensordata_csv.json",{size_of_string}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False)
print(response_SARA_1)
time.sleep(1)
#2. Write to shell
print("Write data to memory:")
ser_sara.write(csv_string.encode())
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_2)
#3. Send to endpoint (with device ID)
print("Send data (POST REQUEST):")
command= f'AT+UHTTPC={aircarto_profile_id},4,"/pro_4G/data.php?sensor_id={device_id}&lat{device_latitude_raw}=&long={device_longitude_raw}&datetime={influx_timestamp}","aircarto_server_response.txt","sensordata_csv.json",4\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(response_SARA_3)
print("</p>", end="")
# si on recoit la réponse UHTTPCR
if "+UUHTTPCR" in response_SARA_3:
print("✅ Received +UUHTTPCR response.")
# Les types de réponse
# 1.La commande n'a pas fonctionné
# +CME ERROR: No connection to phone
# +CME ERROR: Operation not allowed
# 2.La commande fonctionne: elle renvoie un code
# +UUHTTPCR: <profile_id>,<http_command>,<http_result>
# <http_result>: 1 pour sucess et 0 pour fail
# +UUHTTPCR: 0,4,1 -> OK ✅
# +UUHTTPCR: 0,4,0 -> error ⛔
# Split response into lines
lines = response_SARA_3.strip().splitlines()
# 1.Vérifier si la réponse contient un message d'erreur CME
if "+CME ERROR" in lines[-1]:
print("*****")
print('<span style="color: red;font-weight: bold;">ATTENTION: CME ERROR</span>')
print("error:", lines[-1])
print("*****")
#update status
update_json_key(config_file, "SARA_R4_network_status", "disconnected")
# Gestion de l'erreur spécifique
if "No connection to phone" in lines[-1]:
print("No connection to the phone. Retrying or reset may be required.")
# Actions spécifiques pour ce type d'erreur (par exemple, réinitialiser ou tenter de reconnecter)
# need to reconnect to network
# and reset HTTP profile (AT+UHTTP=0) -> ne fonctionne pas..
# tester un reset avec CFUN 15
# 1.Reconnexion au réseau (AT+COPS)
command = f'AT+COPS=1,2,{selected_networkID}\r'
#command = f'AT+COPS=0\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara)
print("Response reconnect:")
print(responseReconnect)
print("End response reconnect")
elif "Operation not allowed" in lines[-1]:
print("Operation not allowed. This may require a different configuration.")
# Actions spécifiques pour ce type d'erreur
# Clignotement LED rouge en cas d'erreur
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
else:
# 2.Si la réponse contient une réponse UUHTTPCR
# Extract UUHTTPCR response code from the last line
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed) ⛔⛔⛔
# -> GET error code
# -> reboot module
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
print("*****")
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: HTTP operation failed</span>')
update_json_key(config_file, "SARA_R4_network_status", "disconnected")
print("*****")
print("Blink red LED")
# Run LED blinking in a separate thread
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
# Get error code
print("Getting error code")
command = f'AT+UHTTPER={aircarto_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9)
print("</p>", end="")
# Extract just the error code
error_code = extract_error_code(response_SARA_9)
if error_code is not None:
# Display interpretation based on error code
if error_code == 0:
print('<p class="text-success">No error detected</p>')
elif error_code == 4:
print('<p class="text-danger">Error 4: Invalid server Hostname</p>')
elif error_code == 11:
print('<p class="text-danger">Error 11: Server connection error</p>')
elif error_code == 22:
print('<p class="text-danger">⚠Error 22: PSD or CSD connection not established (SARA-R5 need to reset PDP conection)⚠️</p>')
elif error_code == 73:
print('<p class="text-danger">Error 73: Secure socket connect error</p>')
else:
print(f'<p class="text-danger">Unknown error code: {error_code}</p>')
else:
print('<p class="text-danger">Could not extract error code from response</p>')
#Reboot du SARA R4
#ATTENTION: sur le SARA R5 la commande renvoie une erreur
print('<span style="color: orange;font-weight: bold;">🔄SARA reboot!🔄</span>')
command = f'AT+CFUN=15\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9r = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=True)
print('<p class="text-danger-emphasis">')
print(response_SARA_9r)
print("</p>", end="")
reset_uSpot_url = True
print("Sleep 10 secs before continuing")
time.sleep(10)
#reset l'url AirCarto
print('<span style="color: orange;font-weight: bold;">🔧Resetting the HTTP Profile</span>')
command = f'AT+UHTTP={aircarto_profile_id},1,"data.nebuleair.fr"\r'
ser_sara.write(command.encode('utf-8'))
responseResetHTTP2_profile = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseResetHTTP2_profile)
print("</p>", end="")
#si on a un sara r5 il faut également reset la connection PDP
if modem_version == "SARA-R500":
print("Need to reset PDP connection")
# 2. Activate PDP context 1
print('Activate PDP context 1')
command = f'AT+CGACT=1,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_2dpd = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_2dpd, end="")
time.sleep(1)
# 2. Set the PDP type
print('Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command')
command = f'AT+UPSD=0,0,0\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_31dpd = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_31dpd, end="")
time.sleep(1)
# 2. Profile #0 is mapped on CID=1.
print('Profile #0 is mapped on CID=1.')
command = f'AT+UPSD=0,100,1\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_32dpd = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_32dpd, end="")
time.sleep(1)
# 2. Set the PDP type
print('Activate the PSD profile #0: the IPv4 address is already assigned by the network.')
command = f'AT+UPSDA=0,3\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_33dpd = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"])
print(response_SARA_33dpd, end="")
time.sleep(1)
# 2.2 code 1 (✅✅HHTP / UUHTTPCR succeded✅✅)
else:
print('<span style="font-weight: bold;">✅✅HTTP operation successful.</span>')
update_json_key(config_file, "SARA_R4_network_status", "connected")
print("Blink blue LED")
led_thread = Thread(target=blink_led, args=(23, 5, 0.5))
led_thread.start()
#4. Read reply from server
print("Reply from server:")
ser_sara.write(b'AT+URDFILE="aircarto_server_response.txt"\r')
response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-success">')
print(response_SARA_4)
print("</p>", end="")
#Parse the server datetime
# Extract just the date from the response
date_string = None
date_start = response_SARA_4.find("Date: ")
if date_start != -1:
date_end = response_SARA_4.find("\n", date_start)
date_string = response_SARA_4[date_start + 6:date_end].strip()
print(f'<div class="text-primary">Server date: {date_string}</div>', end="")
# Optionally convert to datetime object
try:
from datetime import datetime
server_datetime = datetime.strptime(
date_string,
"%a, %d %b %Y %H:%M:%S %Z"
)
#print(f'<p class="text-primary">Parsed datetime: {server_datetime}</p>')
except Exception as e:
print(f'<p class="text-warning">Error parsing date: {e}</p>')
# Get RTC time from SQLite
cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone()
rtc_time_str = row[1] # '2025-02-07 12:30:45' or '2000-01-01 00:55:21' or 'not connected'
print(f'<div class="text-primary">RTC time: {rtc_time_str}</div>', end="")
# Compare times if both are available
if server_datetime and rtc_time_str != 'not connected':
try:
# Convert RTC time string to datetime
rtc_datetime = datetime.strptime(rtc_time_str, '%Y-%m-%d %H:%M:%S')
# Calculate time difference in seconds
time_diff = abs((server_datetime - rtc_datetime).total_seconds())
print(f'<div class="text-primary">Time difference: {time_diff:.2f} seconds</div>', end="")
# Check if difference is more than 60 seconds
# and update the RTC clock
if time_diff > 60:
print(f'<div class="text-warning"><strong>⚠️ RTC time differs from server time by {time_diff:.2f} seconds!</strong></div>', end="")
# Format server time for RTC update
server_time_formatted = server_datetime.strftime('%Y-%m-%d %H:%M:%S')
#update RTC module do not wait for answer, non blocking
#/usr/bin/python3 /var/www/nebuleair_pro_4g/RTC/set_with_browserTime.py '2024-01-30 12:48:39'
# Launch RTC update script as non-blocking subprocess
import subprocess
update_command = [
"/usr/bin/python3",
"/var/www/nebuleair_pro_4g/RTC/set_with_browserTime.py",
server_time_formatted
]
# Execute the command without waiting for result
subprocess.Popen(update_command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
print(f'<div class="text-warning">➡️ Updating RTC with server time: {server_time_formatted}</div>', end="")
else:
print(f'<div class="text-success">✅ RTC time is synchronized with server time (within 60 seconds)</div>')
except Exception as e:
print(f'<p class="text-warning">Error comparing times: {e}</p>')
#Si non ne recoit pas de réponse UHTTPCR
#on a peut etre une ERROR de type "+CME ERROR: No connection to phone" ou "Operation not allowed"
else:
print('<span style="color: red;font-weight: bold;">No UUHTTPCR response</span>')
print("Blink red LED")
# Run LED blinking in a separate thread
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
#Vérification de l'erreur
print("Getting type of error")
# Split the response into lines and search for "+CME ERROR:"
lines2 = response_SARA_3.strip().splitlines()
for line in lines2:
if "+CME ERROR" in line:
error_message = line.split("+CME ERROR:")[1].strip()
print("*****")
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: CME ERROR⚠</span>')
print(f"Error type: {error_message}")
print("*****")
# Handle "No connection to phone" error
if error_message == "No connection to phone":
print('<span style="color: orange;font-weight: bold;">📞Try reconnect to network📞</span>')
#IMPORTANT!
# Reconnexion au réseau (AT+COPS)
command = f'AT+COPS=1,2,{selected_networkID}\r'
#command = f'AT+COPS=0\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["OK", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseReconnect)
print("</p>", end="")
# Handle "Operation not allowed" error
if error_message == "Operation not allowed":
print('<span style="color: orange;font-weight: bold;">❓Try Resetting the HTTP Profile❓</span>')
command = f'AT+UHTTP={aircarto_profile_id},1,"data.nebuleair.fr"\r'
ser_sara.write(command.encode('utf-8'))
responseResetHTTP_profile = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseResetHTTP_profile)
print("</p>", end="")
check_lines = responseResetHTTP_profile.strip().splitlines()
for line in check_lines:
if "+CME ERROR: Operation not allowed" in line:
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: CME ERROR⚠</span>')
print('<span style="color: orange;font-weight: bold;">❓Try Reboot the module❓</span>')
#5. empty json
print("Empty SARA memory:")
ser_sara.write(b'AT+UDELFILE="sensordata_csv.json"\r')
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_5)
'''
SEND TO uSPOT
'''
if send_uSpot:
print('➡️<p class="fw-bold">SEND TO uSPOT SERVERS</p>')
if reset_uSpot_url:
    # (Re)configure the modem HTTP profile used for the uSpot server:
    # import the root CA, attach it to a security profile, then set the
    # host name, the port and HTTPS on the HTTP profile.
    #
    # NOTE: every AT command string below already carries its single "\r"
    # terminator and is written as-is. Earlier code appended a second "\r"
    # to some of them; for AT+USECMNG that stray byte is sent right before
    # the binary certificate and could be taken as part of the data.
    print('Set uSpot URL')
    uSpot_profile_id = 1
    uSpot_url = "api-prod.uspot.probesys.net"
    security_profile_id = 1

    # Step 1: import the certificate
    print("****")
    certificate_name = "e6"
    with open("/var/www/nebuleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file:
        certificate = cert_file.read()
    size_of_string = len(certificate)

    print("\033[0;33m Import certificate\033[0m")
    # AT+USECMNG=0,<type>,<internal_name>,<data_size>
    # type-> 0 -> trusted root CA
    command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_1 = read_complete_response(ser_sara)
    print(response_SARA_1)
    time.sleep(0.5)

    # Send the raw certificate bytes announced by the previous command.
    print("\033[0;33mAdd certificate\033[0m")
    ser_sara.write(certificate)
    response_SARA_2 = read_complete_response(ser_sara)
    print(response_SARA_2)
    time.sleep(0.5)

    # Step 2: security profile
    # op_code: 3 -> trusted root certificate internal name
    print("\033[0;33mSet the security profile (choose cert)\033[0m")
    command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"])
    print(response_SARA_5c)
    time.sleep(0.5)

    # Step 3: set the server host name (op_code = 1)
    command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
    print(response_SARA_2)
    time.sleep(1)

    # Step 4: set the port (op_code = 5)
    print("set port 443")
    command = f'AT+UHTTP={uSpot_profile_id},5,443\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
    print(response_SARA_55)
    time.sleep(1)

    # Step 5: enable HTTPS (op_code = 6, http_secure = 1) and bind the HTTP
    # profile to the security profile configured above.
    print("\033[0;33mSET SSL\033[0m")
    http_secure = 1
    command = f'AT+UHTTP={uSpot_profile_id},6,{http_secure},{security_profile_id}\r'
    ser_sara.write(command.encode('utf-8'))
    response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
    print(response_SARA_5)
    time.sleep(1)
# 1. Open sensordata_json.json on the modem (with the correct data size).
# The size announced to AT+UDWNFILE must be the number of BYTES written in
# step 2, so it is measured on the encoded payload rather than on the str.
print("Open JSON:")
payload_string = json.dumps(payload_json)  # Convert dict to JSON string
payload_bytes = payload_string.encode()
size_of_string = len(payload_bytes)
# The command already ends with its "\r" terminator; a second "\r" sent
# here would precede the payload and could be stored as file content.
command = f'AT+UDWNFILE="sensordata_json.json",{size_of_string}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_6 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False)
print(response_SARA_6)
time.sleep(1)

# 2. Write the payload bytes after the ">" prompt
print("Write to memory:")
ser_sara.write(payload_bytes)
response_SARA_7 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_7)
# 3. Trigger the HTTP request. The command below uses http_command=4 to
# POST the file "sensordata_json.json"; the server reply is written to
# "uSpot_server_response.txt".
print("****")
print("Trigger POST REQUEST")
command = f'AT+UHTTPC={uSpot_profile_id},4,"/nebuleair?token=2AFF6dQk68daFZ","uSpot_server_response.txt","sensordata_json.json",4\r'
ser_sara.write(command.encode('utf-8'))

# Read the modem reply, waiting for either the +UUHTTPCR result line or a
# +CME ERROR line.
response_SARA_8 = read_complete_response(
    ser_sara,
    timeout=5,
    end_of_response_timeout=120,
    wait_for_lines=["+UUHTTPCR", "+CME ERROR"],
    debug=True,
)

# Echo the raw modem reply wrapped in an HTML paragraph.
print('<p class="text-danger-emphasis">', response_SARA_8, "</p>", sep="\n", end="")
# Interpret the modem reply to the POST request.
# Case: the reply contains a +UUHTTPCR result line.
# NOTE(review): a reply that contains only "+CME ERROR" (without
# "+UUHTTPCR") is not handled by this branch - confirm this is intended.
if "+UUHTTPCR" in response_SARA_8:
print("✅ Received +UUHTTPCR response.")
lines = response_SARA_8.strip().splitlines()
# 1. Check whether the last line of the reply is a CME error message
if "+CME ERROR" in lines[-1]:
print("*****")
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: CME ERROR</span>')
print("error:", lines[-1])
print("*****")
#update status
#update_json_key(config_file, "SARA_R4_network_status", "disconnected")
# Handle specific error messages
if "No connection to phone" in lines[-1]:
print("No connection to the phone.")
elif "Operation not allowed" in lines[-1]:
print("Operation not allowed. This may require a different configuration.")
# Specific actions for this type of error
# Blink the LED on pin 24 (the red one, per the message printed further down) on error
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
else:
# 2. Otherwise treat the last line as a valid HTTP result
# Extract HTTP response code from the last line
# CAUTION (original author's note): lines[-1] may not be the +UUHTTPCR
# line and there can be an issue with a trailing OK;
# it would be better to search for the +UUHTTPCR line instead.
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed)
if len(parts) == 3 and parts[-1] == '0': # third field '0' is handled as a failed HTTP operation
print("*****")
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: HTTP operation failed</span>')
update_json_key(config_file, "SARA_R4_network_status", "disconnected")
print("*****")
print("Blink red LED")
# Run LED blinking in a separate thread
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
# Ask the modem for the error code of this HTTP profile
print("Getting error code")
command = f'AT+UHTTPER={uSpot_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9b)
print("</p>", end="")
# Extract just the error code
error_code = extract_error_code(response_SARA_9b)
if error_code is not None:
# Display interpretation based on error code
if error_code == 0:
print('<p class="text-success">No error detected</p>')
elif error_code == 4:
print('<p class="text-danger">Error 4: Invalid server Hostname</p>')
elif error_code == 11:
print('<p class="text-danger">Error 11: Server connection error</p>')
elif error_code == 22:
print('<p class="text-danger">Error 22: PSD or CSD connection not established</p>')
elif error_code == 73:
print('<p class="text-danger">Error 73: Secure socket connect error</p>')
else:
print(f'<p class="text-danger">Unknown error code: {error_code}</p>')
else:
print('<p class="text-danger">Could not extract error code from response</p>')
# Not necessarily a way to fix the problem from here
# 2.2 code 1 (HTTP succeeded)
# NOTE(review): this branch is also taken when the last line does not
# split into exactly three fields - confirm that is acceptable.
else:
# The HTTP command succeeded
print('<span style="font-weight: bold;">✅✅HTTP operation successful.</span>')
update_json_key(config_file, "SARA_R4_network_status", "connected")
print("Blink blue LED")
led_thread = Thread(target=blink_led, args=(23, 5, 0.5))
led_thread.start()
# 4. Read the server reply stored in uSpot_server_response.txt
print("Reply from server:")
ser_sara.write(b'AT+URDFILE="uSpot_server_response.txt"\r')
response_SARA_4b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-success">')
print(response_SARA_4b)
print("</p>", end="")
# 5. Delete sensordata_json.json from the SARA memory so the next run
# starts from an empty file
print("Empty SARA memory:")
ser_sara.write(b'AT+UDELFILE="sensordata_json.json"\r')
response_SARA_9t = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_9t)
# Calculate and print the time elapsed since start_time_script
elapsed_time = time.time() - start_time_script
print(f"Elapsed time: {elapsed_time:.2f} seconds")
# HTML horizontal rule closing this run's output
print("<hr>")
except Exception as e:
print("An error occurred:", e)
traceback.print_exc() # This prints the full traceback