Files
nebuleair_pro_4g/SARA/sara_ping.py
Your Name 04fbf81798 update
2025-11-05 14:50:29 +01:00

166 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
r'''
____ _ ____ _
/ ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\
Script to do a ping request to data.nebuleair.fr/ping.php
python3 /var/www/nebuleair_pro_4g/SARA/sara_ping.py
'''
import serial
import time
import sys
import json
# SARA R4 UHTTPC profile IDs
aircarto_profile_id = 0
baudrate = 115200
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=baudrate, #115200 ou 9600
parity=serial.PARITY_NONE, #PARITY_NONE, PARITY_EVEN or PARITY_ODD
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 2
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
'''
Fonction très importante !!!
Reads the complete response from a serial connection and waits for specific lines.
'''
if wait_for_lines is None:
wait_for_lines = [] # Default to an empty list if not provided
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
elapsed_time = time.time() - start_time # Time since function start
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout # Reset timeout on new data
# Decode and check for any target line
decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines:
if target_line in decoded_response:
if debug:
print(f"[DEBUG] 🔎 Found target line: {target_line} (in {elapsed_time:.2f}s)")
return decoded_response # Return response immediately if a target line is found
elif time.time() > end_time:
if debug:
print(f"[DEBUG] Timeout reached. No more data received.")
break
time.sleep(0.1) # Short sleep to prevent busy waiting
# Final response and debug output
total_elapsed_time = time.time() - start_time
if debug:
print(f"[DEBUG] ⏱️ elapsed time: {total_elapsed_time:.2f}s. ⏱️")
# Check if the elapsed time exceeded 10 seconds
if total_elapsed_time > 10 and debug:
print(f"[ALERT] 🚨 The operation took too long 🚨")
print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total_elapsed_time:.2f}s⚠</span>')
return response.decode('utf-8', errors='replace') # Return the full response if no target line is found
def extract_error_code(response):
"""
Extract just the error code from AT+UHTTPER response
"""
for line in response.split('\n'):
if '+UHTTPER' in line:
try:
# Split the line and get the third value (error code)
parts = line.split(':')[1].strip().split(',')
if len(parts) >= 3:
error_code = int(parts[2])
return error_code
except:
pass
# Return None if we couldn't find the error code
return None
try:
#3. Send to endpoint (with device ID)
print("Send data (GET REQUEST):")
command= f'AT+UHTTPC={aircarto_profile_id},1,"/ping.php","aircarto_server_response.txt"\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=20, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True)
print(response_SARA_3)
# si on recoit la réponse UHTTPCR
if "+UUHTTPCR" in response_SARA_3:
print("✅ Received +UUHTTPCR response.")
# Split response into lines
lines = response_SARA_3.strip().splitlines()
# 1.Vérifier si la réponse contient un message d'erreur CME
if "+CME ERROR" in lines[-1]:
print("error ⛔")
else:
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
parts = http_response.split(',')
# 2.1 code 0 (HTTP failed) ⛔⛔⛔
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
print("⛔⛔ATTENTION: HTTP operation failed")
#get error code
print("Getting error code (11->Server connection error, 73->Secure socket connect error)")
command = f'AT+UHTTPER={aircarto_profile_id}\r'
ser_sara.write(command.encode('utf-8'))
response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print('<p class="text-danger-emphasis">')
print(response_SARA_9)
print("</p>", end="")
# Extract just the error code
error_code = extract_error_code(response_SARA_9)
if error_code is not None:
# Display interpretation based on error code
if error_code == 0:
print('<p class="text-success">No error detected</p>')
elif error_code == 4:
print('<p class="text-danger">Error 4: Invalid server Hostname</p>')
elif error_code == 11:
print('<p class="text-danger">Error 11: Server connection error</p>')
elif error_code == 22:
print('<p class="text-danger">Error 22: PSD or CSD connection not established</p>')
elif error_code == 73:
print('<p class="text-danger">Error 73: Secure socket connect error</p>')
else:
print(f'<p class="text-danger">Unknown error code: {error_code}</p>')
else:
print('<p class="text-danger">Could not extract error code from response</p>')
# 2.2 code 1 (HHTP succeded)
else:
# Si la commande HTTP a réussi
print("✅✅HTTP operation successful")
#4. Read reply from server
print("Reply from server:")
ser_sara.write(b'AT+URDFILE="aircarto_server_response.txt"\r')
response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
print(response_SARA_4)
except serial.SerialException as e:
print(f"Error: {e}")
finally:
if ser_sara.is_open:
ser_sara.close()
#print("Serial closed")