r'''
  ____    _    ____      _
 / ___|  / \  |  _ \    / \
 \___ \ / _ \ | |_) |  / _ \
  ___) / ___ \|  _ <  / ___ \
 |____/_/   \_\_| \_\/_/   \_\

Miotiq round-trip test:

1. Build a 100-byte payload carrying device_id + command=0x02 (ping).
2. Create a UDP socket bound to a fixed local port (LISTEN_PORT).
3. Send the payload with AT+USOST to 192.168.0.20:4242 (unconnected mode).
4. Poll with AT+USORF for the Miotiq downlink response (~15 s).

The server must send the downlink through the Miotiq sendToDevice API
with dstPort = LISTEN_PORT (33333).

/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_ping_miotiq.py
'''

import re
import sqlite3
import sys
import time

import serial

# --- Config ---
MIOTIQ_IP = "192.168.0.20"
MIOTIQ_PORT = 4242
LISTEN_PORT = 33333      # Local port on which the sensor listens for the downlink
PAYLOAD_SIZE = 100
COMMAND_PING = 0x02
LISTEN_TIMEOUT = 15      # Seconds to wait for the downlink response
# Pause between the modem's "@" prompt and the binary payload. The u-blox AT
# manual asks for at least 50 ms here -- NOTE(review): confirm for this module.
PROMPT_GUARD_DELAY = 0.05


# --- Load device_id from SQLite ---
def load_device_id():
    """Return the upper-cased ``deviceID`` value from the config database.

    Returns:
        The device ID string in upper case, or ``None`` when the row is
        missing/empty or when any error occurs (the error is printed).
    """
    try:
        conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT value FROM config_table WHERE key='deviceID'")
            row = cursor.fetchone()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
        if row:
            return row[0].upper()
    except Exception as e:
        print(f'❌ Erreur lecture config SQLite: {e}')
    return None


# --- Build ping payload ---
def build_ping_payload(device_id):
    """Build the fixed-size ping payload for ``device_id``.

    Layout (PAYLOAD_SIZE bytes, every unused byte is 0xFF):
        bytes 0-7: device_id as ASCII, truncated to 8 bytes, padded with 0x00
        byte 9:    COMMAND_PING

    Raises:
        UnicodeEncodeError: if ``device_id`` contains non-ASCII characters.
    """
    payload = bytearray(b'\xff' * PAYLOAD_SIZE)
    payload[0:8] = device_id.encode('ascii')[:8].ljust(8, b'\x00')
    payload[9] = COMMAND_PING
    return bytes(payload)


# --- Serial ---
ser_sara = serial.Serial(
    port='/dev/ttyAMA2',
    baudrate=115200,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=2
)


def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None):
    """Accumulate modem output until a marker appears or the line goes quiet.

    Args:
        serial_connection: open serial port object to read from.
        timeout: value assigned to ``serial_connection.timeout``.
        end_of_response_timeout: seconds of silence after which the
            accumulated data is returned; the timer restarts each time
            new bytes arrive.
        wait_for_lines: substrings that end the read as soon as one of them
            is present in the decoded buffer (checked only after new data).

    Returns:
        Everything read so far, decoded as UTF-8 with replacement characters.
    """
    if wait_for_lines is None:
        wait_for_lines = []

    response = bytearray()
    serial_connection.timeout = timeout
    end_time = time.time() + end_of_response_timeout

    while True:
        if serial_connection.in_waiting > 0:
            data = serial_connection.read(serial_connection.in_waiting)
            response.extend(data)
            # New data arrived: push the silence deadline back.
            end_time = time.time() + end_of_response_timeout
            decoded_response = response.decode('utf-8', errors='replace')
            for target_line in wait_for_lines:
                if target_line in decoded_response:
                    return decoded_response
        elif time.time() > end_time:
            break
        time.sleep(0.1)

    return response.decode('utf-8', errors='replace')


def send_at(command, wait_for=None, timeout=2):
    """Send one AT command on ``ser_sara`` and collect the reply.

    Args:
        command: AT command text without the trailing carriage return.
        wait_for: substrings that terminate the read; defaults to
            ``["OK", "+CME ERROR", "ERROR"]``.
        timeout: used both as serial timeout and as silence timeout.

    Returns:
        ``(response, ok)`` where ``ok`` is False when the response contains
        "+CME ERROR", or contains "ERROR" without any "OK".
    """
    if wait_for is None:
        wait_for = ["OK", "+CME ERROR", "ERROR"]
    # Drop stale bytes so the reply is not mixed with earlier output.
    ser_sara.reset_input_buffer()
    ser_sara.write((command + '\r').encode('utf-8'))
    resp = read_complete_response(ser_sara, timeout=timeout,
                                  end_of_response_timeout=timeout,
                                  wait_for_lines=wait_for)
    has_error = "+CME ERROR" in resp or ("ERROR" in resp and "OK" not in resp)
    return resp, not has_error


# --- Raw logs ---
raw_logs = []


def log_raw(label, response):
    """Append a labelled, stripped modem response to ``raw_logs``."""
    raw_logs.append(f"[{label}]\n{response.strip()}")


socket_id = None

