""" ____ _ ____ _ ____ _ ____ _ / ___| / \ | _ \ / \ / ___| ___ _ __ __| | | _ \ __ _| |_ __ _ \___ \ / _ \ | |_) | / _ \ \___ \ / _ \ '_ \ / _` | | | | |/ _` | __/ _` | ___) / ___ \| _ < / ___ \ ___) | __/ | | | (_| | | |_| | (_| | || (_| | |____/_/ \_\_| \_\/_/ \_\ |____/ \___|_| |_|\__,_| |____/ \__,_|\__\__,_| Main loop to gather data from sensor inside SQLite database: * NPM * Envea * I2C BME280 * Noise sensor and send it to AirCarto servers via SARA R4 HTTP post requests also send the timestamp (already stored inside the DB) ! /usr/bin/python3 /var/www/nebuleair_pro_4g/loop/SARA_send_data_v2.py ATTENTION: # This script is triggered every minutes by /var/www/nebuleair_pro_4g/master.py (as a service) CSV PAYLOAD (AirCarto Servers) Endpoint: data.nebuleair.fr /pro_4G/data.php?sensor_id={device_id}×tamp={rtc_module_time} ATTENTION : do not change order ! CSV size: 18 {PM1},{PM25},{PM10},{temp},{hum},{press},{avg_noise},{max_noise},{min_noise},{envea_no2},{envea_h2s},{envea_o3},{4g_signal_quality} 0 -> PM1 (μg/m3) 1 -> PM25 (μg/m3) 2 -> PM10 (μg/m3) 3 -> temp 4 -> hum 5 -> press 6 -> avg_noise 7 -> max_noise 8 -> min_noise 9 -> envea_no2 10 -> envea_h2s 11 -> envea_nh3 12 -> 4G signal quality, 13 -> PM 0.2μm to 0.5μm quantity (Nb/L) 14 -> PM 0.5μm to 1.0μm quantity (Nb/L) 15 -> PM 1.0μm to 2.5μm quantity (Nb/L) 16 -> PM 2.5μm to 5.0μm quantity (Nb/L) 17 -> PM 5.0μm to 10μm quantity (Nb/L) 18 -> NPM temp inside 19 -> NPM hum inside JSON PAYLOAD (Micro-Spot Servers) Same as NebuleAir wifi Endpoint: api-prod.uspot.probesys.net nebuleair?token=2AFF6dQk68daFZ port 443 {"nebuleairid": "82D25549434", "software_version": "ModuleAirV2-V1-042022", "sensordatavalues": [ {"value_type":"NPM_P0","value":"1.54"}, {"value_type":"NPM_P1","value":"1.54"}, {"value_type":"NPM_P2","value":"1.54"}, {"value_type":"NPM_N1","value":"0.02"}, {"value_type":"NPM_N10","value":"0.02"}, {"value_type":"NPM_N25","value":"0.02"}, {"value_type":"MHZ16_CO2","value":"793.00"}, {"value_type":"SGP40_VOC","value":"29915.00"}, {"value_type":"samples","value":"134400"}, {"value_type":"min_micro","value":"137"}, {"value_type":"max_micro","value":"155030"}, {"value_type":"interval","value":"145000"}, {"value_type":"signal","value":"-80"}, {"value_type":"latitude","value":"43.2964"}, {"value_type":"longitude","value":"5.36978"}, {"value_type":"state_npm","value":"State: 00000000"}, {"value_type":"BME280_temperature","value":"28.47"}, {"value_type":"BME280_humidity","value":"28.47"}, {"value_type":"BME280_pressure","value":"28.47"}, {"value_type":"CAIRSENS_NO2","value":"54"}, {"value_type":"CAIRSENS_H2S","value":"54"}, {"value_type":"CAIRSENS_O3","value":"54"} ] } """ import board import json import serial import time import busio import re import os import traceback import threading import sys import sqlite3 import RPi.GPIO as GPIO from threading import Thread from datetime import datetime # Record the start time of the script start_time_script = time.time() # Check system uptime with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) # Skip execution if uptime is less than 2 minutes (120 seconds) if uptime_seconds < 120: print(f"System just booted ({uptime_seconds:.2f} seconds uptime), skipping execution.") sys.exit() #Payload CSV to be sent to data.nebuleair.fr payload_csv = [None] * 25 #Payload JSON to be sent to uSpot payload_json = { "nebuleairid": "XXX", "software_version": "ModuleAirV2-V1-042022", "sensordatavalues": [] # Empty list to start with } # SARA R4 UHTTPC profile IDs aircarto_profile_id = 0 uSpot_profile_id = 1 # database connection conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") cursor = conn.cursor() _gpio_lock = threading.Lock() # Global lock for GPIO access def blink_led(pin, blink_count, delay=1): """ Blink an LED on a specified GPIO pin. Args: pin (int): GPIO pin number (BCM mode) to which the LED is connected. blink_count (int): Number of times the LED should blink. delay (float): Time in seconds for the LED to stay ON or OFF (default is 1 second). """ with _gpio_lock: GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) # Use BCM numbering GPIO.setup(pin, GPIO.OUT) # Ensure pin is set as OUTPUT try: for _ in range(blink_count): GPIO.output(pin, GPIO.HIGH) # Turn LED on time.sleep(delay) GPIO.output(pin, GPIO.LOW) # Turn LED off time.sleep(delay) finally: GPIO.output(pin, GPIO.LOW) # Ensure LED is off print(f"LED on GPIO {pin} turned OFF (cleanup avoided)") #get data from config def load_config(config_file): try: with open(config_file, 'r') as file: config_data = json.load(file) return config_data except Exception as e: print(f"Error loading config file: {e}") return {} #Fonction pour mettre à jour le JSON de configuration def update_json_key(file_path, key, value): """ Updates a specific key in a JSON file with a new value. :param file_path: Path to the JSON file. :param key: The key to update in the JSON file. :param value: The new value to assign to the key. """ try: # Load the existing data with open(file_path, "r") as file: data = json.load(file) # Check if the key exists in the JSON file if key in data: data[key] = value # Update the key with the new value else: print(f"Key '{key}' not found in the JSON file.") return # Write the updated data back to the file with open(file_path, "w") as file: json.dump(data, file, indent=2) # Use indent for pretty printing print(f"💾updating '{key}' to '{value}'.") except Exception as e: print(f"Error updating the JSON file: {e}") # Define the config file path config_file = '/var/www/nebuleair_pro_4g/config.json' # Load the configuration data config = load_config(config_file) device_latitude_raw = config.get('latitude_raw', 0) device_longitude_raw = config.get('longitude_raw', 0) baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4 device_id = config.get('deviceID', '').upper() #device ID en maj bme_280_config = config.get('BME280/get_data_v2.py', False) #présence du BME280 envea_cairsens= config.get('envea/read_value_v2.py', False) send_aircarto = config.get('send_aircarto', True) #envoi sur AirCarto (data.nebuleair.fr) send_uSpot = config.get('send_uSpot', False) #envoi sur MicroSpot () selected_networkID = config.get('SARA_R4_neworkID', '') npm_5channel = config.get('NextPM_5channels', False) #5 canaux du NPM modem_config_mode = config.get('modem_config_mode', False) #modem 4G en mode configuration #update device id in the payload json payload_json["nebuleairid"] = device_id # Skip execution if modem_config_mode is true if modem_config_mode: print("Modem 4G (SARA R4) is in config mode -> EXIT") sys.exit() ser_sara = serial.Serial( port='/dev/ttyAMA2', baudrate=baudrate, #115200 ou 9600 parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 2 ) def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True): ''' Fonction très importante !!! Reads the complete response from a serial connection and waits for specific lines. ''' if wait_for_lines is None: wait_for_lines = [] # Default to an empty list if not provided response = bytearray() serial_connection.timeout = timeout end_time = time.time() + end_of_response_timeout start_time = time.time() while True: elapsed_time = time.time() - start_time # Time since function start if serial_connection.in_waiting > 0: data = serial_connection.read(serial_connection.in_waiting) response.extend(data) end_time = time.time() + end_of_response_timeout # Reset timeout on new data # Decode and check for any target line decoded_response = response.decode('utf-8', errors='replace') for target_line in wait_for_lines: if target_line in decoded_response: if debug: print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)") return decoded_response # Return response immediately if a target line is found elif time.time() > end_time: if debug: print(f"[DEBUG] Timeout reached. No more data received.") break time.sleep(0.1) # Short sleep to prevent busy waiting # Final response and debug output total_elapsed_time = time.time() - start_time if debug: print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️") # Check if the elapsed time exceeded 10 seconds if total_elapsed_time > 10 and debug: print(f"[ALERT] 🚨 The operation took too long 🚨") print(f'[ALERT] ⚠️{total_elapsed_time:.2f}s⚠️') return response.decode('utf-8', errors='replace') # Return the full response if no target line is found try: ''' _ ___ ___ ____ | | / _ \ / _ \| _ \ | | | | | | | | | |_) | | |__| |_| | |_| | __/ |_____\___/ \___/|_| ''' print('

