""" ____ _ ____ _ ____ _ ____ _ / ___| / \ | _ \ / \ / ___| ___ _ __ __| | | _ \ __ _| |_ __ _ \___ \ / _ \ | |_) | / _ \ \___ \ / _ \ '_ \ / _` | | | | |/ _` | __/ _` | ___) / ___ \| _ < / ___ \ ___) | __/ | | | (_| | | |_| | (_| | || (_| | |____/_/ \_\_| \_\/_/ \_\ |____/ \___|_| |_|\__,_| |____/ \__,_|\__\__,_| Main loop to gather data from sensor inside SQLite database: * NPM * Envea * I2C BME280 * Noise sensor and send it to AirCarto servers via SARA R4 HTTP post requests also send the timestamp (already stored inside the DB) ! /usr/bin/python3 /var/www/moduleair_pro_4g/loop/SARA_send_data_v2.py ATTENTION: # This script is triggered every minutes by /var/www/moduleair_pro_4g/master.py (as a service) CSV PAYLOAD (AirCarto Servers) Endpoint: data.moduleair.fr /pro_4G/data.php?sensor_id={device_id}×tamp={rtc_module_time} ATTENTION : do not change order ! CSV size: 18 {PM1},{PM25},{PM10},{temp},{hum},{press},{avg_noise},{max_noise},{min_noise},{envea_no2},{envea_h2s},{envea_o3},{4g_signal_quality} 0 -> PM1 (μg/m3) 1 -> PM25 (μg/m3) 2 -> PM10 (μg/m3) 3 -> temp 4 -> hum 5 -> press 6 -> avg_noise 7 -> max_noise 8 -> min_noise 9 -> envea_no2 10 -> envea_h2s 11 -> envea_nh3 12 -> 4G signal quality, 13 -> PM 0.2μm to 0.5μm quantity (Nb/L) 14 -> PM 0.5μm to 1.0μm quantity (Nb/L) 15 -> PM 1.0μm to 2.5μm quantity (Nb/L) 16 -> PM 2.5μm to 5.0μm quantity (Nb/L) 17 -> PM 5.0μm to 10μm quantity (Nb/L) 18 -> NPM temp inside 19 -> NPM hum inside JSON PAYLOAD (Micro-Spot Servers) Same as moduleair wifi Endpoint: api-prod.uspot.probesys.net moduleair?token=2AFF6dQk68daFZ port 443 {"moduleairid": "82D25549434", "software_version": "ModuleAirV2-V1-042022", "sensordatavalues": [ {"value_type":"NPM_P0","value":"1.54"}, {"value_type":"NPM_P1","value":"1.54"}, {"value_type":"NPM_P2","value":"1.54"}, {"value_type":"NPM_N1","value":"0.02"}, {"value_type":"NPM_N10","value":"0.02"}, {"value_type":"NPM_N25","value":"0.02"}, {"value_type":"MHZ16_CO2","value":"793.00"}, {"value_type":"SGP40_VOC","value":"29915.00"}, {"value_type":"samples","value":"134400"}, {"value_type":"min_micro","value":"137"}, {"value_type":"max_micro","value":"155030"}, {"value_type":"interval","value":"145000"}, {"value_type":"signal","value":"-80"}, {"value_type":"latitude","value":"43.2964"}, {"value_type":"longitude","value":"5.36978"}, {"value_type":"state_npm","value":"State: 00000000"}, {"value_type":"BME280_temperature","value":"28.47"}, {"value_type":"BME280_humidity","value":"28.47"}, {"value_type":"BME280_pressure","value":"28.47"}, {"value_type":"CAIRSENS_NO2","value":"54"}, {"value_type":"CAIRSENS_H2S","value":"54"}, {"value_type":"CAIRSENS_O3","value":"54"} ] } """ import board import json import serial import time import busio import re import os import traceback import sys import sqlite3 import RPi.GPIO as GPIO from threading import Thread from datetime import datetime # Record the start time of the script start_time_script = time.time() # Check system uptime with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) # Skip execution if uptime is less than 2 minutes (120 seconds) if uptime_seconds < 120: print(f"System just booted ({uptime_seconds:.2f} seconds uptime), skipping execution.") sys.exit() #Payload CSV to be sent to data.moduleair.fr payload_csv = [None] * 25 #Payload JSON to be sent to uSpot payload_json = { "moduleairid": "XXX", "software_version": "ModuleAirV2-V1-042022", "sensordatavalues": [] # Empty list to start with } # SARA R4 UHTTPC profile IDs aircarto_profile_id = 0 uSpot_profile_id = 1 # database connection conn = sqlite3.connect("/var/www/moduleair_pro_4g/sqlite/sensors.db") cursor = conn.cursor() def blink_led(pin, blink_count, delay=1): """ Blink an LED on a specified GPIO pin. Args: pin (int): GPIO pin number (BCM mode) to which the LED is connected. blink_count (int): Number of times the LED should blink. delay (float): Time in seconds for the LED to stay ON or OFF (default is 1 second). """ # GPIO setup GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) # Use BCM numbering GPIO.setup(pin, GPIO.OUT) # Set the specified pin as an output try: for _ in range(blink_count): GPIO.output(pin, GPIO.HIGH) # Turn the LED on #print(f"LED on GPIO {pin} is ON") time.sleep(delay) # Wait for the specified delay GPIO.output(pin, GPIO.LOW) # Turn the LED off #print(f"LED on GPIO {pin} is OFF") time.sleep(delay) # Wait for the specified delay finally: GPIO.cleanup(pin) # Clean up the specific pin to reset its state print(f"GPIO {pin} cleaned up") #get config data from SQLite table def load_config_sqlite(): """ Load configuration data from SQLite config table Returns: dict: Configuration data with proper type conversion """ try: # Query the config table cursor.execute("SELECT key, value, type FROM config_table") rows = cursor.fetchall() # Create config dictionary config_data = {} for key, value, type_name in rows: # Convert value based on its type if type_name == 'bool': config_data[key] = value == '1' or value == 'true' elif type_name == 'int': config_data[key] = int(value) elif type_name == 'float': config_data[key] = float(value) else: config_data[key] = value return config_data except Exception as e: print(f"Error loading config from SQLite: {e}") return {} def load_config_scripts_sqlite(): """ Load script configuration data from SQLite config_scripts_table Returns: dict: Script paths as keys and enabled status as boolean values """ try: # Query the config_scripts_table cursor.execute("SELECT script_path, enabled FROM config_scripts_table") rows = cursor.fetchall() # Create config dictionary with script paths as keys and enabled status as boolean values scripts_config = {} for script_path, enabled in rows: # Convert integer enabled value (0/1) to boolean scripts_config[script_path] = bool(enabled) return scripts_config except Exception as e: print(f"Error loading scripts config from SQLite: {e}") return {} # Define the config file path config_file = '/var/www/moduleair_pro_4g/config.json' #Load config config = load_config_sqlite() #config device_id = config.get('deviceID', 'unknown') device_id = device_id.upper() modem_config_mode = config.get('modem_config_mode', False) device_latitude_raw = config.get('latitude_raw', 0) device_longitude_raw = config.get('longitude_raw', 0) modem_version=config.get('modem_version', "") Sara_baudrate = config.get('SaraR4_baudrate', 115200) npm_5channel = config.get('npm_5channel', False) #5 canaux du NPM selected_networkID = int(config.get('SARA_R4_neworkID', 0)) send_uSpot = config.get('send_uSpot', False) #envoi sur MicroSpot () reset_uSpot_url = False #config_scripts config_scripts = load_config_scripts_sqlite() bme_280_config = config_scripts.get('BME280/get_data_v2.py', False) envea_cairsens= config_scripts.get('envea/read_value_v2.py', False) co2_mhz19= config_scripts.get('MH-Z19/write_data.py', False) sensirion_sfa30= config_scripts.get('sensirion/SFA30_read.py', False) #update device id in the payload json payload_json["moduleairid"] = device_id # Skip execution if modem_config_mode is true if modem_config_mode: print("Modem 4G (SARA R4) is in config mode -> EXIT") sys.exit() ser_sara = serial.Serial( port='/dev/ttyAMA2', baudrate=Sara_baudrate, #115200 ou 9600 parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 2 ) def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True): ''' Fonction très importante !!! Reads the complete response from a serial connection and waits for specific lines. ''' if wait_for_lines is None: wait_for_lines = [] # Default to an empty list if not provided response = bytearray() serial_connection.timeout = timeout end_time = time.time() + end_of_response_timeout start_time = time.time() while True: elapsed_time = time.time() - start_time # Time since function start if serial_connection.in_waiting > 0: data = serial_connection.read(serial_connection.in_waiting) response.extend(data) end_time = time.time() + end_of_response_timeout # Reset timeout on new data # Decode and check for any target line decoded_response = response.decode('utf-8', errors='replace') for target_line in wait_for_lines: if target_line in decoded_response: if debug: print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)") return decoded_response # Return response immediately if a target line is found elif time.time() > end_time: if debug: print(f"[DEBUG] Timeout reached. No more data received.") break time.sleep(0.1) # Short sleep to prevent busy waiting # Final response and debug output total_elapsed_time = time.time() - start_time if debug: print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️") # Check if the elapsed time exceeded 10 seconds if total_elapsed_time > 10 and debug: print(f"[ALERT] 🚨 The operation took too long 🚨") print(f'[ALERT] ⚠️{total_elapsed_time:.2f}s⚠️') return response.decode('utf-8', errors='replace') # Return the full response if no target line is found def extract_error_code(response): """ Extract just the error code from AT+UHTTPER response """ for line in response.split('\n'): if '+UHTTPER' in line: try: # Split the line and get the third value (error code) parts = line.split(':')[1].strip().split(',') if len(parts) >= 3: error_code = int(parts[2]) return error_code except: pass # Return None if we couldn't find the error code return None def send_error_notification(device_id, error_type, additional_info=None): """ Send an error notification to the server when issues with the SARA module occur. Will silently fail if there's no internet connection. Parameters: ----------- device_id : str The unique identifier of the device error_type : str Type of error encountered (e.g., 'serial_error', 'cme_error', 'http_error', 'timeout') additional_info : str, optional Any additional information about the error for logging purposes Returns: -------- bool True if notification was sent successfully, False otherwise """ # Create the alert URL with all relevant parameters base_url = 'http://data.nebuleair.fr/pro_4G/alert.php' alert_url = f'{base_url}?capteur_id={device_id}&error_type={error_type}' # Add additional info if provided if additional_info: # Make sure to URL encode the additional info from urllib.parse import quote alert_url += f'&details={quote(str(additional_info))}' # Try to send the notification, catch ALL exceptions try: response = requests.post(alert_url, timeout=3) if response.status_code == 200: print(f"✅ Alert notification sent successfully") return True else: print(f"⚠️ Alert notification failed: Status code {response.status_code}") except Exception as e: print(f"⚠️ Alert notification couldn't be sent: {e}") return False def modem_complete_reboot_and_reinitialize(modem_version, aircarto_profile_id): """ Performs a complete modem restart sequence: 1. Reboots the modem using the appropriate command for its version 2. Waits for the modem to restart 3. Resets the HTTP profile 4. For SARA-R5, resets the PDP connection Args: modem_version (str): The modem version, e.g., 'SARA-R500' or 'SARA-R410' aircarto_profile_id (int): The HTTP profile ID to reset Returns: bool: True if the complete sequence was successful, False otherwise """ print('🔄 Complete SARA reboot and reinitialize sequence 🔄') # Step 1: Reboot the modem - Integrated modem_software_reboot logic print('🔄 Software SARA reboot! 🔄') # Use different commands based on modem version if 'R5' in modem_version: # For SARA-R5 series command = 'AT+CFUN=16\r' # Normal restart for R5 else: # For SARA-R4 series command = 'AT+CFUN=15\r' # Factory reset for R4 ser_sara.write(command.encode('utf-8')) response = read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], debug=True) print('
') print(response) print("
", end="") # Check if reboot command was acknowledged reboot_success = response is not None and "OK" in response if not reboot_success: print("⚠️ Modem reboot command failed") return False # Step 2: Wait for the modem to restart (adjust time as needed) print("Waiting for modem to restart...") time.sleep(15) # 15 seconds should be enough for most modems to restart # Step 3: Check if modem is responsive after reboot print("Checking if modem is responsive...") ser_sara.write(b'AT\r') response_check = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=True) if response_check is None or "OK" not in response_check: print("⚠️ Modem not responding after reboot") return False print("✅ Modem restarted successfully") # Step 4: Reset the HTTP Profile print('🔧 Resetting the HTTP Profile') command = f'AT+UHTTP={aircarto_profile_id},1,"data.nebuleair.fr"\r' ser_sara.write(command.encode('utf-8')) responseResetHTTP = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True) print('') print(responseResetHTTP) print("
", end="") http_reset_success = responseResetHTTP is not None and "OK" in responseResetHTTP if not http_reset_success: print("⚠️ HTTP profile reset failed") # Continue anyway, don't return False here # Step 5: For SARA-R5, reset the PDP connection pdp_reset_success = True if modem_version == "SARA-R500": print("⚠️ Need to reset PDP connection for SARA-R500") # Activate PDP context 1 print('➡️ Activate PDP context 1') command = f'AT+CGACT=1,1\r' ser_sara.write(command.encode('utf-8')) response_pdp1 = read_complete_response(ser_sara, wait_for_lines=["OK"]) print(response_pdp1, end="") pdp_reset_success = pdp_reset_success and (response_pdp1 is not None and "OK" in response_pdp1) time.sleep(1) # Set the PDP type print('➡️ Set the PDP type to IPv4 referring to the output of the +CGDCONT read command') command = f'AT+UPSD=0,0,0\r' ser_sara.write(command.encode('utf-8')) response_pdp2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) print(response_pdp2, end="") pdp_reset_success = pdp_reset_success and (response_pdp2 is not None and "OK" in response_pdp2) time.sleep(1) # Profile #0 is mapped on CID=1 print('➡️ Profile #0 is mapped on CID=1.') command = f'AT+UPSD=0,100,1\r' ser_sara.write(command.encode('utf-8')) response_pdp3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) print(response_pdp3, end="") pdp_reset_success = pdp_reset_success and (response_pdp3 is not None and "OK" in response_pdp3) time.sleep(1) # Activate the PSD profile print('➡️ Activate the PSD profile #0: the IPv4 address is already assigned by the network.') command = f'AT+UPSDA=0,3\r' ser_sara.write(command.encode('utf-8')) response_pdp4 = read_complete_response(ser_sara, wait_for_lines=["OK", "+UUPSDA"]) print(response_pdp4, end="") pdp_reset_success = pdp_reset_success and (response_pdp4 is not None and ("OK" in response_pdp4 or "+UUPSDA" in response_pdp4)) time.sleep(1) if not pdp_reset_success: print("⚠️ PDP connection reset had some issues") # Return overall success return http_reset_success and pdp_reset_success try: ''' _ ___ ___ ____ | | / _ \ / _ \| _ \ | | | | | | | | | |_) | | |__| |_| | |_| | __/ |_____\___/ \___/|_| ''' print('') print(response2) print("
", end="") #Here it's possible that the SARA do not repond at all or send a error message #-> TO DO : harware reboot #-> send notification #-> end loop, no need to continue #1. No answer at all form SARA if response2 is None or response2 == "": print("No answer from SARA module") print('🛑STOP LOOP🛑') print("') print(responseReconnect) print("
", end="") print('🛑STOP LOOP🛑') print("') print(response_SARA_3) print("
", end="") # si on recoit la réponse UHTTPCR if "+UUHTTPCR" in response_SARA_3: print("✅ Received +UUHTTPCR response.") # Les types de réponse # 1.La commande n'a pas fonctionné # +CME ERROR: No connection to phone # +CME ERROR: Operation not allowed # 2.La commande fonctionne: elle renvoie un code # +UUHTTPCR:') print(response_SARA_9) print("
", end="") # Extract just the error code error_code = extract_error_code(response_SARA_9) if error_code is not None: # Display interpretation based on error code if error_code == 0: print('No error detected
') elif error_code == 4: print('Error 4: Invalid server Hostname
') elif error_code == 11: print('Error 11: Server connection error
') elif error_code == 22: print('⚠️Error 22: PSD or CSD connection not established (SARA-R5 need to reset PDP conection)⚠️
') elif error_code == 73: print('Error 73: Secure socket connect error
') else: print(f'Unknown error code: {error_code}
') else: print('Could not extract error code from response
') #Software Reboot software_reboot_success = modem_complete_reboot_and_reinitialize(modem_version, aircarto_profile_id) if software_reboot_success: print("Modem successfully rebooted and reinitialized") else: print("There were issues with the modem reboot/reinitialize process") # 2.2 code 1 (✅✅HHTP / UUHTTPCR succeded✅✅) else: # Si la commande HTTP a réussi print('✅✅HTTP operation successful.') print("Blink blue LED") led_thread = Thread(target=blink_led, args=(23, 5, 0.5)) led_thread.start() #4. Read reply from server print("Reply from server:") ser_sara.write(b'AT+URDFILE="server_response.txt"\r') response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('') print(response_SARA_4) print("
", end="") #Parse the server datetime # Extract just the date from the response date_string = None date_start = response_SARA_4.find("Date: ") if date_start != -1: date_end = response_SARA_4.find("\n", date_start) date_string = response_SARA_4[date_start + 6:date_end].strip() print(f'Parsed datetime: {server_datetime}
') except Exception as e: print(f'Error parsing date: {e}
') # Get RTC time from SQLite cursor.execute("SELECT * FROM timestamp_table LIMIT 1") row = cursor.fetchone() rtc_time_str = row[1] # '2025-02-07 12:30:45' or '2000-01-01 00:55:21' or 'not connected' print(f'Error comparing times: {e}
') #Si non ne recoit pas de réponse UHTTPCR #on a peut etre une ERROR de type "+CME ERROR: No connection to phone" else: print('No UUHTTPCR response') print("Blink red LED") # Run LED blinking in a separate thread led_thread = Thread(target=blink_led, args=(24, 5, 0.5)) led_thread.start() #Vérification de l'erreur print("Getting type of error") # Split the response into lines and search for "+CME ERROR:" lines2 = response_SARA_3.strip().splitlines() for line in lines2: if "+CME ERROR" in line: error_message = line.split("+CME ERROR:")[1].strip() print("*****") print('⚠️ATTENTION: CME ERROR⚠️') print(f"Error type: {error_message}") print("*****") # Handle "No connection to phone" error if error_message == "No connection to phone": print('📞Try reconnect to network📞') #IMPORTANT! # Reconnexion au réseau (AT+COPS) #command = f'AT+COPS=1,2,{selected_networkID}\r' command = f'AT+COPS=0\r' ser_sara.write(command.encode('utf-8')) responseReconnect = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["OK", "+CME ERROR"], debug=True) print('') print(responseReconnect) print("
", end="") # Handle "Operation not allowed" error if error_message == "Operation not allowed": print('❓Try Resetting the HTTP Profile❓') command = f'AT+UHTTP={aircarto_profile_id},1,"data.moduleair.fr"\r' ser_sara.write(command.encode('utf-8')) responseResetHTTP_profile = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR"], debug=True) print('') print(responseResetHTTP_profile) print("
", end="") if "ERROR" in line: print("⛔Attention ERROR!⛔") #Send notification (WIFI) send_error_notification(device_id, "sara_error") #Software Reboot software_reboot_success = modem_complete_reboot_and_reinitialize(modem_version, aircarto_profile_id) if software_reboot_success: print("Modem successfully rebooted and reinitialized") else: print("There were issues with the modem reboot/reinitialize process") #5. empty json print("Empty SARA memory:") ser_sara.write(b'AT+UDELFILE="sensordata_csv.json"\r') response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('') print(response_SARA_5) print("
", end="") if "+CME ERROR" in response_SARA_5: print("⛔ Attention CME ERROR ⛔") # Calculate and print the elapsed time elapsed_time = time.time() - start_time_script print(f"Elapsed time: {elapsed_time:.2f} seconds") print("