- Nouveau script SARA/sara_check_pdp.py: vérifie si PDP est déjà actif avant d'agir - Si PDP actif: affiche OK + IP sans toucher à la config - Si PDP inactif: active automatiquement + affiche résultat - Logs AT bruts accessibles via bouton collapse - Endpoint launcher.php sara_check_pdp Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
171 lines
5.8 KiB
Python
171 lines
5.8 KiB
Python
r'''
|
|
____ _ ____ _
|
|
/ ___| / \ | _ \ / \
|
|
\___ \ / _ \ | |_) | / _ \
|
|
___) / ___ \| _ < / ___ \
|
|
|____/_/ \_\_| \_\/_/ \_\
|
|
|
|
Check and setup PDP connection (user-friendly version for Miotiq page).
|
|
- Checks if PDP context is already active
|
|
- If yes: reports OK without touching anything
|
|
- If no: activates PDP context and PSD profile
|
|
|
|
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_check_pdp.py
|
|
'''
|
|
|
|
import serial
|
|
import time
|
|
import sys
|
|
import re
|
|
|
|
ser_sara = serial.Serial(
|
|
port='/dev/ttyAMA2',
|
|
baudrate=115200,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
bytesize=serial.EIGHTBITS,
|
|
timeout=2
|
|
)
|
|
|
|
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None):
|
|
if wait_for_lines is None:
|
|
wait_for_lines = []
|
|
|
|
response = bytearray()
|
|
serial_connection.timeout = timeout
|
|
end_time = time.time() + end_of_response_timeout
|
|
start_time = time.time()
|
|
|
|
while True:
|
|
if serial_connection.in_waiting > 0:
|
|
data = serial_connection.read(serial_connection.in_waiting)
|
|
response.extend(data)
|
|
end_time = time.time() + end_of_response_timeout
|
|
|
|
decoded_response = response.decode('utf-8', errors='replace')
|
|
for target_line in wait_for_lines:
|
|
if target_line in decoded_response:
|
|
return decoded_response
|
|
elif time.time() > end_time:
|
|
break
|
|
time.sleep(0.1)
|
|
|
|
return response.decode('utf-8', errors='replace')
|
|
|
|
|
|
def send_at(command, wait_for=None, timeout=2):
|
|
"""Send AT command and return (response_text, success_bool)"""
|
|
if wait_for is None:
|
|
wait_for = ["OK", "+CME ERROR", "ERROR"]
|
|
ser_sara.reset_input_buffer()
|
|
ser_sara.write((command + '\r').encode('utf-8'))
|
|
resp = read_complete_response(ser_sara, timeout=timeout, end_of_response_timeout=timeout, wait_for_lines=wait_for)
|
|
success = "OK" in resp and "+CME ERROR" not in resp and "ERROR" not in resp.replace("OK", "")
|
|
return resp, success
|
|
|
|
|
|
# Collect raw logs for collapsible display
|
|
raw_logs = []
|
|
|
|
def log_raw(label, response):
|
|
raw_logs.append(f"[{label}]\n{response.strip()}")
|
|
|
|
|
|
try:
|
|
sys.stdout.reconfigure(line_buffering=True)
|
|
ser_sara.reset_input_buffer()
|
|
|
|
# Step 1: Check modem connectivity
|
|
resp, ok = send_at('ATI0')
|
|
log_raw('ATI0', resp)
|
|
if not ok:
|
|
print('❌ <strong>Modem non accessible</strong>')
|
|
print('<small class="text-muted">Pas de réponse du modem sur ttyAMA2</small>')
|
|
sys.exit(1)
|
|
|
|
print('✅ Modem connecté')
|
|
|
|
# Step 2: Check if PDP context is already active
|
|
resp, ok = send_at('AT+CGACT?')
|
|
log_raw('AT+CGACT?', resp)
|
|
|
|
pdp_already_active = '+CGACT: 1,1' in resp
|
|
|
|
if pdp_already_active:
|
|
print('✅ Contexte PDP déjà actif')
|
|
|
|
# Also check PSD profile status by trying to read IP
|
|
resp2, ok2 = send_at('AT+UPSND=0,0')
|
|
log_raw('AT+UPSND=0,0', resp2)
|
|
|
|
ip_match = re.search(r'\+UPSND:\s*0,0,"([^"]+)"', resp2)
|
|
if ip_match and ip_match.group(1) != "0.0.0.0":
|
|
ip_addr = ip_match.group(1)
|
|
print(f'✅ Profil PSD actif — IP: {ip_addr}')
|
|
print('<br><strong class="text-success">Connexion PDP OK — prêt pour les sockets UDP.</strong>')
|
|
else:
|
|
# PDP active but PSD profile not set up — need to configure it
|
|
print('⚠️ Contexte PDP actif mais profil PSD non configuré — activation en cours...')
|
|
setup_psd = True
|
|
else:
|
|
print('⚠️ Contexte PDP inactif — activation en cours...')
|
|
setup_psd = True
|
|
|
|
# Activate PDP context
|
|
resp, ok = send_at('AT+CGACT=1,1', timeout=5)
|
|
log_raw('AT+CGACT=1,1', resp)
|
|
if ok:
|
|
print('✅ Contexte PDP activé')
|
|
else:
|
|
print('❌ Échec activation contexte PDP')
|
|
sys.exit(1)
|
|
|
|
# Setup PSD profile if needed
|
|
if 'setup_psd' in dir() and setup_psd:
|
|
# Set PDP type to IPv4
|
|
resp, ok = send_at('AT+UPSD=0,0,0')
|
|
log_raw('AT+UPSD=0,0,0', resp)
|
|
|
|
# Map profile #0 to CID=1
|
|
resp, ok = send_at('AT+UPSD=0,100,1')
|
|
log_raw('AT+UPSD=0,100,1', resp)
|
|
|
|
# Activate PSD profile
|
|
resp, ok = send_at('AT+UPSDA=0,3', wait_for=["OK", "+UUPSDA", "+CME ERROR", "ERROR"], timeout=5)
|
|
log_raw('AT+UPSDA=0,3', resp)
|
|
|
|
if "OK" in resp or "+UUPSDA" in resp:
|
|
# Verify IP
|
|
resp2, ok2 = send_at('AT+UPSND=0,0')
|
|
log_raw('AT+UPSND=0,0', resp2)
|
|
ip_match = re.search(r'\+UPSND:\s*0,0,"([^"]+)"', resp2)
|
|
if ip_match and ip_match.group(1) != "0.0.0.0":
|
|
print(f'✅ Profil PSD activé — IP: {ip_match.group(1)}')
|
|
print('<br><strong class="text-success">Connexion PDP OK — prêt pour les sockets UDP.</strong>')
|
|
else:
|
|
print('✅ Profil PSD activé')
|
|
print('<br><strong class="text-success">Connexion PDP OK.</strong>')
|
|
else:
|
|
print('❌ Échec activation profil PSD')
|
|
print('<br><strong class="text-danger">La connexion PDP n\'a pas pu être établie.</strong>')
|
|
|
|
except serial.SerialException as e:
|
|
print(f'❌ Erreur série: {e}')
|
|
|
|
except Exception as e:
|
|
print(f'❌ Erreur: {e}')
|
|
|
|
finally:
|
|
# Print raw logs in a collapsible section
|
|
if raw_logs:
|
|
log_id = "pdp_raw_logs"
|
|
print(f'<br><button class="btn btn-sm btn-outline-secondary mt-2" type="button" data-bs-toggle="collapse" data-bs-target="#{log_id}"><small>Logs AT</small></button>')
|
|
print(f'<div class="collapse mt-1" id="{log_id}"><div class="card card-body bg-light"><small><code>')
|
|
for log in raw_logs:
|
|
print(log.replace('\n', '<br>'))
|
|
print('<br>')
|
|
print('</code></small></div></div>')
|
|
|
|
if ser_sara.is_open:
|
|
ser_sara.close()
|