START LOOP

') #Local timestamp print("➡️Getting local timestamp") cursor.execute("SELECT * FROM timestamp_table LIMIT 1") row = cursor.fetchone() # Get the first (and only) row rtc_time_str = row[1] # '2025-02-07 12:30:45' # Convert to a datetime object dt_object = datetime.strptime(rtc_time_str, '%Y-%m-%d %H:%M:%S') # Convert to InfluxDB RFC3339 format with UTC 'Z' suffix influx_timestamp = dt_object.strftime('%Y-%m-%dT%H:%M:%SZ') print(influx_timestamp) #NEXTPM print("➡️Getting NPM values (last 6 measures)") #cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 1") cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 6") rows = cursor.fetchall() # Exclude the timestamp column (assuming first column is timestamp) data_values = [row[1:] for row in rows] # Exclude timestamp # Compute column-wise average num_columns = len(data_values[0]) averages = [round(sum(col) / len(col),1) for col in zip(*data_values)] PM1 = averages[0] PM25 = averages[1] PM10 = averages[2] npm_temp = averages[3] npm_hum = averages[4] #Add data to payload CSV payload_csv[0] = PM1 payload_csv[1] = PM25 payload_csv[2] = PM10 payload_csv[18] = npm_temp payload_csv[19] = npm_hum #Add data to payload JSON payload_json["sensordatavalues"].append({"value_type": "NPM_P0", "value": str(PM1)}) payload_json["sensordatavalues"].append({"value_type": "NPM_P1", "value": str(PM10)}) payload_json["sensordatavalues"].append({"value_type": "NPM_P2", "value": str(PM25)}) #NextPM 5 channels if npm_5channel: print("➡️Getting NextPM 5 channels values (last 6 measures)") cursor.execute("SELECT * FROM data_NPM_5channels ORDER BY timestamp DESC LIMIT 6") rows = cursor.fetchall() # Exclude the timestamp column (assuming first column is timestamp) data_values = [row[1:] for row in rows] # Exclude timestamp # Compute column-wise average num_columns = len(data_values[0]) averages = [round(sum(col) / len(col)) for col in zip(*data_values)] # Store averages in specific indices payload_csv[13] = averages[0] # Channel 1 payload_csv[14] = averages[1] # Channel 2 payload_csv[15] = averages[2] # Channel 3 payload_csv[16] = averages[3] # Channel 4 payload_csv[17] = averages[4] # Channel 5 #BME280 if bme_280_config: print("➡️Getting BME280 values") cursor.execute("SELECT * FROM data_BME280 ORDER BY timestamp DESC LIMIT 1") last_row = cursor.fetchone() if last_row: print("SQLite DB last available row:", last_row) BME280_temperature = last_row[1] BME280_humidity = last_row[2] BME280_pressure = last_row[3] #Add data to payload CSV payload_csv[3] = BME280_temperature payload_csv[4] = BME280_humidity payload_csv[5] = BME280_pressure #Add data to payload JSON payload_json["sensordatavalues"].append({"value_type": "BME280_temperature", "value": str(BME280_temperature)}) payload_json["sensordatavalues"].append({"value_type": "BME280_humidity", "value": str(BME280_humidity)}) payload_json["sensordatavalues"].append({"value_type": "BME280_pressure", "value": str(BME280_pressure)}) else: print("No data available in the database.") #envea if envea_cairsens: print("➡️Getting envea cairsens values") cursor.execute("SELECT * FROM data_envea ORDER BY timestamp DESC LIMIT 6") rows = cursor.fetchall() # Exclude the timestamp column (assuming first column is timestamp) data_values = [row[1:] for row in rows] # Exclude timestamp # Compute column-wise average, ignoring 0 values averages = [] for col in zip(*data_values): # Iterate column-wise filtered_values = [val for val in col if val != 0] # Remove zeros if filtered_values: avg = round(sum(filtered_values) / len(filtered_values)) # Compute average else: avg = 0 # If all values were zero, store 0 averages.append(avg) # Store averages in specific indices payload_csv[9] = averages[0] # envea_no2 payload_csv[10] = averages[1] # envea_h2s payload_csv[11] = averages[2] # envea_nh3 #Add data to payload JSON payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NO2", "value": str(averages[0])}) payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NO2", "value": str(averages[1])}) payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NH3", "value": str(averages[2])}) print("Verify SARA R4 connection") # Getting the LTE Signal print("-> Getting LTE signal <-") ser_sara.write(b'AT+CSQ\r') response2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) print('

