Files
moduleair_pro_4g/loop/SARA_send_data_v2.py
2025-07-02 13:52:03 +02:00

1373 lines
59 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
____ _ ____ _ ____ _ ____ _
/ ___| / \ | _ \ / \ / ___| ___ _ __ __| | | _ \ __ _| |_ __ _
\___ \ / _ \ | |_) | / _ \ \___ \ / _ \ '_ \ / _` | | | | |/ _` | __/ _` |
___) / ___ \| _ < / ___ \ ___) | __/ | | | (_| | | |_| | (_| | || (_| |
|____/_/ \_\_| \_\/_/ \_\ |____/ \___|_| |_|\__,_| |____/ \__,_|\__\__,_|
Main loop to gather data from sensor inside SQLite database:
* NPM
* Envea
* I2C BME280
* Noise sensor
and send it to AirCarto servers via SARA R4 HTTP post requests
also send the timestamp (already stored inside the DB) !
/usr/bin/python3 /var/www/moduleair_pro_4g/loop/SARA_send_data_v2.py
ATTENTION:
# This script is triggered every minutes by /var/www/moduleair_pro_4g/master.py (as a service)
CSV PAYLOAD (AirCarto Servers)
Endpoint:
data.moduleair.fr
/pro_4G/data.php?sensor_id={device_id}&timestamp={rtc_module_time}
ATTENTION : do not change order !
CSV size: 18
{PM1},{PM25},{PM10},{temp},{hum},{press},{avg_noise},{max_noise},{min_noise},{envea_no2},{envea_h2s},{envea_o3},{4g_signal_quality}
0 -> PM1 (μg/m3)
1 -> PM25 (μg/m3)
2 -> PM10 (μg/m3)
3 -> temp
4 -> hum
5 -> press
6 -> avg_noise
7 -> max_noise
8 -> min_noise
9 -> envea_no2
10 -> envea_h2s
11 -> envea_nh3
12 -> 4G signal quality,
13 -> PM 0.2μm to 0.5μm quantity (Nb/L)
14 -> PM 0.5μm to 1.0μm quantity (Nb/L)
15 -> PM 1.0μm to 2.5μm quantity (Nb/L)
16 -> PM 2.5μm to 5.0μm quantity (Nb/L)
17 -> PM 5.0μm to 10μm quantity (Nb/L)
18 -> NPM temp inside
19 -> NPM hum inside
20 -> CO2
JSON PAYLOAD (Micro-Spot Servers)
Same as moduleair wifi
Endpoint:
api-prod.uspot.probesys.net
moduleair?token=2AFF6dQk68daFZ
port 443
{"moduleairid": "82D25549434",
"software_version": "ModuleAirV2-V1-042022",
"sensordatavalues":
[
{"value_type":"NPM_P0","value":"1.54"},
{"value_type":"NPM_P1","value":"1.54"},
{"value_type":"NPM_P2","value":"1.54"},
{"value_type":"NPM_N1","value":"0.02"},
{"value_type":"NPM_N10","value":"0.02"},
{"value_type":"NPM_N25","value":"0.02"},
{"value_type":"MHZ16_CO2","value":"793.00"},
{"value_type":"SGP40_VOC","value":"29915.00"},
{"value_type":"samples","value":"134400"},
{"value_type":"min_micro","value":"137"},
{"value_type":"max_micro","value":"155030"},
{"value_type":"interval","value":"145000"},
{"value_type":"signal","value":"-80"},
{"value_type":"latitude","value":"43.2964"},
{"value_type":"longitude","value":"5.36978"},
{"value_type":"state_npm","value":"State: 00000000"},
{"value_type":"BME280_temperature","value":"28.47"},
{"value_type":"BME280_humidity","value":"28.47"},
{"value_type":"BME280_pressure","value":"28.47"},
{"value_type":"CAIRSENS_NO2","value":"54"},
{"value_type":"CAIRSENS_H2S","value":"54"},
{"value_type":"CAIRSENS_O3","value":"54"}
]
}
"""
#import board
import json
import serial
import time
import busio
import requests
import re
import os
import traceback
import sys
import sqlite3
import RPi.GPIO as GPIO
from threading import Thread
from datetime import datetime
# Record the start time of the script
start_time_script = time.time()
#Payload CSV to be sent to data.moduleair.fr
payload_csv = [None] * 25
#Payload JSON to be sent to uSpot
payload_json = {
"moduleairid": "XXX",
"software_version": "ModuleAirV2-V1-042022",
"sensordatavalues": [] # Empty list to start with
}
# SARA R4 UHTTPC profile IDs
aircarto_profile_id = 0
uSpot_profile_id = 1
# database connection
conn = sqlite3.connect("/var/www/moduleair_pro_4g/sqlite/sensors.db")
cursor = conn.cursor()
def update_config_sqlite(key, value, value_type=None, max_retries=4, retry_delay=1.0):
"""
Update or insert a configuration value in SQLite config table with retry logic
Args:
key (str): Configuration key
value: Configuration value (any type)
value_type (str, optional): Force specific type ('str', 'int', 'float', 'bool')
If None, auto-detect from value type
max_retries (int): Maximum number of retry attempts (default: 4)
retry_delay (float): Delay between retries in seconds (default: 1.0)
Returns:
bool: True if successful, False otherwise
"""
# Auto-detect type if not specified
if value_type is None:
if isinstance(value, bool):
value_type = 'bool'
elif isinstance(value, int):
value_type = 'int'
elif isinstance(value, float):
value_type = 'float'
else:
value_type = 'str'
# Convert value to string for storage
if value_type == 'bool':
str_value = '1' if value else '0'
else:
str_value = str(value)
# Retry logic
for attempt in range(max_retries):
try:
# Use WAL mode and immediate transaction for better concurrency
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000") # Wait up to 30 seconds for lock
# Begin immediate transaction to acquire lock quickly
conn.execute("BEGIN IMMEDIATE")
# Use INSERT OR REPLACE to update existing or create new
cursor.execute("""
INSERT OR REPLACE INTO config_table (key, value, type)
VALUES (?, ?, ?)
""", (key, str_value, value_type))
conn.commit()
print(f"✓ Config updated: {key} = {value} ({value_type}) [attempt {attempt + 1}]")
return True
except sqlite3.OperationalError as e:
if "database is locked" in str(e).lower() or "locked" in str(e).lower():
print(f"⚠ Database locked on attempt {attempt + 1}/{max_retries} for key '{key}': {e}")
# Rollback any pending transaction
try:
conn.rollback()
except:
pass
# If not the last attempt, wait and retry
if attempt < max_retries - 1:
print(f"⏳ Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
# Increase delay slightly for each retry (exponential backoff)
retry_delay *= 1.2
else:
print(f"✗ Failed to update config {key} after {max_retries} attempts: database remains locked")
return False
else:
# Non-lock related operational error
print(f"✗ Operational error updating config {key}: {e}")
try:
conn.rollback()
except:
pass
return False
except Exception as e:
# Other unexpected errors
print(f"✗ Unexpected error updating config {key} on attempt {attempt + 1}: {e}")
try:
conn.rollback()
except:
pass
# For unexpected errors, don't retry unless it's clearly a temporary issue
if "busy" in str(e).lower() or "locked" in str(e).lower():
if attempt < max_retries - 1:
print(f"⏳ Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 1.2
continue
return False
# If we get here, all retries failed
print(f"✗ Failed to update config {key} after {max_retries} attempts")
return False
#get config data from SQLite table
def load_config_sqlite():
"""
Load configuration data from SQLite config table
Returns:
dict: Configuration data with proper type conversion
"""
try:
# Query the config table
cursor.execute("SELECT key, value, type FROM config_table")
rows = cursor.fetchall()
# Create config dictionary
config_data = {}
for key, value, type_name in rows:
# Convert value based on its type
if type_name == 'bool':
config_data[key] = value == '1' or value == 'true'
elif type_name == 'int':
config_data[key] = int(value)
elif type_name == 'float':
config_data[key] = float(value)
else:
config_data[key] = value
return config_data
except Exception as e:
print(f"Error loading config from SQLite: {e}")
return {}
#Load config
config = load_config_sqlite()
#config
device_id = config.get('deviceID', 'unknown')
device_id = device_id.upper()
modem_config_mode = config.get('modem_config_mode', False)
device_latitude_raw = config.get('latitude_raw', 0)
device_longitude_raw = config.get('longitude_raw', 0)
modem_version=config.get('modem_version', "")
Sara_baudrate = config.get('SaraR4_baudrate', 115200)
npm_5channel = config.get('npm_5channel', False) #5 canaux du NPM
selected_networkID = int(config.get('SARA_R4_neworkID', 0))
send_uSpot = config.get('send_uSpot', False) #envoi sur MicroSpot ()
bme_280_config = config.get('BME280', False)
co2_mhz19= config.get('MHZ19', False)
sensirion_sfa30= config.get('SFA30', False)
reset_uSpot_url = False
#update device id in the payload json
payload_json["moduleairid"] = device_id
# Skip execution if modem_config_mode is true
if modem_config_mode:
print("Modem 4G (SARA R4) is in config mode -> EXIT")
sys.exit()
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=Sara_baudrate, #115200 ou 9600
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 2
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
'''
Fonction très importante !!!
Reads the complete response from a serial connection and waits for specific lines.
timeout -> temps d'attente de la réponse de la première ligne (assez rapide car le SARA répond direct avec la commande recue)
end_of_response_timeout -> le temps d'inactivité entre deux lignes imprimées (plus long dans certain cas: le SARA mouline avant de finir vraiment)
wait_for_lines -> si on rencontre la string la fonction s'arrete
'''
if wait_for_lines is None:
wait_for_lines = [] # Default to an empty list if not provided
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
elapsed_time = time.time() - start_time # Time since function start
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout # Reset timeout on new data
# Decode and check for any target line
decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines:
if target_line in decoded_response:
if debug:
print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)")
return decoded_response # Return response immediately if a target line is found
elif time.time() > end_time:
if debug:
print(f"[DEBUG] Timeout reached. No more data received.")
break
time.sleep(0.1) # Short sleep to prevent busy waiting
# Final response and debug output
total_elapsed_time = time.time() - start_time
if debug:
print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
# Check if the elapsed time exceeded 10 seconds
if total_elapsed_time > 10 and debug:
print(f"[ALERT] 🚨 The operation took too long 🚨")
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
return response.decode('utf-8', errors='replace') # Return the full response if no target line is found
def extract_error_code(response):
"""
Extract just the error code from AT+UHTTPER response
"""
for line in response.split('\n'):
if '+UHTTPER' in line:
try:
# Split the line and get the third value (error code)
parts = line.split(':')[1].strip().split(',')
if len(parts) >= 3:
error_code = int(parts[2])
return error_code
except:
pass
# Return None if we couldn't find the error code
return None
def send_error_notification(device_id, error_type, additional_info=None):
"""
Send an error notification to the server when issues with the SARA module occur.
Will silently fail if there's no internet connection.
Parameters:
-----------
device_id : str
The unique identifier of the device
error_type : str
Type of error encountered (e.g., 'serial_error', 'cme_error', 'http_error', 'timeout')
additional_info : str, optional
Any additional information about the error for logging purposes
Returns:
--------
bool
True if notification was sent successfully, False otherwise
"""
# Create the alert URL with all relevant parameters
base_url = 'http://data.nebuleair.fr/pro_4G/alert.php'
alert_url = f'{base_url}?capteur_id={device_id}&error_type={error_type}'
# Add additional info if provided
if additional_info:
# Make sure to URL encode the additional info
from urllib.parse import quote
alert_url += f'&details={quote(str(additional_info))}'
# Try to send the notification, catch ALL exceptions
try:
response = requests.post(alert_url, timeout=3)
if response.status_code == 200:
print(f"✅ Alert notification sent successfully")
return True
else:
print(f"⚠️ Alert notification failed: Status code {response.status_code}")
except Exception as e:
print(f"⚠️ Alert notification couldn't be sent: {e}")
return False
def modem_hardware_reboot():
"""
Performs a hardware reboot using transistors connected to pin 16 and 20:
pin 16 set to SARA GND
pin 20 set to SARA ON (not used)
LOW -> cut the current
HIGH -> current flow
"""
print('<span style="color: orange;font-weight: bold;">🔄 Hardware SARA reboot 🔄</span>')
SARA_power_GPIO = 16
SARA_ON_GPIO = 20
GPIO.setmode(GPIO.BCM) # Use BCM numbering
GPIO.setup(SARA_power_GPIO, GPIO.OUT) # Set GPIO17 as an output
GPIO.output(SARA_power_GPIO, GPIO.LOW)
time.sleep(2)
GPIO.output(SARA_power_GPIO, GPIO.HIGH)
time.sleep(2)
print("Checking if modem is responsive...")
for attempt in range(5):
ser_sara.write(b'AT\r')
response_check = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=True)
if response_check and "OK" in response_check:
print("✅ Modem is responsive after reboot.")
return True
print(f"⏳ Waiting for modem... attempt {attempt + 1}")
time.sleep(2)
else:
print("❌ Modem not responding after reboot.")
return False
def reset_PSD_CSD_connection():
"""
Function that reset the PSD CSD connection for the SARA R5
returns true or false
"""
print("Reseting PDP connection ")
pdp_reset_success = True
# Activate PDP context 1
print('➡️ Activate PDP context 1')
command = f'AT+CGACT=1,1\r'
ser_sara.write(command.encode('utf-8'))
response_pdp1 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_pdp1, end="")
pdp_reset_success = pdp_reset_success and (response_pdp1 is not None and "OK" in response_pdp1)
time.sleep(1)
# Set the PDP type
print('➡️ Set the PDP type to IPv4 referring to the output of the +CGDCONT read command')
command = f'AT+UPSD=0,0,0\r'
ser_sara.write(command.encode('utf-8'))
response_pdp2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_pdp2, end="")
pdp_reset_success = pdp_reset_success and (response_pdp2 is not None and "OK" in response_pdp2)
time.sleep(1)
# Profile #0 is mapped on CID=1
print('➡️ Profile #0 is mapped on CID=1.')
command = f'AT+UPSD=0,100,1\r'
ser_sara.write(command.encode('utf-8'))
response_pdp3 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_pdp3, end="")
pdp_reset_success = pdp_reset_success and (response_pdp3 is not None and "OK" in response_pdp3)
time.sleep(1)
# Activate the PSD profile
print('➡️ Activate the PSD profile #0: the IPv4 address is already assigned by the network.')
command = f'AT+UPSDA=0,3\r'
ser_sara.write(command.encode('utf-8'))
response_pdp4 = read_complete_response(ser_sara, wait_for_lines=["OK", "+UUPSDA"])
print(response_pdp4, end="")
pdp_reset_success = pdp_reset_success and (response_pdp4 is not None and ("OK" in response_pdp4 or "+UUPSDA" in response_pdp4))
time.sleep(1)
if not pdp_reset_success:
print("⚠️ PDP connection reset had some issues")
return pdp_reset_success
def reset_server_hostname(profile_id):
"""
Function that reset server hostname (URL) connection for the SARA R5
returns true or false
"""
print("Reseting Server Hostname connection ")
http_reset_success = False # Default fallback
if profile_id == 0:
print('<span style="color: orange;font-weight: bold;">🔧 Resetting AirCarto HTTP Profile</span>')
command = f'AT+UHTTP={profile_id},1,"data.moduleair.fr"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5)
time.sleep(1)
http_reset_success = response_SARA_5 is not None and "OK" in response_SARA_5
if not http_reset_success:
print("⚠️ AirCarto HTTP profile reset failed")
elif profile_id ==1:
pass # on utilise la fonction reset_server_hostname_https pour uSpot
else:
print(f"❌ Unsupported profile ID: {profile_id}")
http_reset_success = False
return http_reset_success
def reset_server_hostname_https(profile_id):
"""
Function that reset server hostname (URL) connection for the SARA R5
returns true or false
"""
print("Reseting Server Hostname HTTS secure connection ")
http_reset_success = False # Default fallback
#Pour uSpot
if profile_id == 1:
print('<span style="color: orange;font-weight: bold;">🔧 Resetting uSpot HTTPs Profile</span>')
uSpot_url="api-prod.uspot.probesys.net"
security_profile_id = 1
#step 1: import the certificate
print("➡️ import certificate")
certificate_name = "e6"
with open("/var/www/moduleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file:
certificate = cert_file.read()
size_of_string = len(certificate)
# AT+USECMNG=0,<type>,<internal_name>,<data_size>
# type-> 0 -> trusted root CA
command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=[">"])
print(response_SARA_1)
time.sleep(0.5)
print("➡️ add certificate")
ser_sara.write(certificate)
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_2)
time.sleep(0.5)
# op_code: 0 -> certificate validation level
# param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation
print("Set the security profile (params)")
certification_level=0
command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5b)
time.sleep(0.5)
# op_code: 1 -> minimum SSL/TLS version
# param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2;
print("Set the security profile (params)")
minimum_SSL_version = 0
command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5bb)
time.sleep(0.5)
#op_code: 2 -> legacy cipher suite selection
# 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list.
print("Set cipher")
cipher_suite = 0
command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cc)
time.sleep(0.5)
# op_code: 3 -> trusted root certificate internal name
print("Set the security profile (choose cert)")
command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5c)
time.sleep(0.5)
# op_code: 10 -> SNI (server name indication)
print("Set the SNI")
command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5cf)
time.sleep(0.5)
#step 4: set url (op_code = 1)
print("SET URL")
command = f'AT+UHTTP={profile_id},1,"{uSpot_url}"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5)
time.sleep(1)
#step 4: set PORT (op_code = 5)
print("SET PORT")
port = 443
command = f'AT+UHTTP={profile_id},5,{port}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_55)
time.sleep(1)
#step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2)
print("SET SSL")
http_secure = 1
command = f'AT+UHTTP={profile_id},6,{http_secure},{security_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"])
print(response_SARA_5fg)
time.sleep(1)
http_reset_success = response_SARA_5 is not None and "OK" in response_SARA_5
if not http_reset_success:
print("⚠️ AirCarto HTTP profile reset failed")
#Pour uSpot
elif profile_id ==1:
pass #on utilise la fonction reset_server_hostname_https pour uSpot
else:
print(f"❌ Unsupported profile ID: {profile_id}")
http_reset_success = False
return http_reset_success
try:
'''
_ ___ ___ ____
| | / _ \ / _ \| _ \
| | | | | | | | | |_) |
| |__| |_| | |_| | __/
|_____\___/ \___/|_|
'''
print('<h3>START LOOP</h3>')
# Check system uptime
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
# Skip execution if uptime is less than 2 minutes (120 seconds)
if uptime_seconds < 120:
print(f"System just booted ({uptime_seconds:.2f} seconds uptime), skipping execution.")
update_config_sqlite('SARA_network_status', 'booting')
sys.exit()
#Local timestamp
cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone() # Get the first (and only) row
rtc_time_str = row[1] # '2025-02-07 12:30:45'
print(f"Getting local timestamp: {rtc_time_str}")
if rtc_time_str == 'not connected':
print("⛔ Atttention RTC module not connected⛔")
rtc_status = "disconnected"
influx_timestamp="rtc_disconnected"
else :
# Convert to a datetime object
dt_object = datetime.strptime(rtc_time_str, '%Y-%m-%d %H:%M:%S')
# Check if timestamp is reset (year 2000)
if dt_object.year == 2000:
print("⛔ Attention: RTC has been reset to default date ⛔")
rtc_status = "reset"
else:
print("✅ RTC timestamp is valid")
rtc_status = "valid"
# Always convert to InfluxDB format
# Convert to InfluxDB RFC3339 format with UTC 'Z' suffix
influx_timestamp = dt_object.strftime('%Y-%m-%dT%H:%M:%SZ')
rtc_status = "valid"
#print(influx_timestamp)
#NEXTPM
print("Getting NPM values (last 6 measures)")
#cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 1")
#cursor.execute("SELECT * FROM data_NPM ORDER BY timestamp DESC LIMIT 6")
cursor.execute("SELECT rowid, * FROM data_NPM ORDER BY rowid DESC LIMIT 6")
rows = cursor.fetchall()
# Exclude the timestamp column (assuming first column is timestamp)
data_values = [row[2:] for row in rows] # Exclude timestamp
# Compute column-wise average
num_columns = len(data_values[0])
averages = [round(sum(col) / len(col),1) for col in zip(*data_values)]
PM1 = averages[0]
PM25 = averages[1]
PM10 = averages[2]
npm_temp = averages[3]
npm_hum = averages[4]
#Add data to payload CSV
payload_csv[0] = PM1
payload_csv[1] = PM25
payload_csv[2] = PM10
payload_csv[18] = npm_temp
payload_csv[19] = npm_hum
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "NPM_P0", "value": str(PM1)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P1", "value": str(PM10)})
payload_json["sensordatavalues"].append({"value_type": "NPM_P2", "value": str(PM25)})
#NextPM 5 channels
if npm_5channel:
print("Getting NextPM 5 channels values (last 6 measures)")
cursor.execute("SELECT * FROM data_NPM_5channels ORDER BY timestamp DESC LIMIT 6")
rows = cursor.fetchall()
# Exclude the timestamp column (assuming first column is timestamp)
data_values = [row[1:] for row in rows] # Exclude timestamp
# Compute column-wise average
num_columns = len(data_values[0])
averages = [round(sum(col) / len(col)) for col in zip(*data_values)]
# Store averages in specific indices
payload_csv[13] = averages[0] # Channel 1
payload_csv[14] = averages[1] # Channel 2
payload_csv[15] = averages[2] # Channel 3
payload_csv[16] = averages[3] # Channel 4
payload_csv[17] = averages[4] # Channel 5
#BME280
if bme_280_config:
print("Getting BME280 values")
cursor.execute("SELECT * FROM data_BME280 ORDER BY timestamp DESC LIMIT 1")
last_row = cursor.fetchone()
if last_row:
print("SQLite DB last available row:", last_row)
BME280_temperature = last_row[1]
BME280_humidity = last_row[2]
BME280_pressure = last_row[3]
#Add data to payload CSV
payload_csv[3] = BME280_temperature
payload_csv[4] = BME280_humidity
payload_csv[5] = BME280_pressure
#Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "BME280_temperature", "value": str(BME280_temperature)})
payload_json["sensordatavalues"].append({"value_type": "BME280_humidity", "value": str(BME280_humidity)})
payload_json["sensordatavalues"].append({"value_type": "BME280_pressure", "value": str(BME280_pressure)})
else:
print("No data available in the database.")
#HM-Z19
if co2_mhz19:
# CO2 sensor - average of last 6 measures
print("Getting CO2 values (last 6 measures)")
cursor.execute("SELECT rowid, * FROM data_CO2 ORDER BY rowid DESC LIMIT 6")
rows = cursor.fetchall()
if rows:
# Extract just the CO2 values (assuming CO2 is in column 2 after rowid and timestamp)
co2_values = [row[2] for row in rows] # Adjust index based on your table structure
# Calculate average
co2_average = round(sum(co2_values) / len(co2_values), 1)
# Add data to payload CSV
payload_csv[20] = co2_average # Choose appropriate index
# Add data to payload JSON
payload_json["sensordatavalues"].append({"value_type": "MHZ16_CO2", "value": str(co2_average)})
print(f"CO2 average from {len(co2_values)} measurements: {co2_average}")
else:
print("No CO2 data available in the database.")
#print("Verify SARA R4 connection")
# Getting the LTE Signal
print("Getting LTE signal")
ser_sara.write(b'AT+CSQ\r')
response2 = read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR", "+CME ERROR"])
print('<p class="text-danger-emphasis">')
print(response2)
print("</p>", end="")
#Here it's possible that the SARA do not repond at all or send a error message
#-> TO DO : harware reboot
#-> send notification
#-> end loop, no need to continue
#1. No answer at all form SARA
if response2 is None or response2 == "":
print("ATTENTION: No answer from SARA module")
print('🛑STOP LOOP🛑')
print("<hr>")
#Send notification (WIFI)
send_error_notification(device_id, "SERIAL ISSUE ->no answer from sara")
#Hardware Reboot
hardware_reboot_success = modem_hardware_reboot()
if hardware_reboot_success:
print("✅Modem successfully rebooted and reinitialized")
else:
print("⛔There were issues with the modem reboot/reinitialize process")
#end loop
sys.exit()
#2. si on a une reponse du SARA mais c'est une erreur
elif "+CME ERROR" in response2:
print(f"SARA module returned error: {response2}")
print("The CSQ command is not supported by this module or in its current state")
print("ATTENTION: SARA is connected over serial but CSQ command not supported")
print('🛑STOP LOOP🛑')
print("<hr>")
#end loop
sys.exit()
#3. On peut avoir une erreur de type "Socket:bind: Treck error 222 : Invalid argument"
elif "Socket:bind: Treck error" in response2:
print(f"SARA module returned error: {response2}")
print("ATTENTION: low-level error from the Treck TCP/IP stack")
print('🛑STOP LOOP🛑')
print("<hr>")
#Send notification (WIFI)
send_error_notification(device_id, "SERIAL ISSUE -> Treck TCP/IP stack error")
#hardware reboot
hardware_reboot_success = modem_hardware_reboot()
if hardware_reboot_success:
print("✅Modem successfully rebooted and reinitialized")
else:
print("⛔There were issues with the modem reboot/reinitialize process")
#end loop
sys.exit()
else :
print("✅SARA is connected over serial")
match = re.search(r'\+CSQ:\s*(\d+),', response2)
if match:
signal_quality = int(match.group(1))
payload_csv[12]=signal_quality
time.sleep(0.1)
# On vérifie si le signal n'est pas à 99 pour déconnexion
# si c'est le cas on essaie de se reconnecter
if signal_quality == 99:
update_config_sqlite('SARA_network_status', 'disconnected')
update_config_sqlite('SARA_signal_quality', '99')
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: Signal Quality indicates no signal (99)⚠️</span>')
print("TRY TO RECONNECT:")
command = f'AT+COPS=1,2,"{selected_networkID}"\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara, timeout=20, end_of_response_timeout=20, wait_for_lines=["OK", "+CME ERROR", "ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseReconnect)
print("</p>", end="")
print('🛑STOP LOOP🛑')
print("<hr>")
#on arrete le script pas besoin de continuer
sys.exit()
else:
print("Signal Quality:", signal_quality)
update_config_sqlite('SARA_signal_quality', signal_quality)
'''
____ _____ _ _ ____ _ ___ ____ ____ _ ____ _____ ___
/ ___|| ____| \ | | _ \ / \ |_ _| _ \ / ___| / \ | _ \_ _/ _ \
\___ \| _| | \| | | | | / _ \ | || |_) | | / _ \ | |_) || || | | |
___) | |___| |\ | |_| | / ___ \ | || _ <| |___ / ___ \| _ < | || |_| |
|____/|_____|_| \_|____/ /_/ \_\___|_| \_\\____/_/ \_\_| \_\|_| \___/
'''
print('<p class="fw-bold">➡SEND TO AIRCARTO SERVERS</p>', end="")
# Write Data to saraR4
# 1. Open sensordata_csv.json (with correct data size)
csv_string = ','.join(str(value) if value is not None else '' for value in payload_csv)
size_of_string = len(csv_string)
print("Open JSON:")
command = f'AT+UDWNFILE="sensordata_csv.json",{size_of_string}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False)
print(response_SARA_1)
time.sleep(1)
#2. Write to shell
print("Write data to memory:")
ser_sara.write(csv_string.encode())
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_2)
#3. Send to endpoint (with device ID)
print("Send data (POST REQUEST):")
command= f'AT+UHTTPC={aircarto_profile_id},4,"/pro_4G/data.php?sensor_id={device_id}&datetime={influx_timestamp}","server_response.txt","sensordata_csv.json",4\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR", "ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(response_SARA_3)
print("</p>", end="")
# si on recoit la réponse UHTTPCR
if "+UUHTTPCR" in response_SARA_3:
print("✅ Received +UUHTTPCR response.")
# Les types de réponse
# 1.La commande n'a pas fonctionné
# +CME ERROR: No connection to phone
# +CME ERROR: Operation not allowed
# 2.La commande fonctionne: elle renvoie un code
# +UUHTTPCR: <profile_id>,<http_command>,<http_result>
# <http_result>: 1 pour sucess et 0 pour fail
# +UUHTTPCR: 0,4,1 -> OK
# +UUHTTPCR: 0,4,0 -> error
# Split response into lines
lines = response_SARA_3.strip().splitlines()
# 1.Vérifier si la réponse contient un message d'erreur CME
if "+CME ERROR" in lines[-1]:
print("*****")
print('<span style="color: red;font-weight: bold;">ATTENTION: CME ERROR</span>')
print("error:", lines[-1])
print("*****")
# Gestion de l'erreur spécifique
if "No connection to phone" in lines[-1]:
print("No connection to the phone. Retrying or reset may be required.")
# Actions spécifiques pour ce type d'erreur (par exemple, réinitialiser ou tenter de reconnecter)
# need to reconnect to network
# and reset HTTP profile (AT+UHTTP=0) -> ne fonctionne pas..
# tester un reset avec CFUN 15
# 1.Reconnexion au réseau (AT+COPS)
command = f'AT+COPS=1,2,"{selected_networkID}"\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara)
print("Response reconnect:")
print(responseReconnect)
print("End response reconnect")
elif "Operation not allowed" in lines[-1]:
print("Operation not allowed. This may require a different configuration.")
# Actions spécifiques pour ce type d'erreur
else:
# 2.Si la réponse contient une réponse HTTP valide
# Extract HTTP response code from the last line
# ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK
# rechercher plutot
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed) ⛔⛔⛔
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
print("*****")
print('<span style="color: red;font-weight: bold;">ATTENTION: HTTP operation failed</span>')
print("*****")
update_config_sqlite('SARA_network_status', 'error')
# Get error code
print("Getting error code")
command = f'AT+UHTTPER={aircarto_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK","ERROR"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9)
print("</p>", end="")
# Extract just the error code
error_code = extract_error_code(response_SARA_9)
if error_code is not None:
# Display interpretation based on error code
if error_code == 0:
print('<p class="text-success">No error detected</p>', end="")
elif error_code == 4:
print('<p class="text-danger">Error 4: Invalid server Hostname</p>', end="")
send_error_notification(device_id, "UHTTPER (error n°4) -> Invalid Server Hostname", end="")
server_hostname_resets = reset_server_hostname(aircarto_profile_id)
if server_hostname_resets:
print("✅server hostname reset successfully", end="")
else:
print("⛔There were issues with the modem server hostname reinitialize process")
elif error_code == 11:
print('<p class="text-danger">Error 11: Server connection error</p>')
elif error_code == 22:
print('<p class="text-danger">⚠Error 22: PSD or CSD connection not established (SARA-R5 need to reset PDP conection)⚠️</p>')
send_error_notification(device_id, "UHTTPER (error n°22) -> PSD or CSD connection not established")
psd_csd_resets = reset_PSD_CSD_connection()
if psd_csd_resets:
print("✅PSD CSD connection reset successfully", end="")
else:
print("⛔There were issues with the modem CSD PSD reinitialize process", end="")
elif error_code == 26:
print('<p class="text-danger">Error 26: Connection timed out</p>', end="")
send_error_notification(device_id, "UHTTPER (error n°26) -> Connection timed out", end="")
elif error_code == 44:
print('<p class="text-danger">Error 44: Connection lost</p>')
send_error_notification(device_id, "UHTTPER (error n°44) -> Connection lost", end="")
elif error_code == 73:
print('<p class="text-danger">Error 73: Secure socket connect error</p>', end="")
else:
print(f'<p class="text-danger">Unknown error code: {error_code}</p>', end="")
else:
print('<p class="text-danger">Could not extract error code from response</p>',end="")
if "ERROR" in response_SARA_9:
print('<p class="text-danger">⚠Sara Module returned an error</p>',end="")
print('<p class="text-warning">⚠set up hardware reboot here???</p>',end="")
# 2.2 code 1 (✅✅HHTP / UUHTTPCR succeded✅✅)
else:
# Si la commande HTTP a réussi
print('<span class="badge text-bg-success">✅✅HTTP operation successful.</span>',end="")
#update SARA_network_status
update_config_sqlite('SARA_network_status', 'connected')
#4. Read reply from server
print("Reply from server:")
ser_sara.write(b'AT+URDFILE="server_response.txt"\r')
response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-success">')
print(response_SARA_4)
print("</p>", end="")
#Parse the server datetime
# Extract just the date from the response
date_string = None
date_start = response_SARA_4.find("Date: ")
if date_start != -1:
date_end = response_SARA_4.find("\n", date_start)
date_string = response_SARA_4[date_start + 6:date_end].strip()
print(f'<div class="text-primary">Server date: {date_string}</div>', end="")
# Optionally convert to datetime object
try:
from datetime import datetime
server_datetime = datetime.strptime(
date_string,
"%a, %d %b %Y %H:%M:%S %Z"
)
#print(f'<p class="text-primary">Parsed datetime: {server_datetime}</p>')
except Exception as e:
print(f'<p class="text-warning">Error parsing date: {e}</p>')
# Get RTC time from SQLite
cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
row = cursor.fetchone()
rtc_time_str = row[1] # '2025-02-07 12:30:45' or '2000-01-01 00:55:21' or 'not connected'
print(f'<div class="text-primary">RTC time: {rtc_time_str}</div>', end="")
# Compare times if both are available
if server_datetime and rtc_time_str != 'not connected':
try:
# Convert RTC time string to datetime
rtc_datetime = datetime.strptime(rtc_time_str, '%Y-%m-%d %H:%M:%S')
# Calculate time difference in seconds
time_diff = abs((server_datetime - rtc_datetime).total_seconds())
print(f'<div class="text-primary">Time difference: {time_diff:.2f} seconds</div>', end="")
# Check if difference is more than 60 seconds
# and update the RTC clock
if time_diff > 60:
print(f'<div class="text-warning"><strong>⚠️ RTC time differs from server time by {time_diff:.2f} seconds!</strong></div>', end="")
# Format server time for RTC update
server_time_formatted = server_datetime.strftime('%Y-%m-%d %H:%M:%S')
#update RTC module do not wait for answer, non blocking
#/usr/bin/python3 /var/www/moduleair_pro_4g/RTC/set_with_browserTime.py '2024-01-30 12:48:39'
# Launch RTC update script as non-blocking subprocess
import subprocess
update_command = [
"/usr/bin/python3",
"/var/www/moduleair_pro_4g/RTC/set_with_browserTime.py",
server_time_formatted
]
# Execute the command without waiting for result
subprocess.Popen(update_command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
print(f'<div class="text-warning">➡️ Updating RTC with server time: {server_time_formatted}</div>', end="")
else:
print(f'<div class="text-success">✅ RTC time is synchronized with server time (within 60 seconds)</div>')
except Exception as e:
print(f'<p class="text-warning">Error comparing times: {e}</p>')
#Si non ne recoit pas de réponse UHTTPCR
#on a peut etre une ERROR de type "+CME ERROR: No connection to phone"
else:
print('<span style="color: red;font-weight: bold;">No UUHTTPCR response</span>')
#Vérification de l'erreur
print("Getting type of error")
# Split the response into lines and search for "+CME ERROR:"
lines2 = response_SARA_3.strip().splitlines()
for line in lines2:
if "+CME ERROR" in line:
error_message = line.split("+CME ERROR:")[1].strip()
print("*****")
print('<span style="color: red;font-weight: bold;">⚠ATTENTION: CME ERROR⚠</span>')
print(f"Error type: {error_message}")
print("*****")
# Handle "No connection to phone" error
if error_message == "No connection to phone":
print('<span style="color: orange;font-weight: bold;">📞Try reconnect to network📞</span>')
#IMPORTANT!
# Reconnexion au réseau (AT+COPS)
#command = f'AT+COPS=1,2,{selected_networkID}\r'
command = f'AT+COPS=0\r'
ser_sara.write(command.encode('utf-8'))
responseReconnect = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["OK", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseReconnect)
print("</p>", end="")
# Handle "Operation not allowed" error
if error_message == "Operation not allowed":
print('<span style="color: orange;font-weight: bold;">❓Try Resetting the HTTP Profile❓</span>')
command = f'AT+UHTTP={aircarto_profile_id},1,"data.moduleair.fr"\r'
ser_sara.write(command.encode('utf-8'))
responseResetHTTP_profile = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(responseResetHTTP_profile)
print("</p>", end="")
if "ERROR" in line:
print("⛔Attention ERROR!⛔")
#Send notification (WIFI)
send_error_notification(device_id, "sara_error")
#Software Reboot
#software_reboot_success = modem_complete_reboot_and_reinitialize(modem_version, aircarto_profile_id)
#if software_reboot_success:
# print("Modem successfully rebooted and reinitialized")
#else:
# print("There were issues with the modem reboot/reinitialize process")
#Hardware Reboot
hardware_reboot_success = modem_hardware_reboot()
if hardware_reboot_success:
print("✅Modem successfully rebooted and reinitialized")
else:
print("⛔There were issues with the modem reboot/reinitialize process")
#5. empty json
print("Empty SARA memory:")
ser_sara.write(b'AT+UDELFILE="sensordata_csv.json"\r')
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_5)
print("</p>", end="")
if "+CME ERROR" in response_SARA_5:
print("⛔ Attention CME ERROR ⛔")
'''
_ ____ _
___ ___ _ __ __| | _ _/ ___| _ __ ___ | |_
/ __|/ _ \ '_ \ / _` | | | | \___ \| '_ \ / _ \| __|
\__ \ __/ | | | (_| | | |_| |___) | |_) | (_) | |_
|___/\___|_| |_|\__,_| \__,_|____/| .__/ \___/ \__|
|_|
'''
if send_uSpot:
print('<p class="fw-bold">➡SEND TO uSPOT SERVERS</p>', end="")
# 1. Open sensordata_json.json (with correct data size)
print("Open JSON:")
payload_string = json.dumps(payload_json) # Convert dict to JSON string
size_of_string = len(payload_string)
command = f'AT+UDWNFILE="sensordata_json.json",{size_of_string}\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_6 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False)
print(response_SARA_6)
time.sleep(1)
#2. Write to shell
print("Write to memory:")
ser_sara.write(payload_string.encode())
response_SARA_7 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_7)
#step 4: trigger the request (http_command=1 for GET and http_command=1 for POST)
print("****")
print("Trigger POST REQUEST")
command = f'AT+UHTTPC={uSpot_profile_id},4,"/moduleair?token=2AFF6dQk68daFZ","uSpot_server_response.txt","sensordata_json.json",4\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_8 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True)
print('<p class="text-danger-emphasis">')
print(response_SARA_8)
print("</p>", end="")
# si on recoit la réponse UHTTPCR
if "+UUHTTPCR" in response_SARA_8:
print("✅ Received +UUHTTPCR response.")
lines = response_SARA_8.strip().splitlines()
# 1.Vérifier si la réponse contient un message d'erreur CME
if "+CME ERROR" in lines[-1]:
print("*****")
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: CME ERROR</span>')
print("error:", lines[-1])
print("*****")
#update status
# Gestion de l'erreur spécifique
if "No connection to phone" in lines[-1]:
print("No connection to the phone.")
elif "Operation not allowed" in lines[-1]:
print("Operation not allowed. This may require a different configuration.")
# Actions spécifiques pour ce type d'erreur
else:
# 2.Si la réponse contient une réponse HTTP valide
# Extract HTTP response code from the last line
# ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK
# rechercher plutot
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed)
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
print("*****")
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: HTTP operation failed</span>')
print("*****")
# Get error code
print("Getting error code", end="")
command = f'AT+UHTTPER={uSpot_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9b)
print("</p>", end="")
# Extract just the error code
error_code = extract_error_code(response_SARA_9b)
if error_code is not None:
# Display interpretation based on error code
if error_code == 0:
print('<p class="text-success">No error detected</p>')
# INVALID SERVER HOSTNAME
elif error_code == 4:
print('<p class="text-danger">Error 4: Invalid server Hostname</p>', end="")
send_error_notification(device_id, "UHTTPER (4) uSpot Invalid server Hostname")
server_hostname_resets = reset_server_hostname_https(uSpot_profile_id)
if server_hostname_resets:
print("✅server hostname reset successfully")
else:
print("⛔There were issues with the modem server hostname reinitialize process")
# SERVER CONNECTION ERROR
elif error_code == 11:
print('<p class="text-danger">Error 11: Server connection error</p>', end="")
elif error_code == 22:
print('<p class="text-danger">Error 22: PSD or CSD connection not established</p>', end="")
elif error_code == 26:
print('<p class="text-danger">Error 26: Connection timed out</p>')
elif error_code == 44:
print('<p class="text-danger">Error 44: Connection lost</p>')
elif error_code == 73:
print('<p class="text-danger">Error 73: Secure socket connect error</p>', end="")
send_error_notification(device_id, "uSpot - Secure socket connect error")
#Software Reboot ??
else:
print(f'<p class="text-danger">Unknown error code: {error_code}</p>',end="")
else:
print('<p class="text-danger">Could not extract error code from response</p>', end="")
#Pas forcément un moyen de résoudre le soucis
# 2.2 code 1 (✅✅HHTP / UUHTTPCR succeded✅✅)
else:
# Si la commande HTTP a réussi
print('<span style="font-weight: bold;">✅✅HTTP operation successful.</span>')
#4. Read reply from server
print("Reply from server:")
command = f'AT+URDFILE="uSpot_server_response.txt"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_4b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-success">')
print(response_SARA_4b)
print("</p>", end="")
# Initialize http_response_code to 0 as a default value
http_response_code = 0
# Safely extract HTTP code
try:
http_prefix = "HTTP/"
# response_SARA_4b is a string, not a function - use .find() method
http_pos = response_SARA_4b.find(http_prefix)
if http_pos != -1:
# Find the space after the HTTP version
space_pos = response_SARA_4b.find(" ", http_pos)
if space_pos != -1:
# Extract the code after the space
code_start = space_pos + 1
code_end = response_SARA_4b.find(" ", code_start)
if code_end != -1:
# Extract and convert to integer
http_code_str = response_SARA_4b[code_start:code_end]
http_response_code = int(http_code_str)
print(f"HTTP response code: {http_response_code}")
if http_response_code == 201:
print('<span style="font-weight: bold;">✅✅HTTP 201 ressource created.</span>')
elif http_response_code == 308:
print('<span style="font-weight: bold;"> ⚠HTTP 308 Redirect, need to set up HTTPS.</span>')
server_hostname_resets = reset_server_hostname_https(uSpot_profile_id)
if server_hostname_resets:
print("✅server hostname reset successfully")
else:
print("⛔There were issues with the modem server hostname reinitialize process")
except Exception as e:
# If any error occurs during parsing, keep the default value
print(f"Error parsing HTTP code: {e}")
#5. empty json
print("Empty SARA memory:")
command = f'AT+UDELFILE="sensordata_json.json"\r'
ser_sara.write((command + '\r').encode('utf-8'))
response_SARA_9t = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_9t)
# Calculate and print the elapsed time
elapsed_time = time.time() - start_time_script
print(f"Elapsed time: {elapsed_time:.2f} seconds")
print("<hr>")
except Exception as e:
print("An error occurred:", e)
traceback.print_exc() # This prints the full traceback