r''' ____ _ ____ _ / ___| / \ | _ \ / \ \___ \ / _ \ | |_) | / _ \ ___) / ___ \| _ < / ___ \ |____/_/ \_\_| \_\/_/ \_\ Test UDP socket connectivity to Miotiq private network (192.168.0.20:4242) Creates a UDP socket, connects, writes a small test payload, and closes. Each AT step is verified for errors. /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_test_udp.py ''' import serial import time import sys import re ser_sara = serial.Serial( port='/dev/ttyAMA2', baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=2 ) def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None, debug=False): if wait_for_lines is None: wait_for_lines = [] response = bytearray() serial_connection.timeout = timeout end_time = time.time() + end_of_response_timeout start_time = time.time() while True: if serial_connection.in_waiting > 0: data = serial_connection.read(serial_connection.in_waiting) response.extend(data) end_time = time.time() + end_of_response_timeout decoded_response = response.decode('utf-8', errors='replace') for target_line in wait_for_lines: if target_line in decoded_response: return decoded_response elif time.time() > end_time: break time.sleep(0.1) return response.decode('utf-8', errors='replace') # Result tracking steps = [] def log_step(name, success, detail=""): steps.append({"name": name, "success": success, "detail": detail}) icon = "✅" if success else "❌" print(f'{icon} {name}') if detail: print(f'{detail}') try: sys.stdout.reconfigure(line_buffering=True) ser_sara.reset_input_buffer() # Step 1: Create UDP socket (protocol 17 = UDP) command = 'AT+USOCR=17\r' ser_sara.write(command.encode('utf-8')) response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"]) if "+CME ERROR" in response or "ERROR" in response: log_step("Création socket UDP", False, "AT+USOCR=17 → erreur. La connexion PDP n'est peut-être pas établie.") print('
Suggestion : Lancez "Setup PSD connection" puis réessayez.') sys.exit(1) # Extract socket ID match = re.search(r'\+USOCR:\s*(\d+)', response) if not match: log_step("Création socket UDP", False, "Impossible d'extraire le socket ID") sys.exit(1) socket_id = match.group(1) log_step("Création socket UDP", True, f"Socket ID: {socket_id}") # Step 2: Connect to Miotiq server ser_sara.reset_input_buffer() command = f'AT+USOCO={socket_id},"192.168.0.20",4242\r' ser_sara.write(command.encode('utf-8')) response = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=5, wait_for_lines=["OK", "+CME ERROR", "ERROR"]) if "+CME ERROR" in response or "ERROR" in response: log_step("Connexion 192.168.0.20:4242", False, f"Erreur connexion au serveur Miotiq") # Close socket ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8')) read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1) sys.exit(1) log_step("Connexion 192.168.0.20:4242", True) # Step 3: Write test payload (4 bytes: "TEST") test_payload = b'TEST' ser_sara.reset_input_buffer() command = f'AT+USOWR={socket_id},{len(test_payload)}\r' ser_sara.write(command.encode('utf-8')) response = read_complete_response(ser_sara, wait_for_lines=["@", "OK", "+CME ERROR", "ERROR"]) if "@" not in response: log_step("Écriture payload test", False, "Le modem n'a pas envoyé le prompt @") ser_sara.reset_input_buffer() ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8')) read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1) sys.exit(1) # Send the actual bytes ser_sara.write(test_payload) response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"]) if "+CME ERROR" in response or "ERROR" in response: log_step("Écriture payload test", False, "Erreur lors de l'envoi des données") ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8')) read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1) sys.exit(1) log_step("Écriture payload test", True, f"{len(test_payload)} bytes envoyés") # Step 4: Close socket ser_sara.reset_input_buffer() command = f'AT+USOCL={socket_id}\r' ser_sara.write(command.encode('utf-8')) response = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"]) if "+CME ERROR" in response or "ERROR" in response: log_step("Fermeture socket", False) else: log_step("Fermeture socket", True) # Summary all_ok = all(s["success"] for s in steps) if all_ok: print('
Toutes les étapes OK — le modem peut envoyer des données UDP vers Miotiq.') else: print('
Certaines étapes ont échoué.') except serial.SerialException as e: print(f'❌ Erreur série: {e}') except Exception as e: print(f'❌ Erreur: {e}') finally: if ser_sara.is_open: ser_sara.close()