') print(response2) print("

") match = re.search(r'\+CSQ:\s*(\d+),', response2) if match: signal_quality = int(match.group(1)) payload_csv[12]=signal_quality time.sleep(0.1) # On vérifie si le signal n'est pas à 99 pour déconnexion # si c'est le cas on essaie de se reconnecter if signal_quality == 99: print('⚠️ATTENTION: Signal Quality indicates no signal (99)⚠️') print("TRY TO RECONNECT:") command = f'AT+COPS=1,2,"{selected_networkID}"\r' ser_sara.write(command.encode('utf-8')) responseReconnect = read_complete_response(ser_sara, timeout=20, end_of_response_timeout=20) print('

') print(responseReconnect) print("

") print('🛑STOP LOOP🛑') print("
") #on arrete le script pas besoin de continuer sys.exit() else: print("Signal Quality:", signal_quality) ''' SEND TO AIRCARTO ''' print('➡️

SEND TO AIRCARTO SERVERS

') # Write Data to saraR4 # 1. Open sensordata_csv.json (with correct data size) csv_string = ','.join(str(value) if value is not None else '' for value in payload_csv) size_of_string = len(csv_string) print("Open JSON:") command = f'AT+UDWNFILE="sensordata_csv.json",{size_of_string}\r' ser_sara.write(command.encode('utf-8')) response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False) print(response_SARA_1) time.sleep(1) #2. Write to shell print("Write data to memory:") ser_sara.write(csv_string.encode()) response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print(response_SARA_2) #3. Send to endpoint (with device ID) print("Send data (POST REQUEST):") command= f'AT+UHTTPC={aircarto_profile_id},4,"/pro_4G/data.php?sensor_id={device_id}&lat{device_latitude_raw}=&long={device_longitude_raw}&datetime={influx_timestamp}","aircarto_server_response.txt","sensordata_csv.json",4\r' ser_sara.write(command.encode('utf-8')) response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True) print('

