Le socket connecté (AT+USOCO) ne recevait pas le downlink Miotiq, car celui-ci arrive sur un port différent.

Maintenant :
- AT+USOCR=17,33333 : socket bindé sur un port local fixe
- AT+USOST : envoi en mode non connecté
- AT+USORF : écoute du downlink sur le port bindé

Le serveur doit envoyer le downlink avec dstPort=33333.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
236 lines
8.5 KiB
Python
236 lines
8.5 KiB
Python
r'''
|
|
____ _ ____ _
|
|
/ ___| / \ | _ \ / \
|
|
\___ \ / _ \ | |_) | / _ \
|
|
___) / ___ \| _ < / ___ \
|
|
|____/_/ \_\_| \_\/_/ \_\
|
|
|
|
Test aller-retour Miotiq:
|
|
1. Construit un payload 100 bytes avec device_id + command=0x02 (ping)
|
|
2. Cree un socket UDP binde sur un port local fixe (LISTEN_PORT)
|
|
3. Envoie via AT+USOST vers 192.168.0.20:4242 (mode non-connecte)
|
|
4. Ecoute avec AT+USORF pour la reponse descendante Miotiq (~15s)
|
|
|
|
Le serveur doit envoyer le downlink via l'API Miotiq sendToDevice
|
|
avec dstPort = LISTEN_PORT (33333).
|
|
|
|
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_ping_miotiq.py
|
|
'''
|
|
|
|
import serial
|
|
import time
|
|
import sys
|
|
import re
|
|
import sqlite3
|
|
|
|
# --- Config ---
# Remote endpoint the ping payload is sent to with AT+USOST.
MIOTIQ_IP: str = "192.168.0.20"
MIOTIQ_PORT: int = 4242

# Local UDP port the modem socket is bound to; the downlink is polled on it.
LISTEN_PORT: int = 33333

# Ping frame: total length in bytes and the command code written into it.
PAYLOAD_SIZE: int = 100
COMMAND_PING: int = 0x02

