Files
nebuleair_pro_4g/SARA/sara_test_udp.py
PaulVua b8b70a5a54 Page modem: section tests Miotiq UDP + masquer tests HTTP si SIM Miotiq
- Nouveau script SARA/sara_test_udp.py (test socket UDP vers 192.168.0.20:4242)
- Section "Tests Miotiq (UDP)" avec PSD setup, test socket, placeholder aller-retour
- Masque les tests HTTP/Send message quand send_miotiq est actif
- Endpoint launcher.php sara_test_udp

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 15:54:39 +02:00

155 lines
5.5 KiB
Python

r'''
____ _ ____ _
/ ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\
Test UDP socket connectivity to Miotiq private network (192.168.0.20:4242)
Creates a UDP socket, connects, writes a small test payload, and closes.
Each AT step is verified for errors.
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_test_udp.py
'''
import serial
import time
import sys
import re
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=115200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=2
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=False):
if wait_for_lines is None:
wait_for_lines = []
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout
decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines:
if target_line in decoded_response:
return decoded_response
elif time.time() > end_time:
break
time.sleep(0.1)
return response.decode('utf-8', errors='replace')
# Result tracking
steps = []
def log_step(name, success, detail=""):
steps.append({"name": name, "success": success, "detail": detail})
icon = "" if success else ""
print(f'{icon} {name}')
if detail:
print(f'<small class="text-muted">{detail}</small>')
try:
sys.stdout.reconfigure(line_buffering=True)
ser_sara.reset_input_buffer()
# Step 1: Create UDP socket (protocol 17 = UDP)
command = 'AT+USOCR=17\r'
ser_sara.write(command.encode('utf-8'))
response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
if "+CME ERROR" in response or "ERROR" in response:
log_step("Création socket UDP", False, "AT+USOCR=17 → erreur. La connexion PDP n'est peut-être pas établie.")
print('<br><strong>Suggestion :</strong> Lancez "Setup PSD connection" puis réessayez.')
sys.exit(1)
# Extract socket ID
match = re.search(r'\+USOCR:\s*(\d+)', response)
if not match:
log_step("Création socket UDP", False, "Impossible d'extraire le socket ID")
sys.exit(1)
socket_id = match.group(1)
log_step("Création socket UDP", True, f"Socket ID: {socket_id}")
# Step 2: Connect to Miotiq server
ser_sara.reset_input_buffer()
command = f'AT+USOCO={socket_id},"192.168.0.20",4242\r'
ser_sara.write(command.encode('utf-8'))
response = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
if "+CME ERROR" in response or "ERROR" in response:
log_step("Connexion 192.168.0.20:4242", False, f"Erreur connexion au serveur Miotiq")
# Close socket
ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8'))
read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1)
sys.exit(1)
log_step("Connexion 192.168.0.20:4242", True)
# Step 3: Write test payload (4 bytes: "TEST")
test_payload = b'TEST'
ser_sara.reset_input_buffer()
command = f'AT+USOWR={socket_id},{len(test_payload)}\r'
ser_sara.write(command.encode('utf-8'))
response = read_complete_response(ser_sara, wait_for_lines=["@", "OK", "+CME ERROR", "ERROR"])
if "@" not in response:
log_step("Écriture payload test", False, "Le modem n'a pas envoyé le prompt @")
ser_sara.reset_input_buffer()
ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8'))
read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1)
sys.exit(1)
# Send the actual bytes
ser_sara.write(test_payload)
response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
if "+CME ERROR" in response or "ERROR" in response:
log_step("Écriture payload test", False, "Erreur lors de l'envoi des données")
ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8'))
read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1)
sys.exit(1)
log_step("Écriture payload test", True, f"{len(test_payload)} bytes envoyés")
# Step 4: Close socket
ser_sara.reset_input_buffer()
command = f'AT+USOCL={socket_id}\r'
ser_sara.write(command.encode('utf-8'))
response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
if "+CME ERROR" in response or "ERROR" in response:
log_step("Fermeture socket", False)
else:
log_step("Fermeture socket", True)
# Summary
all_ok = all(s["success"] for s in steps)
if all_ok:
print('<br><strong class="text-success">Toutes les étapes OK — le modem peut envoyer des données UDP vers Miotiq.</strong>')
else:
print('<br><strong class="text-danger">Certaines étapes ont échoué.</strong>')
except serial.SerialException as e:
print(f'❌ Erreur série: {e}')
except Exception as e:
print(f'❌ Erreur: {e}')
finally:
if ser_sara.is_open:
ser_sara.close()