') print(response_SARA_3) print("

") # si on recoit la réponse UHTTPCR if "+UUHTTPCR" in response_SARA_3: print("✅ Received +UUHTTPCR response.") # Les types de réponse # 1.La commande n'a pas fonctionné # +CME ERROR: No connection to phone # +CME ERROR: Operation not allowed # 2.La commande fonctionne: elle renvoie un code # +UUHTTPCR: ,, # : 1 pour sucess et 0 pour fail # +UUHTTPCR: 0,4,1 -> OK ✅ # +UUHTTPCR: 0,4,0 -> error ⛔ # Split response into lines lines = response_SARA_3.strip().splitlines() # 1.Vérifier si la réponse contient un message d'erreur CME if "+CME ERROR" in lines[-1]: print("*****") print('ATTENTION: CME ERROR') print("error:", lines[-1]) print("*****") #update status update_json_key(config_file, "SARA_R4_network_status", "disconnected") # Gestion de l'erreur spécifique if "No connection to phone" in lines[-1]: print("No connection to the phone. Retrying or reset may be required.") # Actions spécifiques pour ce type d'erreur (par exemple, réinitialiser ou tenter de reconnecter) # need to reconnect to network # and reset HTTP profile (AT+UHTTP=0) -> ne fonctionne pas.. # tester un reset avec CFUN 15 # 1.Reconnexion au réseau (AT+COPS) command = f'AT+COPS=1,2,"{selected_networkID}"\r' ser_sara.write(command.encode('utf-8')) responseReconnect = read_complete_response(ser_sara) print("Response reconnect:") print(responseReconnect) print("End response reconnect") elif "Operation not allowed" in lines[-1]: print("Operation not allowed. This may require a different configuration.") # Actions spécifiques pour ce type d'erreur # Clignotement LED rouge en cas d'erreur led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() else: # 2.Si la réponse contient une réponse HTTP valide # Extract HTTP response code from the last line # ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK # rechercher plutot http_response = lines[-1] # "+UUHTTPCR: 0,4,0" parts = http_response.split(',') # 2.1 code 0 (HTTP failed) ⛔⛔⛔ if len(parts) == 3 and parts[-1] == '0': # The third value indicates success print("*****") print('⛔ATTENTION: HTTP operation failed') update_json_key(config_file, "SARA_R4_network_status", "disconnected") print("*****") print("Blink red LED") # Run LED blinking in a separate thread led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() # Get error code print("Getting error code (11->Server connection error, 73->Secure socket connect error)") command = f'AT+UHTTPER={aircarto_profile_id}\r' ser_sara.write(command.encode('utf-8')) response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('

