This commit is contained in:
root
2025-02-20 15:07:49 +01:00
commit 59133628af
228 changed files with 282657 additions and 0 deletions

702
loop/1_NPM/send_data.py Normal file
View File

@@ -0,0 +1,702 @@
"""
Main loop to gather data from sensor:
* NPM
* Envea
* I2C BME280
* Noise sensor
and send it to AirCarto servers via SARA R4 HTTP post requests
CSV PAYLOAD (AirCarto Servers)
Endpoint:
data.nebuleair.fr
/pro_4G/data.php?sensor_id={device_id}
ATTENTION : do not change order !
CSV size: 18
{PM1},{PM25},{PM10},{temp},{hum},{press},{avg_noise},{max_noise},{min_noise},{envea_no2},{envea_h2s},{envea_o3},{4g_signal_quality}
0 -> PM1 (μg/m3)
1 -> PM25 (μg/m3)
2 -> PM10 (μg/m3)
3 -> temp
4 -> hum
5 -> press
6 -> avg_noise
7 -> max_noise
8 -> min_noise
9 -> envea_no2
10 -> envea_h2s
11 -> envea_o3
12 -> 4G signal quality,
13 -> PM 0.2μm to 0.5μm quantity (Nb/L)
14 -> PM 0.5μm to 1.0μm quantity (Nb/L)
15 -> PM 1.0μm to 2.5μm quantity (Nb/L)
16 -> PM 2.5μm to 5.0μm quantity (Nb/L)
17 -> PM 5.0μm to 10μm quantity (Nb/L)
JSON PAYLOAD (Micro-Spot Servers)
Same as NebuleAir wifi
Endpoint:
api-prod.uspot.probesys.net
nebuleair?token=2AFF6dQk68daFZ
port 443
{"nebuleairid": "82D25549434",
"software_version": "ModuleAirV2-V1-042022",
"sensordatavalues":
[
{"value_type":"NPM_P0","value":"1.54"},
{"value_type":"NPM_P1","value":"1.54"},
{"value_type":"NPM_P2","value":"1.54"},
{"value_type":"NPM_N1","value":"0.02"},
{"value_type":"NPM_N10","value":"0.02"},
{"value_type":"NPM_N25","value":"0.02"},
{"value_type":"MHZ16_CO2","value":"793.00"},
{"value_type":"SGP40_VOC","value":"29915.00"},
{"value_type":"samples","value":"134400"},
{"value_type":"min_micro","value":"137"},
{"value_type":"max_micro","value":"155030"},
{"value_type":"interval","value":"145000"},
{"value_type":"signal","value":"-80"},
{"value_type":"latitude","value":"43.2964"},
{"value_type":"longitude","value":"5.36978"},
{"value_type":"state_npm","value":"State: 00000000"},
{"value_type":"BME280_temperature","value":"28.47"},
{"value_type":"BME280_humidity","value":"28.47"},
{"value_type":"BME280_pressure","value":"28.47"},
{"value_type":"CAIRSENS_NO2","value":"54"},
{"value_type":"CAIRSENS_H2S","value":"54"},
{"value_type":"CAIRSENS_O3","value":"54"}
]
}
"""
import board
import json
import serial
import time
import busio
import re
import os
import traceback
import sys
import RPi.GPIO as GPIO
from threading import Thread
from adafruit_bme280 import basic as adafruit_bme280
# Record the start time of the script
start_time_script = time.time()
# Check system uptime
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
# Skip execution if uptime is less than 2 minutes (120 seconds)
if uptime_seconds < 120:
print(f"System just booted ({uptime_seconds:.2f} seconds uptime), skipping execution.")
sys.exit()
payload_csv = [None] * 20
payload_json = {
"nebuleairid": "82D25549434",
"software_version": "ModuleAirV2-V1-042022",
"sensordatavalues": [] # Empty list to start with
}
aircarto_profile_id = 0
uSpot_profile_id = 1
def blink_led(pin, blink_count, delay=1):
"""
Blink an LED on a specified GPIO pin.
Args:
pin (int): GPIO pin number (BCM mode) to which the LED is connected.
blink_count (int): Number of times the LED should blink.
delay (float): Time in seconds for the LED to stay ON or OFF (default is 1 second).
"""
# GPIO setup
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM) # Use BCM numbering
GPIO.setup(pin, GPIO.OUT) # Set the specified pin as an output
try:
for _ in range(blink_count):
GPIO.output(pin, GPIO.HIGH) # Turn the LED on
#print(f"LED on GPIO {pin} is ON")
time.sleep(delay) # Wait for the specified delay
GPIO.output(pin, GPIO.LOW) # Turn the LED off
#print(f"LED on GPIO {pin} is OFF")
time.sleep(delay) # Wait for the specified delay
finally:
GPIO.cleanup(pin) # Clean up the specific pin to reset its state
print(f"GPIO {pin} cleaned up")
#get data from config
def load_config(config_file):
try:
with open(config_file, 'r') as file:
config_data = json.load(file)
return config_data
except Exception as e:
print(f"Error loading config file: {e}")
return {}
#Fonction pour mettre à jour le JSON de configuration
def update_json_key(file_path, key, value):
"""
Updates a specific key in a JSON file with a new value.
:param file_path: Path to the JSON file.
:param key: The key to update in the JSON file.
:param value: The new value to assign to the key.
"""
try:
# Load the existing data
with open(file_path, "r") as file:
data = json.load(file)
# Check if the key exists in the JSON file
if key in data:
data[key] = value # Update the key with the new value
else:
print(f"Key '{key}' not found in the JSON file.")
return
# Write the updated data back to the file
with open(file_path, "w") as file:
json.dump(data, file, indent=2) # Use indent for pretty printing
print(f"updating '{key}' to '{value}'.")
except Exception as e:
print(f"Error updating the JSON file: {e}")
# Define the config file path
config_file = '/var/www/nebuleair_pro_4g/config.json'
# Load the configuration data
config = load_config(config_file)
loop_activation = config.get('loop_activation', True) #activation de la loop principale
baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4
device_id = config.get('deviceID', '').upper() #device ID en maj
need_to_log = config.get('loop_log', False) #inscription des logs
bme_280_config = config.get('i2c_BME', False) #présence du BME280
i2C_sound_config = config.get('i2C_sound', False) #présence du capteur son
send_aircarto = config.get('send_aircarto', True) #envoi sur AirCarto (data.nebuleair.fr)
send_uSpot = config.get('send_uSpot', False) #envoi sur MicroSpot ()
npm_5channel = config.get('NextPM_5channels', False) #5 canaux du NPM
envea_sondes = config.get('envea_sondes', [])
connected_envea_sondes = [sonde for sonde in envea_sondes if sonde.get('connected', False)]
selected_networkID = config.get('SARA_R4_neworkID', '')
#update device id in the payload json
payload_json["nebuleairid"] = device_id
if loop_activation == False:
print("Loop activation is False , skipping execution.")
sys.exit()
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=baudrate, #115200 ou 9600
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 2
)
ser_NPM = serial.Serial(
port='/dev/ttyAMA5',
baudrate=115200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 1
)
serial_connections = {}
# Sondes Envea
if connected_envea_sondes:
# Pour chacune des sondes
for device in connected_envea_sondes:
port = device.get('port', 'Unknown')
name = device.get('name', 'Unknown')
connected = device.get('connected', False)
serial_connections[name] = serial.Serial(
port=f'/dev/{port}', # Format the port string
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 1
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_line=None, debug=True):
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
elapsed_time = time.time() - start_time # Time since function start
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout # Reset timeout on new data
# Decode and check for the specific line
if wait_for_line:
decoded_response = response.decode('utf-8', errors='replace')
if wait_for_line in decoded_response:
if debug: print(f"[DEBUG] 🔎Found target line: {wait_for_line}")
break
elif time.time() > end_time:
if debug: print(f"[DEBUG] Timeout reached. No more data received.")
break
time.sleep(0.1) # Short sleep to prevent busy waiting
# Final response and debug output
total_elapsed_time = time.time() - start_time
if debug: print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
# Check if the elapsed time exceeded 10 seconds
if total_elapsed_time > 10 and debug:
print(f"[ALERT] 🚨 The operation took too long🚨")
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
return response.decode('utf-8', errors='replace')
# Open and read the JSON file
try:
# Send the command to request data (e.g., data for 60 seconds)
print('<h3>START LOOP</h3>')
print("Getting NPM values")
ser_NPM.write(b'\x81\x12\x6D')
# Read the response
byte_data = ser_NPM.readline()
#if npm is disconnected byte_data is empty
# Extract the state byte and PM data from the response
state_byte = int.from_bytes(byte_data[2:3], byteorder='big')
state_bits = [int(bit) for bit in bin(state_byte)[2:].zfill(8)]
PM1 = int.from_bytes(byte_data[9:11], byteorder='big') / 10
PM25 = int.from_bytes(byte_data[11:13], byteorder='big') / 10
PM10 = int.from_bytes(byte_data[13:15], byteorder='big') / 10
#Add data to payload CSV
payload_csv[0] = PM1
payload_csv[1] = PM25
payload_csv[2] = PM10
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "NPM_P0", "value": str(PM1)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P1", "value": str(PM10)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P2", "value": str(PM25)})
# Sonde BME280 connected
if bme_280_config:
print("Getting BME280 values")
#on récupère les infos du BME280 et on les ajoute au payload_csv
i2c = busio.I2C(board.SCL, board.SDA)
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c, address=0x76)
bme280.sea_level_pressure = 1013.25 # Update this value for your location
payload_csv[3] = round(bme280.temperature, 2)
payload_csv[4] = round(bme280.humidity, 2)
payload_csv[5] = round(bme280.pressure, 2)
payload_json["sensordatavalues"].append({"value_type": "BME280_temperature", "value": f"{round(bme280.temperature, 2)}"})
payload_json["sensordatavalues"].append({"value_type": "BME280_humidity", "value": f"{round(bme280.humidity, 2)}"})
payload_json["sensordatavalues"].append({"value_type": "BME280_pressure", "value": f"{round(bme280.pressure, 2)}"})
# NPM sur 5 cannaux
if npm_5channel:
print("Getting NPM 5 Channels")
# Define the path to the JSON file
json_file_path_npm = "/var/www/nebuleair_pro_4g/NPM/data/data.json"
# Read the JSON file
try:
with open(json_file_path_npm, "r") as file:
data = json.load(file) # Load JSON into a dictionary
# Extract values
channel_1 = data.get("channel_1", 0)
channel_2 = data.get("channel_2", 0)
channel_3 = data.get("channel_3", 0)
channel_4 = data.get("channel_4", 0)
channel_5 = data.get("channel_5", 0)
# Print extracted values
print(f"Channel 1: {channel_1}")
print(f"Channel 2: {channel_2}")
print(f"Channel 3: {channel_3}")
print(f"Channel 4: {channel_4}")
print(f"Channel 5: {channel_5}")
#add to CSV
payload_csv[13] = channel_1
payload_csv[14] = channel_2
payload_csv[15] = channel_3
payload_csv[16] = channel_4
payload_csv[17] = channel_5
except FileNotFoundError:
print(f"Error: JSON file not found at {json_file_path_npm}")
except json.JSONDecodeError:
print("Error: JSON file is not formatted correctly")
except Exception as e:
print(f"Unexpected error: {e}")
# Sonde Bruit connected
if i2C_sound_config:
#on récupère les infos de sound_metermoving et on les ajoute au message
file_path_data_noise = "/var/www/nebuleair_pro_4g/sound_meter/moving_avg_minute.txt"
# Read the file and extract the numbers
try:
with open(file_path_data_noise, "r") as file:
content = file.read().strip()
avg_noise, max_noise, min_noise = map(int, content.split())
# Append the variables to the payload_csv
payload_csv[6] = avg_noise
payload_csv[7] = max_noise
payload_csv[8] = min_noise
except FileNotFoundError:
print(f"Error: File {file_path_data_noise} not found.")
except ValueError:
print("Error: File content is not valid numbers.")
# Sondes Envea
if connected_envea_sondes:
print("Getting Envea values")
# Define the path to the JSON file
json_file_path_envea = "/var/www/nebuleair_pro_4g/envea/data/data.json"
# Read the JSON file
try:
with open(json_file_path_envea, "r") as file:
data = json.load(file) # Load JSON into a dictionary
# Extract values
h2s = data.get("h2s", 0)
no2 = data.get("no2", 0)
o3 = data.get("o3", 0)
# Print extracted values
print(f"h2s : {h2s}")
print(f"no2 : {no2}")
print(f"o3: {o3}")
#add to CSV
payload_csv[10] = h2s
payload_csv[9] = no2
payload_csv[11] = o3
#add to JSON
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_H2S", "value": str(h2s)})
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_NO2", "value": str(no2)})
payload_json["sensordatavalues"].append({"value_type": "CAIRSENS_O3", "value": str(o3)})
except FileNotFoundError:
print(f"Error: JSON file not found at {json_file_path_envea}")
except json.JSONDecodeError:
print("Error: JSON file is not formatted correctly")
except Exception as e:
print(f"Unexpected error: {e}")
# Getting the LTE Signal
print("-> Getting LTE signal <-")
ser_sara.write(b'AT+CSQ\r')
response2 = read_complete_response(ser_sara, wait_for_line="OK")
print('<p class="text-danger-emphasis">')
print(response2)
print("</p>")
match = re.search(r'\+CSQ:\s*(\d+),', response2)
if match:
signal_quality = int(match.group(1))
payload_csv[12]=signal_quality
time.sleep(1)
# On vérifie si le signal n'est pas à 99 pour déconnexion
# si c'est le cas on essaie de se reconnecter
if signal_quality == 99:
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: Signal Quality indicates no signal (99)⚠️</span>')
print("TRY TO RECONNECT:")
command = f'AT+COPS=1,2,"{selected_networkID}"\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara, timeout=20, end_of_response_timeout=20)
print('<p class="text-danger-emphasis">')
print(responseReconnect)
print("</p>")
print('🛑STOP LOOP🛑')
print("<hr>")
#on arrete le script pas besoin de continuer
sys.exit()
else:
print("Signal Quality:", signal_quality)
#print(payload_json)
'''
SEND TO AIRCARTO
'''
# Write Data to saraR4
# 1. Open sensordata_csv.json (with correct data size)
csv_string = ','.join(str(value) if value is not None else '' for value in payload_csv)
size_of_string = len(csv_string)
print("Open JSON:")
command = f'AT+UDWNFILE="sensordata_csv.json",{size_of_string}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_line=">", debug=False)
print(response_SARA_1)
time.sleep(1)
#2. Write to shell
print("Write data to memory:")
ser_sara.write(csv_string.encode())
response_SARA_2 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_2)
#3. Send to endpoint (with device ID)
print("Send data (POST REQUEST):")
command= f'AT+UHTTPC={aircarto_profile_id},4,"/pro_4G/data.php?sensor_id={device_id}","server_response.txt","sensordata_csv.json",4\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_line="+UUHTTPCR")
print('<p class="text-danger-emphasis">')
print(response_SARA_3)
print("</p>")
# si on recoit la réponse UHTTPCR
if "+UUHTTPCR" in response_SARA_3:
print("✅ Received +UUHTTPCR response.")
# Les types de réponse
# 1.La commande n'a pas fonctionné
# +CME ERROR: No connection to phone
# +CME ERROR: Operation not allowed
# 2.La commande fonctionne: elle renvoie un code
# +UUHTTPCR: <profile_id>,<http_command>,<http_result>
# <http_result>: 1 pour sucess et 0 pour fail
# +UUHTTPCR: 0,4,1 -> OK
# +UUHTTPCR: 0,4,0 -> error
# Split response into lines
lines = response_SARA_3.strip().splitlines()
# 1.Vérifier si la réponse contient un message d'erreur CME
if "+CME ERROR" in lines[-1]:
print("*****")
print('<span style="color: red;font-weight: bold;">ATTENTION: CME ERROR</span>')
print("error:", lines[-1])
print("*****")
#update status
update_json_key(config_file, "SARA_R4_network_status", "disconnected")
# Gestion de l'erreur spécifique
if "No connection to phone" in lines[-1]:
print("No connection to the phone. Retrying or reset may be required.")
# Actions spécifiques pour ce type d'erreur (par exemple, réinitialiser ou tenter de reconnecter)
# need to reconnect to network
# and reset HTTP profile (AT+UHTTP=0) -> ne fonctionne pas..
# tester un reset avec CFUN 15
# 1.Reconnexion au réseau (AT+COPS)
command = f'AT+COPS=1,2,"{selected_networkID}"\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara)
print("Response reconnect:")
print(responseReconnect)
print("End response reconnect")
elif "Operation not allowed" in lines[-1]:
print("Operation not allowed. This may require a different configuration.")
# Actions spécifiques pour ce type d'erreur
# Clignotement LED rouge en cas d'erreur
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
else:
# 2.Si la réponse contient une réponse HTTP valide
# Extract HTTP response code from the last line
# ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK
# rechercher plutot
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed)
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
print("*****")
print('<span style="color: red;font-weight: bold;">ATTENTION: HTTP operation failed</span>')
update_json_key(config_file, "SARA_R4_network_status", "disconnected")
print("*****")
print("Blink red LED")
# Run LED blinking in a separate thread
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
# Get error code
print("Getting error code (11->Server connection error, 73->Secure socket connect error)")
command = f'AT+UHTTPER={aircarto_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9)
print("</p>")
'''
+UHTTPER: profile_id,error_class,error_code
error_class
0 OK, no error
3 HTTP Protocol error class
10 Wrong HTTP API USAGE
error_code (for error_class 3)
0 No error
11 Server connection error
73 Secure socket connect error
'''
#Pas forcément un moyen de résoudre le soucis
#print("resetting the URL (domain name):")
#command = f'AT+UHTTP={aircarto_profile_id},1,"{url_nebuleair}"\r'
#ser_sara.write(command.encode('utf-8'))
#response_SARA_31 = read_complete_response(ser_sara)
#print(response_SARA_31)
# 2.2 code 1 (HHTP succeded)
else:
# Si la commande HTTP a réussi
print('<span class="badge text-bg-success">HTTP operation successful.</span>')
update_json_key(config_file, "SARA_R4_network_status", "connected")
print("Blink blue LED")
led_thread = Thread(target=blink_led, args=(23, 5, 0.5))
led_thread.start()
#4. Read reply from server
print("Reply from server:")
ser_sara.write(b'AT+URDFILE="server_response.txt"\r')
response_SARA_4 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print('<p class="text-success">')
print(response_SARA_4)
print('</p>')
else:
print('<span style="color: red;font-weight: bold;">No UUHTTPCR response</span>')
print("Blink red LED")
# Run LED blinking in a separate thread
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
led_thread.start()
#5. empty json
print("Empty SARA memory:")
ser_sara.write(b'AT+UDELFILE="sensordata_csv.json"\r')
response_SARA_5 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_5)
'''
SEND TO MICRO SPOT
'''
if send_uSpot:
print(">>>>>>>>")
print(">>>>>>>>")
print(">>>>>>>>")
print("SEND TO MICRO SPOT (HTTP):")
#step 4: set url (op_code = 1)
print("****")
print("SET URL")
command = f'AT+UHTTP={uSpot_profile_id},1,"api-prod.uspot.probesys.net"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_5)
time.sleep(1)
#step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2)
print("****")
print("SET SSL")
command = f'AT+UHTTP={uSpot_profile_id},6,0\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_5)
time.sleep(1)
#step 4: set PORT (op_code = 5)
print("****")
print("SET PORT")
command = f'AT+UHTTP={uSpot_profile_id},5,81\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_55 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_55)
time.sleep(1)
# Write Data to saraR4
# 1. Open sensordata_json.json (with correct data size)
print("Open JSON:")
payload_string = json.dumps(payload_json) # Convert dict to JSON string
size_of_string = len(payload_string)
command = f'AT+UDWNFILE="sensordata_json.json",{size_of_string}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_line=">", debug=False)
print(response_SARA_1)
time.sleep(1)
#2. Write to shell
print("Write to memory:")
ser_sara.write(payload_string.encode())
response_SARA_2 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_2)
#step 4: trigger the request (http_command=1 for GET and http_command=1 for POST)
print("****")
print("Trigger POST REQUEST")
command = f'AT+UHTTPC={uSpot_profile_id},4,"/nebuleair?token=2AFF6dQk68daFZ","http.resp","sensordata_json.json",4\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=30, wait_for_line="+UUHTTPCR")
print('<p class="text-danger-emphasis">')
print(response_SARA_3)
print("</p>")
#READ REPLY
print("****")
print("Read reply from server")
ser_sara.write(b'AT+URDFILE="http.resp"\r')
response_SARA_7 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print('<p class="text-success">')
print(response_SARA_7)
print('</p>')
#5. empty json
print("Empty SARA memory:")
ser_sara.write(b'AT+UDELFILE="sensordata_json.json"\r')
response_SARA_8 = read_complete_response(ser_sara, wait_for_line="OK", debug=False)
print(response_SARA_8)
# Calculate and print the elapsed time
elapsed_time = time.time() - start_time_script
print(f"Elapsed time: {elapsed_time:.2f} seconds")
print("<hr>")
except Exception as e:
print("An error occurred:", e)
traceback.print_exc() # This prints the full traceback