r'''
____ _ ____ _
/ ___| / \ | _ \ / \
\___ \ / _ \ | |_) | / _ \
___) / ___ \| _ < / ___ \
|____/_/ \_\_| \_\/_/ \_\
Check and setup PDP connection (user-friendly version for Miotiq page).
- Checks if PDP context is already active
- If yes: reports OK without touching anything
- If no: activates PDP context and PSD profile
/usr/bin/python3 /var/www/nebuleair_pro_4g/SARA/sara_check_pdp.py
'''
import serial
import time
import sys
import re
ser_sara = serial.Serial(
port='/dev/ttyAMA2',
baudrate=115200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=2
)
def read_complete_response(serial_connection, timeout=2, end_of_response_timeout=2, wait_for_lines=None):
if wait_for_lines is None:
wait_for_lines = []
response = bytearray()
serial_connection.timeout = timeout
end_time = time.time() + end_of_response_timeout
start_time = time.time()
while True:
if serial_connection.in_waiting > 0:
data = serial_connection.read(serial_connection.in_waiting)
response.extend(data)
end_time = time.time() + end_of_response_timeout
decoded_response = response.decode('utf-8', errors='replace')
for target_line in wait_for_lines:
if target_line in decoded_response:
return decoded_response
elif time.time() > end_time:
break
time.sleep(0.1)
return response.decode('utf-8', errors='replace')
def send_at(command, wait_for=None, timeout=2):
"""Send AT command and return (response_text, success_bool)"""
if wait_for is None:
wait_for = ["OK", "+CME ERROR", "ERROR"]
ser_sara.reset_input_buffer()
ser_sara.write((command + '\r').encode('utf-8'))
resp = read_complete_response(ser_sara, timeout=timeout, end_of_response_timeout=timeout, wait_for_lines=wait_for)
success = "OK" in resp and "+CME ERROR" not in resp and "ERROR" not in resp.replace("OK", "")
return resp, success
# Collect raw logs for collapsible display
raw_logs = []
def log_raw(label, response):
raw_logs.append(f"[{label}]\n{response.strip()}")
try:
sys.stdout.reconfigure(line_buffering=True)
ser_sara.reset_input_buffer()
# Step 1: Check modem connectivity
resp, ok = send_at('ATI0')
log_raw('ATI0', resp)
if not ok:
print('❌ Modem non accessible')
print('Pas de réponse du modem sur ttyAMA2')
sys.exit(1)
print('✅ Modem connecté')
# Step 2: Check if PDP context is already active
resp, ok = send_at('AT+CGACT?')
log_raw('AT+CGACT?', resp)
pdp_already_active = '+CGACT: 1,1' in resp
if pdp_already_active:
print('✅ Contexte PDP déjà actif')
# Also check PSD profile status by trying to read IP
resp2, ok2 = send_at('AT+UPSND=0,0')
log_raw('AT+UPSND=0,0', resp2)
ip_match = re.search(r'\+UPSND:\s*0,0,"([^"]+)"', resp2)
if ip_match and ip_match.group(1) != "0.0.0.0":
ip_addr = ip_match.group(1)
print(f'✅ Profil PSD actif — IP: {ip_addr}')
print('
Connexion PDP OK — prêt pour les sockets UDP.')
else:
# PDP active but PSD profile not set up — need to configure it
print('⚠️ Contexte PDP actif mais profil PSD non configuré — activation en cours...')
setup_psd = True
else:
print('⚠️ Contexte PDP inactif — activation en cours...')
setup_psd = True
# Activate PDP context
resp, ok = send_at('AT+CGACT=1,1', timeout=5)
log_raw('AT+CGACT=1,1', resp)
if ok:
print('✅ Contexte PDP activé')
else:
print('❌ Échec activation contexte PDP')
sys.exit(1)
# Setup PSD profile if needed
if 'setup_psd' in dir() and setup_psd:
# Set PDP type to IPv4
resp, ok = send_at('AT+UPSD=0,0,0')
log_raw('AT+UPSD=0,0,0', resp)
# Map profile #0 to CID=1
resp, ok = send_at('AT+UPSD=0,100,1')
log_raw('AT+UPSD=0,100,1', resp)
# Activate PSD profile
resp, ok = send_at('AT+UPSDA=0,3', wait_for=["OK", "+UUPSDA", "+CME ERROR", "ERROR"], timeout=5)
log_raw('AT+UPSDA=0,3', resp)
if "OK" in resp or "+UUPSDA" in resp:
# Verify IP
resp2, ok2 = send_at('AT+UPSND=0,0')
log_raw('AT+UPSND=0,0', resp2)
ip_match = re.search(r'\+UPSND:\s*0,0,"([^"]+)"', resp2)
if ip_match and ip_match.group(1) != "0.0.0.0":
print(f'✅ Profil PSD activé — IP: {ip_match.group(1)}')
print('
Connexion PDP OK — prêt pour les sockets UDP.')
else:
print('✅ Profil PSD activé')
print('
Connexion PDP OK.')
else:
print('❌ Échec activation profil PSD')
print('
La connexion PDP n\'a pas pu être établie.')
except serial.SerialException as e:
print(f'❌ Erreur série: {e}')
except Exception as e:
print(f'❌ Erreur: {e}')
finally:
# Print raw logs in a collapsible section
if raw_logs:
log_id = "pdp_raw_logs"
print(f'
')
print(f'
')
for log in raw_logs:
print(log.replace('\n', '
'))
print('
')
print('