') print(response_SARA_9) print("

") ''' +UHTTPER: profile_id,error_class,error_code error_class 0 OK, no error 3 HTTP Protocol error class 10 Wrong HTTP API USAGE error_code (for error_class 3 and 10) 0 No error 4 Invalid server Hostname 11 Server connection error 73 Secure socket connect error ''' #Essayer un reboot du SARA R4 #print("🔄SARA reboot!🔄") #command = f'AT+CFUN=15\r' #ser_sara.write(command.encode('utf-8')) #response_SARA_9r = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) #print('

') #print(response_SARA_9r) #print("

") # 2.2 code 1 (HHTP succeded) else: # Si la commande HTTP a réussi print('✅✅HTTP operation successful.') update_json_key(config_file, "SARA_R4_network_status", "connected") print("Blink blue LED") led_thread = Thread(target=blink_led, args=(23, 5, 0.5)) led_thread.start() #4. Read reply from server print("Reply from server:") ser_sara.write(b'AT+URDFILE="aircarto_server_response.txt"\r') response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('

') print(response_SARA_4) print('

') #Si non ne recoit pas de réponse UHTTPCR #on a peut etre une ERROR de type "+CME ERROR: No connection to phone" else: print('No UUHTTPCR response') print("Blink red LED") # Run LED blinking in a separate thread led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() #Vérification de l'erreur print("Getting type of error") # Split the response into lines and search for "+CME ERROR:" lines2 = response_SARA_3.strip().splitlines() for line in lines2: if "+CME ERROR" in line: error_message = line.split("+CME ERROR:")[1].strip() print("*****") print('⚠️ATTENTION: CME ERROR⚠️') print(f"Error type: {error_message}") print("*****") # Handle "No connection to phone" error if error_message == "No connection to phone": print('📞Try reconnect to network📞') #IMPORTANT! # Reconnexion au réseau (AT+COPS) #command = f'AT+COPS=1,2,{selected_networkID}\r' command = f'AT+COPS=0\r' ser_sara.write(command.encode('utf-8')) responseReconnect = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["OK", "+CME ERROR"], debug=True) print('

') print(responseReconnect) print("

") # Handle "Operation not allowed" error if error_message == "Operation not allowed": print('❓Try Resetting the HTTP Profile❓') command = f'AT+UHTTP={aircarto_profile_id},1,"data.nebuleair.fr"\r' ser_sara.write(command.encode('utf-8')) responseResetHTTP_profile = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True) print('

') print(responseResetHTTP_profile) print("