# Seconds spent polling for a downlink response before giving up.
LISTEN_TIMEOUT: int = 15
|
|
|
|
# --- Load device_id from SQLite ---
def load_device_id(db_path="/var/www/nebuleair_pro_4g/sqlite/sensors.db"):
    """Return the upper-cased ``deviceID`` stored in the SQLite config table.

    Args:
        db_path: Path of the SQLite database to read. Defaults to the
            sensors database path this script has always used.

    Returns:
        The ``value`` of the ``config_table`` row whose ``key`` is
        ``deviceID``, converted to an upper-case string. ``None`` when the
        row is missing, its value is NULL, or the database cannot be read
        (the SQLite error is printed in that case).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute(
                "SELECT value FROM config_table WHERE key='deviceID'"
            ).fetchone()
        finally:
            # Release the connection even when the query itself fails.
            conn.close()
    except sqlite3.Error as e:
        print(f'❌ Erreur lecture config SQLite: {e}')
        return None

    if row is None or row[0] is None:
        return None
    # str() keeps a numeric deviceID usable instead of failing on .upper().
    return str(row[0]).upper()
|
|
|
|
# --- Build ping payload ---
def build_ping_payload(device_id):
    """Build the fixed-size ping frame for the given device.

    Layout of the returned ``bytes`` (length ``PAYLOAD_SIZE``):
      * every byte starts as 0xFF;
      * bytes 0-7 hold ``device_id`` encoded as ASCII, cut to 8 bytes and
        right-padded with 0x00;
      * byte 8 is left at 0xFF;
      * byte 9 holds ``COMMAND_PING``.

    Raises:
        UnicodeEncodeError: if ``device_id`` contains non-ASCII characters.
    """
    frame = bytearray(b'\xFF' * PAYLOAD_SIZE)

    id_field = device_id.encode('ascii')[:8].ljust(8, b'\x00')
    frame[:8] = id_field

    frame[9] = COMMAND_PING
    return bytes(frame)
|
|
|
|
# --- Serial ---
# Serial link used for every AT command in this script. It is opened at
# module level, i.e. before the script's main try/except, so an open failure
# is reported here in the same format as the other serial errors instead of
# surfacing as an uncaught traceback. The exit status stays 1.
try:
    ser_sara = serial.Serial(
        port='/dev/ttyAMA2',
        baudrate=115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=2,
    )
except serial.SerialException as e:
    print(f'❌ Erreur série: {e}')
    sys.exit(1)
|
|
|
|
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None):
    """Collect bytes from a serial port until a marker shows up or the line goes quiet.

    Args:
        serial_connection: Object exposing ``timeout``, ``in_waiting`` and
            ``read(n)`` (a ``serial.Serial`` in this script).
        timeout: Value assigned to ``serial_connection.timeout``.
        end_of_response_timeout: Seconds without new data after which the
            response is considered complete. The countdown restarts every
            time bytes arrive.
        wait_for_lines: Optional substrings; the function returns as soon as
            any of them appears in the text received so far.

    Returns:
        Everything received, decoded as UTF-8 with undecodable bytes
        replaced. May be an empty string when nothing arrived.
    """
    if wait_for_lines is None:
        wait_for_lines = []

    response = bytearray()
    serial_connection.timeout = timeout

    # Monotonic clock: a wall-clock adjustment (e.g. an NTP sync) must not
    # stretch or cut short the silence window.
    deadline = time.monotonic() + end_of_response_timeout

    while True:
        pending = serial_connection.in_waiting
        if pending > 0:
            response.extend(serial_connection.read(pending))
            deadline = time.monotonic() + end_of_response_timeout

            decoded = response.decode('utf-8', errors='replace')
            if any(marker in decoded for marker in wait_for_lines):
                return decoded
        elif time.monotonic() > deadline:
            break
        time.sleep(0.1)

    return response.decode('utf-8', errors='replace')
|
|
|
|
|
|
def send_at(command, wait_for=None, timeout=2):
    """Send one AT command on ``ser_sara`` and collect the reply.

    Args:
        command: Command text without the trailing carriage return.
        wait_for: Substrings that end the read as soon as one appears.
            Defaults to ``["OK", "+CME ERROR", "ERROR"]``.
        timeout: Seconds, used both as the serial read timeout and as the
            silence timeout of the read.

    Returns:
        ``(response, ok)``. ``ok`` is False when the reply contains
        ``+CME ERROR``, or contains ``ERROR`` without ``OK``.
        NOTE: an empty reply therefore yields ``ok=True``.
    """
    markers = ["OK", "+CME ERROR", "ERROR"] if wait_for is None else wait_for

    # Drop stale bytes so the reply only contains this command's output.
    ser_sara.reset_input_buffer()
    ser_sara.write(f'{command}\r'.encode('utf-8'))

    reply = read_complete_response(
        ser_sara,
        timeout=timeout,
        end_of_response_timeout=timeout,
        wait_for_lines=markers,
    )

    if "+CME ERROR" in reply:
        return reply, False
    failed = "ERROR" in reply and "OK" not in reply
    return reply, not failed
|
|
|
|
|
|
# --- Raw logs ---
# AT exchanges recorded in call order; the script prints them at the end.
raw_logs = []


def log_raw(label, response):
    """Record one exchange as "[label]", a newline, then the stripped response."""
    body = response.strip()
    raw_logs.append(f"[{label}]\n{body}")
|
|
|
|
|
|
# Script body: create a bound UDP socket on the modem, send the ping payload,
# poll for a downlink, close the socket, then print the collected AT logs.
# All output is HTML, so text coming from the network or the modem is
# escaped before being printed.
import html

# ID of the modem socket while it is open; reset to None once it is closed so
# the cleanup in `finally` knows whether a close is still needed.
socket_id = None

try:
    sys.stdout.reconfigure(line_buffering=True)
    ser_sara.reset_input_buffer()

    # Load device ID
    device_id = load_device_id()
    if not device_id:
        print('❌ <strong>Impossible de lire le deviceID depuis la config</strong>')
        sys.exit(1)
    print(f'Device ID: <strong>{device_id}</strong>')

    # Build ping payload
    ping_payload = build_ping_payload(device_id)
    print(f'Payload: {PAYLOAD_SIZE} bytes, command=0x{COMMAND_PING:02X} (ping)')

    # Step 1: Create UDP socket bound to local port (for receiving downlink)
    resp, ok = send_at(f'AT+USOCR=17,{LISTEN_PORT}')
    log_raw(f'AT+USOCR=17,{LISTEN_PORT}', resp)
    if not ok:
        print('❌ Création socket UDP — erreur')
        print('<small class="text-muted">La connexion PDP n\'est peut-être pas établie. Lancez "Vérifier connexion PDP" d\'abord.</small>')
        sys.exit(1)

    match = re.search(r'\+USOCR:\s*(\d+)', resp)
    if not match:
        print('❌ Impossible d\'extraire le socket ID')
        sys.exit(1)
    socket_id = match.group(1)
    print(f'✅ Socket UDP créé (ID: {socket_id}, port local: {LISTEN_PORT})')

    # Step 2: Send ping payload via AT+USOST (unconnected mode)
    resp, _ = send_at(
        f'AT+USOST={socket_id},"{MIOTIQ_IP}",{MIOTIQ_PORT},{len(ping_payload)}',
        wait_for=["@", "OK", "+CME ERROR", "ERROR"],
    )
    log_raw(f'AT+USOST={socket_id}', resp)
    if "@" not in resp:
        print('❌ Le modem n\'a pas envoyé le prompt @ — envoi annulé')
        sys.exit(1)

    # The u-blox AT manual asks for a minimum 50 ms pause between the "@"
    # prompt and the binary data.
    time.sleep(0.05)
    ser_sara.write(ping_payload)
    resp = read_complete_response(ser_sara, wait_for_lines=["OK", "+CME ERROR", "ERROR"])
    log_raw('PAYLOAD SEND', resp)
    if "+CME ERROR" in resp or ("ERROR" in resp and "OK" not in resp):
        print('❌ Erreur envoi du payload ping')
        sys.exit(1)
    print(f'✅ Payload ping envoyé vers {MIOTIQ_IP}:{MIOTIQ_PORT} ({PAYLOAD_SIZE} bytes)')

    # Step 3: Listen for downlink response on port LISTEN_PORT
    print(f'<br>⏳ <strong>Attente réponse descendante Miotiq sur port {LISTEN_PORT} ({LISTEN_TIMEOUT}s)...</strong>')

    response_received = False
    start_listen = time.time()

    while time.time() - start_listen < LISTEN_TIMEOUT:
        # Poll with AT+USORF (send_at already clears the input buffer first).
        resp, _ = send_at(f'AT+USORF={socket_id},512', timeout=2)
        log_raw(f'AT+USORF={socket_id},512', resp)

        # Parse response: +USORF: <socket_id>,"<ip>",<port>,<length>,"<data>"
        usorf_match = re.search(r'\+USORF:\s*\d+,"([^"]*)",(\d+),(\d+),"([^"]*)"', resp)
        if usorf_match:
            src_ip, src_port, data_len, received_data = usorf_match.groups()
            elapsed = time.time() - start_listen
            print(f'<br>✅ <strong class="text-success">Réponse reçue en {elapsed:.1f}s !</strong>')
            # src_ip and received_data come from the network: escape them so
            # they cannot inject markup into the HTML output.
            print(f'<small class="text-muted">Source: {html.escape(src_ip)}:{src_port} — {data_len} bytes</small><br>')
            print(f'<small class="text-muted">Data: {html.escape(received_data)}</small>')
            response_received = True
            break

        # A reply without a quoted data field (e.g. "+USORF: <socket_id>,0")
        # does not match the pattern above: no data yet, keep polling.
        time.sleep(2)

    if not response_received:
        print(f'<br>⚠️ <strong class="text-warning">Aucune réponse reçue après {LISTEN_TIMEOUT}s</strong>')
        print(f'<small class="text-muted">Le payload a été envoyé mais aucune donnée reçue sur le port {LISTEN_PORT}. Vérifiez que le serveur envoie le downlink sur dstPort={LISTEN_PORT}.</small>')

    # Step 4: Close socket
    resp, _ = send_at(f'AT+USOCL={socket_id}')
    log_raw(f'AT+USOCL={socket_id}', resp)
    socket_id = None
    print('✅ Socket fermé')

    # Summary
    if response_received:
        print('<br><strong class="text-success">Test aller-retour OK — la communication bidirectionnelle Miotiq fonctionne.</strong>')
    else:
        print('<br><strong class="text-warning">L\'envoi UDP fonctionne mais la réponse descendante n\'a pas été reçue.</strong>')

except serial.SerialException as e:
    print(f'❌ Erreur série: {e}')

except SystemExit:
    # The sys.exit(1) calls above are early-outs whose message has already
    # been printed. Swallowing SystemExit here means the script still ends
    # with exit status 0 after the cleanup below.
    pass

except Exception as e:
    print(f'❌ Erreur: {e}')

finally:
    # Close socket if still open (best effort: a failure here must not hide
    # the logs printed below).
    if socket_id is not None:
        try:
            ser_sara.write(f'AT+USOCL={socket_id}\r'.encode('utf-8'))
            read_complete_response(ser_sara, wait_for_lines=["OK", "ERROR"], timeout=1, end_of_response_timeout=1)
        except Exception:
            pass

    # Print raw logs inside a collapsible block
    if raw_logs:
        log_id = "ping_raw_logs"
        print(f'<br><button class="btn btn-sm btn-outline-secondary mt-2" type="button" data-bs-toggle="collapse" data-bs-target="#{log_id}"><small>Logs AT</small></button>')
        print(f'<div class="collapse mt-1" id="{log_id}"><div class="card card-body bg-light"><small><code>')
        for log in raw_logs:
            # Escape first (the logs contain modem/network text), then turn
            # line breaks into <br>.
            print(html.escape(log).replace('\n', '<br>'))
            print('<br>')
        print('</code></small></div></div>')

    if ser_sara.is_open:
        ser_sara.close()
|