# NOTE(review): several printed literals below contain "\n" exactly where the
# reviewed source showed a line break inside the string; inline markup may
# have been lost there -- compare with the deployed file before merging.
try:
    sys.stdout.reconfigure(line_buffering=True)
    ser_sara.reset_input_buffer()

    # Load device ID
    device_id = load_device_id()
    if not device_id:
        print('❌ Impossible de lire le deviceID depuis la config')
        sys.exit(1)
    print(f'Device ID: {device_id}')

    # Build ping payload
    ping_payload = build_ping_payload(device_id)
    print(f'Payload: {PAYLOAD_SIZE} bytes, command=0x{COMMAND_PING:02X} (ping)')

    # Step 1: Create UDP socket bound to local port (for receiving downlink)
    resp, ok = send_at(f'AT+USOCR=17,{LISTEN_PORT}')
    log_raw(f'AT+USOCR=17,{LISTEN_PORT}', resp)
    if not ok:
        print('❌ Création socket UDP — erreur')
        print('La connexion PDP n\'est peut-être pas établie. \nLancez "Vérifier connexion PDP" d\'abord.')
        sys.exit(1)

    match = re.search(r'\+USOCR:\s*(\d+)', resp)
    if not match:
        print('❌ Impossible d\'extraire le socket ID')
        sys.exit(1)
    socket_id = match.group(1)
    print(f'✅ Socket UDP créé (ID: {socket_id}, port local: {LISTEN_PORT})')

    # Step 2: Send ping payload via AT+USOST (unconnected mode)
    resp, ok = send_at(
        f'AT+USOST={socket_id},"{MIOTIQ_IP}",{MIOTIQ_PORT},{len(ping_payload)}',
        wait_for=["@", "OK", "+CME ERROR", "ERROR"]
    )
    log_raw(f'AT+USOST={socket_id}', resp)

    if "@" not in resp:
        print('❌ Le modem n\'a pas envoyé le prompt @ — envoi annulé')
        sys.exit(1)

    # Give the modem time to switch to binary input after the "@" prompt.
    time.sleep(PROMPT_GUARD_DELAY)
    ser_sara.write(ping_payload)
    resp = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
    log_raw('PAYLOAD SEND', resp)

    if "+CME ERROR" in resp or ("ERROR" in resp and "OK" not in resp):
        print('❌ Erreur envoi du payload ping')
        sys.exit(1)
    print(f'✅ Payload ping envoyé vers {MIOTIQ_IP}:{MIOTIQ_PORT} ({PAYLOAD_SIZE} bytes)')

    # Step 3: Listen for downlink response on port LISTEN_PORT
    print(f'\nAttente réponse descendante Miotiq sur port {LISTEN_PORT} ({LISTEN_TIMEOUT}s)...')

    response_received = False
    start_listen = time.time()

    while time.time() - start_listen < LISTEN_TIMEOUT:
        # Poll with AT+USORF (send_at flushes the input buffer before writing).
        resp, ok = send_at(f'AT+USORF={socket_id},512', timeout=2)
        log_raw(f'AT+USORF={socket_id},512', resp)

        # Parse response: +USORF: <socket>,"<ip>",<port>,<length>,"<data>"
        usorf_match = re.search(r'\+USORF:\s*\d+,"([^"]*)",(\d+),(\d+),"([^"]*)"', resp)
        if usorf_match:
            src_ip = usorf_match.group(1)
            src_port = usorf_match.group(2)
            data_len = usorf_match.group(3)
            received_data = usorf_match.group(4)
            elapsed = time.time() - start_listen

            print(f'\nRéponse reçue en {elapsed:.1f}s !')
            print(f'Source: {src_ip}:{src_port} — {data_len} bytes\n')
            print(f'Data: {received_data}')
            response_received = True
            break

        # A reply without the quoted fields (e.g. "+USORF: <socket>,0") means
        # no data is available yet: keep polling.
        time.sleep(2)

    if not response_received:
        print(f'\n⚠️ Aucune réponse reçue après {LISTEN_TIMEOUT}s')
        print(f'Le payload a été envoyé mais aucune donnée reçue sur le port {LISTEN_PORT}. Vérifiez que le serveur envoie le downlink sur dstPort={LISTEN_PORT}.')

    # Step 4: Close socket
    resp, ok = send_at(f'AT+USOCL={socket_id}')
    log_raw(f'AT+USOCL={socket_id}', resp)
    socket_id = None  # Marks the socket as closed for the cleanup below.
    print('✅ Socket fermé')

    # Summary
    if response_received:
        print('\nTest aller-retour OK — la communication bidirectionnelle Miotiq fonctionne.')
    else:
        print('\nL\'envoi UDP fonctionne mais la réponse descendante n\'a pas été reçue.')

except serial.SerialException as e:
    print(f'❌ Erreur série: {e}')
except SystemExit:
    # sys.exit() is used above only to abort the flow; swallowing it lets the
    # cleanup below run and the script then ends normally.
    pass
except Exception as e:
    print(f'❌ Erreur: {e}')
finally:
    # Close socket if still open (best effort: failures are ignored on purpose).
    if socket_id is not None:
        try:
            ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8'))
            read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"],
                                   timeout=1, end_of_response_timeout=1)
        except Exception:
            pass

    # Print raw logs
    # NOTE(review): log_id is unused and the literals below are bare line
    # breaks in the reviewed source; wrapper markup referencing log_id was
    # presumably stripped -- confirm against the deployed file.
    if raw_logs:
        log_id = "ping_raw_logs"
        print(f'\n')
        print(f'\n')
        for log in raw_logs:
            print(log.replace('\n', '\n'))
        print('\n')
        print('\n')

    if ser_sara.is_open:
        ser_sara.close()