r''' ____ _ ____ _ / ___| / \ | _ \ / \ \___ \ / _ \ | |_) | / _ \ ___) / ___ \| _ < / ___ \ |____/_/ \_\_| \_\/_/ \_\ Test aller-retour Miotiq: 1. Construit un payload 100 bytes avec device_id + command=1 (ping) 2. Envoie via UDP vers 192.168.0.20:4242 3. Ecoute sur le socket pour une reponse descendante (~15s) 4. Affiche le resultat /usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_ping_miotiq.py ''' import serial import time import sys import re import sqlite3 # --- Config --- MIOTIQ_IP = "192.168.0.20" MIOTIQ_PORT = 4242 PAYLOAD_SIZE = 100 COMMAND_PING = 0x01 LISTEN_TIMEOUT = 15 # seconds to wait for downlink response # --- Load device_id from SQLite --- def load_device_id(): try: conn = sqlite3.connect("/var/www/nebuleair_pro_4g/sqlite/sensors.db") cursor = conn.cursor() cursor.execute("SELECT value FROM config_table WHERE key='deviceID'") row = cursor.fetchone() conn.close() if row: return row[0].upper() except Exception as e: print(f'❌ Erreur lecture config SQLite: {e}') return None # --- Build ping payload --- def build_ping_payload(device_id): payload = bytearray(PAYLOAD_SIZE) # Init all to 0xFF (no data) for i in range(PAYLOAD_SIZE): payload[i] = 0xFF # Bytes 0-7: device_id (ASCII, padded with 0x00) device_id_bytes = device_id.encode('ascii')[:8].ljust(8, b'\x00') payload[0:8] = device_id_bytes # Byte 8: signal quality (0xFF = not set, fine for ping) # Byte 9: command = 0x01 (ping test) payload[9] = COMMAND_PING return bytes(payload) # --- Serial --- ser_sara = serial.Serial( port='/dev/ttyAMA2', baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=2 ) def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None): if wait_for_lines is None: wait_for_lines = [] response = bytearray() serial_connection.timeout = timeout end_time = time.time() + end_of_response_timeout start_time = time.time() while True: if serial_connection.in_waiting > 0: data = serial_connection.read(serial_connection.in_waiting) response.extend(data) end_time = time.time() + end_of_response_timeout decoded_response = response.decode('utf-8', errors='replace') for target_line in wait_for_lines: if target_line in decoded_response: return decoded_response elif time.time() > end_time: break time.sleep(0.1) return response.decode('utf-8', errors='replace') def send_at(command, wait_for=None, timeout=2): if wait_for is None: wait_for = ["OK", "+CME ERROR", "ERROR"] ser_sara.reset_input_buffer() ser_sara.write((command + '\r').encode('utf-8')) resp = read_complete_response(ser_sara, timeout=timeout, end_of_response_timeout=timeout, wait_for_lines=wait_for) has_error = "+CME ERROR" in resp or ("ERROR" in resp and "OK" not in resp) return resp, not has_error # --- Raw logs --- raw_logs = [] def log_raw(label, response): raw_logs.append(f"[{label}]\n{response.strip()}") socket_id = None try: sys.stdout.reconfigure(line_buffering=True) ser_sara.reset_input_buffer() # Load device ID device_id = load_device_id() if not device_id: print('❌ Impossible de lire le deviceID depuis la config') sys.exit(1) print(f'Device ID: {device_id}') # Build ping payload ping_payload = build_ping_payload(device_id) print(f'Payload: {PAYLOAD_SIZE} bytes, command=0x{COMMAND_PING:02X} (ping)') # Step 1: Create UDP socket resp, ok = send_at('AT+USOCR=17') log_raw('AT+USOCR=17', resp) if not ok: print('❌ Création socket UDP — erreur') print('La connexion PDP n\'est peut-être pas établie. Lancez "Vérifier connexion PDP" d\'abord.') sys.exit(1) match = re.search(r'\+USOCR:\s*(\d+)', resp) if not match: print('❌ Impossible d\'extraire le socket ID') sys.exit(1) socket_id = match.group(1) print(f'✅ Socket UDP créé (ID: {socket_id})') # Step 2: Connect to Miotiq resp, ok = send_at(f'AT+USOCO={socket_id},"{MIOTIQ_IP}",{MIOTIQ_PORT}', timeout=5) log_raw(f'AT+USOCO={socket_id}', resp) if not ok: print(f'❌ Connexion à {MIOTIQ_IP}:{MIOTIQ_PORT} — erreur') sys.exit(1) print(f'✅ Connecté à {MIOTIQ_IP}:{MIOTIQ_PORT}') # Step 3: Write ping payload resp, ok = send_at(f'AT+USOWR={socket_id},{len(ping_payload)}', wait_for=["@", "OK", "+CME ERROR", "ERROR"]) log_raw(f'AT+USOWR={socket_id},{len(ping_payload)}', resp) if "@" not in resp: print('❌ Le modem n\'a pas envoyé le prompt @ — envoi annulé') sys.exit(1) ser_sara.write(ping_payload) resp = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"]) log_raw('PAYLOAD SEND', resp) if "+CME ERROR" in resp or ("ERROR" in resp and "OK" not in resp): print('❌ Erreur envoi du payload ping') sys.exit(1) print(f'✅ Payload ping envoyé ({PAYLOAD_SIZE} bytes)') # Step 4: Listen for downlink response print(f'⏳ Attente réponse descendante Miotiq ({LISTEN_TIMEOUT}s)...') response_received = False start_listen = time.time() while time.time() - start_listen < LISTEN_TIMEOUT: # Poll for available data on socket ser_sara.reset_input_buffer() resp, ok = send_at(f'AT+USORD={socket_id},0', timeout=2) log_raw(f'AT+USORD={socket_id},0', resp) # Parse response: +USORD: , usord_match = re.search(r'\+USORD:\s*\d+,(\d+)', resp) if usord_match: available_bytes = int(usord_match.group(1)) if available_bytes > 0: # Read the data resp2, ok2 = send_at(f'AT+USORD={socket_id},{available_bytes}', timeout=2) log_raw(f'AT+USORD={socket_id},{available_bytes}', resp2) # Extract data: +USORD: ,,"" data_match = re.search(r'\+USORD:\s*\d+,\d+,"([^"]*)"', resp2) if data_match: received_data = data_match.group(1) elapsed = time.time() - start_listen print(f'✅ Réponse reçue en {elapsed:.1f}s !') print(f'Data: {received_data}') response_received = True break time.sleep(2) # Poll every 2 seconds if not response_received: print(f'⚠️ Aucune réponse reçue après {LISTEN_TIMEOUT}s') print('Le payload a été envoyé mais le serveur n\'a pas renvoyé de réponse descendante.') # Step 5: Close socket resp, ok = send_at(f'AT+USOCL={socket_id}') log_raw(f'AT+USOCL={socket_id}', resp) socket_id = None # Prevent double close in finally print('✅ Socket fermé') # Summary if response_received: print('Test aller-retour OK — la communication bidirectionnelle Miotiq fonctionne.') else: print('L\'envoi UDP fonctionne mais la réponse descendante n\'a pas été reçue. Vérifiez la configuration serveur.') except serial.SerialException as e: print(f'❌ Erreur série: {e}') except SystemExit: pass except Exception as e: print(f'❌ Erreur: {e}') finally: # Close socket if still open if socket_id is not None: try: ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8')) read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1) except: pass # Print raw logs if raw_logs: log_id = "ping_raw_logs" print(f'Logs AT') print(f'') for log in raw_logs: print(log.replace('\n', '')) print('') print('') if ser_sara.is_open: ser_sara.close()
') for log in raw_logs: print(log.replace('\n', '')) print('') print('