Files
nebuleair_pro_4g/SARA/sara_ping.py
Your Name 3d507ae659 update
2025-02-27 11:48:42 +01:00

135 lines
5.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
____ _ ____ _
/ ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\
Script that sends an HTTP GET ping request to data.nebuleair.fr/ping.php
through the SARA modem and prints the server reply.
Usage: python3 /var/www/nebuleair_pro_4g/SARA/sara_ping.py
'''
import serial
import time
import sys
import json
# SARA R4 HTTP profile ID, used as the first argument of the AT+UHTTPC command
aircarto_profile_id = 0
def load_config(config_file):
    """Load the JSON configuration file and return its content as a dict.

    Loading is best-effort: any failure is reported on stdout and an empty
    dict is returned, so callers can always use ``.get(key, default)``.

    Args:
        config_file: Path of the JSON configuration file.

    Returns:
        The parsed configuration dict, or ``{}`` when the file cannot be
        opened, is not valid JSON, or does not contain a JSON object at its
        top level.
    """
    try:
        with open(config_file, 'r') as file:
            config_data = json.load(file)
    except Exception as e:
        # Deliberately broad: a missing or corrupt config must not stop the script.
        print(f"Error loading config file: {e}")
        return {}

    # Valid JSON is not necessarily an object ("[]", "null", "3", ...);
    # returning such a value would break every later ``config.get(...)`` call.
    if not isinstance(config_data, dict):
        print(
            "Error loading config file: expected a JSON object, "
            f"got {type(config_data).__name__}"
        )
        return {}
    return config_data
# Location of the shared JSON configuration
config_file = '/var/www/nebuleair_pro_4g/config.json'

# Parse the configuration once at start-up
config = load_config(config_file)

# Modem baud rate; 115200 is used when the key is absent from the config
baudrate = config.get('SaraR4_baudrate', 115200)

# Serial link to the SARA modem: 8 data bits, no parity, 1 stop bit
ser_sara = serial.Serial(
    port='/dev/ttyAMA2',
    baudrate=baudrate,  # 115200 or 9600
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,  # PARITY_NONE, PARITY_EVEN or PARITY_ODD
    stopbits=serial.STOPBITS_ONE,
    timeout=2,
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=True):
    """Accumulate a reply from a serial port until a marker shows up or the line goes quiet.

    The port is polled roughly every 0.1 s. Every time bytes arrive they are
    appended to a buffer, the silence deadline is pushed back, and the whole
    buffer is decoded and searched for the requested markers.

    Args:
        serial_connection: Port-like object exposing ``timeout``,
            ``in_waiting`` and ``read(n)``.
        timeout: Value assigned to ``serial_connection.timeout``.
        end_of_response_timeout: Seconds without new data after which
            reading stops; the countdown restarts whenever data arrives.
        wait_for_lines: Substrings to look for. The function returns as soon
            as one of them appears in the text received so far. ``None``
            means no markers (read until silence).
        debug: When true, print timing and diagnostic messages.

    Returns:
        Everything received, decoded as UTF-8 with undecodable bytes replaced.
    """
    markers = [] if wait_for_lines is None else wait_for_lines
    buffer = bytearray()
    serial_connection.timeout = timeout
    deadline = time.time() + end_of_response_timeout
    started_at = time.time()

    while True:
        elapsed = time.time() - started_at  # measured at the start of this poll
        if serial_connection.in_waiting > 0:
            buffer.extend(serial_connection.read(serial_connection.in_waiting))
            # Fresh data: restart the silence countdown
            deadline = time.time() + end_of_response_timeout

            text = buffer.decode('utf-8', errors='replace')
            hit = next((marker for marker in markers if marker in text), None)
            if hit is not None:
                if debug:
                    print(f"[DEBUG] 🔎 Found target line: {hit} (in {elapsed:.2f}s)")
                # A marker was seen: hand back what we have right away
                return text
        elif time.time() > deadline:
            if debug:
                print("[DEBUG] Timeout reached. No more data received.")
            break
        # Brief pause so the loop does not spin the CPU
        time.sleep(0.1)

    # No marker was found: report timings and return whatever was collected
    total = time.time() - started_at
    if debug:
        print(f"[DEBUG] ⏱️ elapsed time: {total:.2f}s. ⏱️")
        # Flag operations that lasted more than 10 seconds
        if total > 10:
            print("[ALERT] 🚨 The operation took too long 🚨")
            print(f'<span style="color: red;font-weight: bold;">[ALERT] ⚠️{total:.2f}s⚠</span>')
    return buffer.decode('utf-8', errors='replace')
def _find_uhttpcr_result(response):
    """Return the result field of the last complete +UUHTTPCR line, or None.

    A complete line looks like ``+UUHTTPCR: 0,4,0`` (three comma-separated
    fields); the third field is returned as a stripped string. ``None`` is
    returned when no line with exactly three fields is present, e.g. when the
    line was cut short.
    """
    for line in reversed(response.splitlines()):
        _, marker, fields = line.partition("+UUHTTPCR:")
        if not marker:
            continue
        parts = [part.strip() for part in fields.split(',')]
        if len(parts) == 3:
            return parts[2]
    return None


try:
    # 3. Send the request to the endpoint
    print("Send data (GET REQUEST):")
    # The server reply is stored by the modem in aircarto_server_response.txt
    command = f'AT+UHTTPC={aircarto_profile_id},1,"/ping.php","aircarto_server_response.txt"\r'
    ser_sara.write(command.encode('utf-8'))
    markers = ["+UUHTTPCR", "+CME ERROR"]
    response_SARA_3 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=markers, debug=True)

    # read_complete_response returns the moment a marker substring is seen,
    # which can be before the rest of that line (the result fields) has
    # arrived. Collect the remainder of the line before parsing it.
    marker_seen = any(marker in response_SARA_3 for marker in markers)
    if marker_seen and not response_SARA_3.endswith("\n"):
        response_SARA_3 += read_complete_response(ser_sara, timeout=5, end_of_response_timeout=1, wait_for_lines=["\n"], debug=False)
    print(response_SARA_3)

    lines = response_SARA_3.strip().splitlines()

    # 1. The modem rejected the command with a CME error
    if lines and "+CME ERROR" in lines[-1]:
        print("error ⛔")

    # 2. The +UUHTTPCR response was received
    elif "+UUHTTPCR" in response_SARA_3:
        print("✅ Received +UUHTTPCR response.")
        http_result = _find_uhttpcr_result(response_SARA_3)  # e.g. "+UUHTTPCR: 0,4,0" -> "0"

        # 2.1 result 1: the HTTP operation succeeded
        if http_result == '1':
            print("✅✅HTTP operation successful")

            # 4. Read the reply from the server
            print("Reply from server:")
            ser_sara.write(b'AT+URDFILE="aircarto_server_response.txt"\r')
            response_SARA_4 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
            print(response_SARA_4)

        # 2.2 result 0: the HTTP operation failed
        elif http_result == '0':
            print("⛔ATTENTION: HTTP operation failed")

        # 2.3 truncated or malformed line: never report it as a success
        else:
            print("⛔ATTENTION: could not parse +UUHTTPCR response")

    # 3. Neither marker arrived before the timeout
    else:
        print("⛔ATTENTION: no +UUHTTPCR response received")

except serial.SerialException as e:
    print(f"Error: {e}")

finally:
    # Always release the serial port, whatever happened above
    if ser_sara.is_open:
        ser_sara.close()