") #5. empty json print("Empty SARA memory:") ser_sara.write(b'AT+UDELFILE="sensordata_csv.json"\r') response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print(response_SARA_5) ''' SEND TO uSPOT ''' if send_uSpot: print('➡️

SEND TO uSPOT SERVERS

') # 1. Open sensordata_json.json (with correct data size) print("Open JSON:") payload_string = json.dumps(payload_json) # Convert dict to JSON string size_of_string = len(payload_string) command = f'AT+UDWNFILE="sensordata_json.json",{size_of_string}\r' ser_sara.write((command + '\r').encode('utf-8')) response_SARA_6 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False) print(response_SARA_6) time.sleep(1) #2. Write to shell print("Write to memory:") ser_sara.write(payload_string.encode()) response_SARA_7 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print(response_SARA_7) #step 4: trigger the request (http_command=1 for GET and http_command=1 for POST) print("****") print("Trigger POST REQUEST") command = f'AT+UHTTPC={uSpot_profile_id},4,"/nebuleair?token=2AFF6dQk68daFZ","uSpot_server_response.txt","sensordata_json.json",4\r' ser_sara.write(command.encode('utf-8')) response_SARA_8 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True) print('

') print(response_SARA_8) print("

") # si on recoit la réponse UHTTPCR if "+UUHTTPCR" in response_SARA_8: print("✅ Received +UUHTTPCR response.") lines = response_SARA_8.strip().splitlines() # 1.Vérifier si la réponse contient un message d'erreur CME if "+CME ERROR" in lines[-1]: print("*****") print('⛔ATTENTION: CME ERROR') print("error:", lines[-1]) print("*****") #update status #update_json_key(config_file, "SARA_R4_network_status", "disconnected") # Gestion de l'erreur spécifique if "No connection to phone" in lines[-1]: print("No connection to the phone.") elif "Operation not allowed" in lines[-1]: print("Operation not allowed. This may require a different configuration.") # Actions spécifiques pour ce type d'erreur # Clignotement LED rouge en cas d'erreur led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() else: # 2.Si la réponse contient une réponse HTTP valide # Extract HTTP response code from the last line # ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK # rechercher plutot http_response = lines[-1] # "+UUHTTPCR: 0,4,0" parts = http_response.split(',') # 2.1 code 0 (HTTP failed) if len(parts) == 3 and parts[-1] == '0': # The third value indicates success print("*****") print('⛔ATTENTION: HTTP operation failed') update_json_key(config_file, "SARA_R4_network_status", "disconnected") print("*****") print("Blink red LED") # Run LED blinking in a separate thread led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() # Get error code print("Getting error code (4-> Invalid server Hostname, 11->Server connection error, 73->Secure socket connect error)") command = f'AT+UHTTPER={uSpot_profile_id}\r' ser_sara.write(command.encode('utf-8')) response_SARA_9b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('

') print(response_SARA_9b) print("

") ''' +UHTTPER: profile_id,error_class,error_code error_class 0 OK, no error 3 HTTP Protocol error class 10 Wrong HTTP API USAGE error_code (for error_class 3) 0 No error 4 Invalid server Hostname 11 Server connection error 73 Secure socket connect error ''' #Pas forcément un moyen de résoudre le soucis # 2.2 code 1 (HHTP succeded) else: # Si la commande HTTP a réussi print('✅✅HTTP operation successful.') update_json_key(config_file, "SARA_R4_network_status", "connected") print("Blink blue LED") led_thread = Thread(target=blink_led, args=(23, 5, 0.5)) led_thread.start() #4. Read reply from server print("Reply from server:") ser_sara.write(b'AT+URDFILE="uSpot_server_response.txt"\r') response_SARA_4b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('

') print(response_SARA_4b) print('

') #5. empty json print("Empty SARA memory:") ser_sara.write(b'AT+UDELFILE="sensordata_json.json"\r') response_SARA_9t = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print(response_SARA_9t) # Calculate and print the elapsed time elapsed_time = time.time() - start_time_script print(f"Elapsed time: {elapsed_time:.2f} seconds") print("
") except Exception as e: print("An error occurred:", e) traceback.print_exc() # This prints the full traceback