From 1c6af36313cd43d6e56f280db7174359fba99cda Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 14 Mar 2025 10:57:45 +0100 Subject: [PATCH] update --- SARA/R5/setPDP.py | 26 +++++++- SARA/reboot/start.py | 132 +++++++++++++++++++++++++++++++------- SARA/sara_ping.py | 49 +++++++++++++- loop/SARA_send_data_v2.py | 90 +++++++++++++++++--------- 4 files changed, 240 insertions(+), 57 deletions(-) diff --git a/SARA/R5/setPDP.py b/SARA/R5/setPDP.py index bdafc40..ba3d07a 100644 --- a/SARA/R5/setPDP.py +++ b/SARA/R5/setPDP.py @@ -119,7 +119,7 @@ def read_complete_response(serial_connection, timeout=2, end_of_response_timeout try: print('Start script') - # 1. Set AIRCARTO URL (profile id = 0) + # 1. Check connection print('➡️Check SARA R5 connexion') command = f'ATI0\r' ser_sara.write(command.encode('utf-8')) @@ -135,6 +135,30 @@ try: print(response_SARA_2, end="") time.sleep(1) + # 2. Set the PDP type + print('➡️Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command') + command = f'AT+UPSD=0,0,0\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_3, end="") + time.sleep(1) + + # 2. Profile #0 is mapped on CID=1. + print('➡️Profile #0 is mapped on CID=1.') + command = f'AT+UPSD=0,100,1\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_3, end="") + time.sleep(1) + + # 2. Set the PDP type + print('➡️Activate the PSD profile #0: the IPv4 address is already assigned by the network.') + command = f'AT+UPSDA=0,3\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"]) + print(response_SARA_3, end="") + time.sleep(1) + except Exception as e: diff --git a/SARA/reboot/start.py b/SARA/reboot/start.py index 8502746..7655a7c 100644 --- a/SARA/reboot/start.py +++ b/SARA/reboot/start.py @@ -64,6 +64,8 @@ config = load_config(config_file) baudrate = config.get('SaraR4_baudrate', 115200) #baudrate du sara R4 device_id = config.get('deviceID', '').upper() #device ID en maj +sara_r5_DPD_setup = False + ser_sara = serial.Serial( port='/dev/ttyAMA2', baudrate=baudrate, #115200 ou 9600 @@ -138,6 +140,7 @@ try: elif "SARA-R500" in response_SARA_ATI: model = "SARA-R500" print("📱 Detected SARA R5 modem") + sara_r5_DPD_setup = True else: # Fallback to regex match if direct string match fails match = re.search(r"Model:\s*([A-Za-z0-9\-]+)", response_SARA_ATI) @@ -151,7 +154,9 @@ try: update_json_key(config_file, "modem_version", model) time.sleep(1) - + ''' + AIRCARTO + ''' # 1. Set AIRCARTO URL (profile id = 0) print('➡️Set aircarto URL') aircarto_profile_id = 0 @@ -162,20 +167,23 @@ try: print(response_SARA_1) time.sleep(1) - #2. Set uSpot URL (profile id = 1) - print('➡️Set uSpot URL') + ''' + uSpot + ''' + print("➡️➡️Set uSpot URL with SSL") + + security_profile_id = 1 uSpot_profile_id = 1 uSpot_url="api-prod.uspot.probesys.net" - security_profile_id = 1 + #step 1: import the certificate - print("****") + print("➡️ import certificate") certificate_name = "e6" with open("/var/www/nebuleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file: certificate = cert_file.read() size_of_string = len(certificate) - print("\033[0;33m Import certificate\033[0m") # AT+USECMNG=0,,, # type-> 0 -> trusted root CA command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r' @@ -184,49 +192,127 @@ try: print(response_SARA_1) time.sleep(0.5) - - print("\033[0;33mAdd certificate\033[0m") + + print("➡️ add certificate") ser_sara.write(certificate) response_SARA_2 = read_complete_response(ser_sara) print(response_SARA_2) time.sleep(0.5) - # SECURITY PROFILE + # op_code: 0 -> certificate validation level + # param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation + print("➡️Set the security profile (params)") + certification_level=0 + command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r' + ser_sara.write((command + '\r').encode('utf-8')) + response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5b) + time.sleep(0.5) + + # op_code: 1 -> minimum SSL/TLS version + # param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2; + print("➡️Set the security profile (params)") + minimum_SSL_version = 0 + command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r' + ser_sara.write((command + '\r').encode('utf-8')) + response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5bb) + time.sleep(0.5) + + #op_code: 2 -> legacy cipher suite selection + # 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list. + print("➡️Set cipher") + cipher_suite = 0 + command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r' + ser_sara.write((command + '\r').encode('utf-8')) + response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5cc) + time.sleep(0.5) + # op_code: 3 -> trusted root certificate internal name - print("\033[0;33mSet the security profile (choose cert)\033[0m") + print("➡️Set the security profile (choose cert)") command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r' ser_sara.write((command + '\r').encode('utf-8')) - response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"]) + response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"]) print(response_SARA_5c) time.sleep(0.5) - #step 4: set url (op_code = 1) + # op_code: 10 -> SNI (server name indication) + print("➡️Set the SNI") + command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r' + ser_sara.write((command + '\r').encode('utf-8')) + response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5cf) + time.sleep(0.5) + + #step 4: set url (op_code = 1) + print("➡️SET URL") command = f'AT+UHTTP={uSpot_profile_id},1,"{uSpot_url}"\r' - ser_sara.write(command.encode('utf-8')) - response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_2) + ser_sara.write((command + '\r').encode('utf-8')) + response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5) time.sleep(1) #step 4: set PORT (op_code = 5) - print("set port 443") - command = f'AT+UHTTP={uSpot_profile_id},5,443\r' + print("➡️SET PORT") + port = 443 + command = f'AT+UHTTP={uSpot_profile_id},5,{port}\r' ser_sara.write((command + '\r').encode('utf-8')) response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"]) print(response_SARA_55) time.sleep(1) - - #step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2) - print("\033[0;33mSET SSL\033[0m") + + #step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2) + print("➡️SET SSL") http_secure = 1 command = f'AT+UHTTP={uSpot_profile_id},6,{http_secure},{security_profile_id}\r' - #command = f'AT+UHTTP={profile_id},6,{http_secure}\r' ser_sara.write(command.encode('utf-8')) - response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"]) - print(response_SARA_5) + response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_5fg) time.sleep(1) + + ''' + SARA R5 + ''' + + if sara_r5_DPD_setup: + print("➡️➡️SARA R5 PDP SETUP") + # 2. Activate PDP context 1 + print('➡️Activate PDP context 1') + command = f'AT+CGACT=1,1\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_2, end="") + time.sleep(1) + + # 2. Set the PDP type + print('➡️Set the PDP type to IPv4 referring to the outputof the +CGDCONT read command') + command = f'AT+UPSD=0,0,0\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_3, end="") + time.sleep(1) + + # 2. Profile #0 is mapped on CID=1. + print('➡️Profile #0 is mapped on CID=1.') + command = f'AT+UPSD=0,100,1\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK"]) + print(response_SARA_3, end="") + time.sleep(1) + + # 2. Set the PDP type + print('➡️Activate the PSD profile #0: the IPv4 address is already assigned by the network.') + command = f'AT+UPSDA=0,3\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_3 = read_complete_response(ser_sara, wait_for_lines=["OK","+UUPSDA"]) + print(response_SARA_3, end="") + time.sleep(1) + + #3. Get localisation (CellLocate) mode = 2 #single shot position sensor = 2 #use cellular CellLocate® location information diff --git a/SARA/sara_ping.py b/SARA/sara_ping.py index c9ffb3c..7d9c92a 100644 --- a/SARA/sara_ping.py +++ b/SARA/sara_ping.py @@ -89,6 +89,24 @@ def read_complete_response(serial_connection, timeout=2, end_of_response_timeout return response.decode('utf-8', errors='replace') # Return the full response if no target line is found +def extract_error_code(response): + """ + Extract just the error code from AT+UHTTPER response + """ + for line in response.split('\n'): + if '+UHTTPER' in line: + try: + # Split the line and get the third value (error code) + parts = line.split(':')[1].strip().split(',') + if len(parts) >= 3: + error_code = int(parts[2]) + return error_code + except: + pass + + # Return None if we couldn't find the error code + return None + try: #3. Send to endpoint (with device ID) print("Send data (GET REQUEST):") @@ -111,7 +129,36 @@ try: parts = http_response.split(',') # 2.1 code 0 (HTTP failed) ⛔⛔⛔ if len(parts) == 3 and parts[-1] == '0': # The third value indicates success - print("⛔ATTENTION: HTTP operation failed") + print("⛔⛔ATTENTION: HTTP operation failed") + #get error code + print("Getting error code (11->Server connection error, 73->Secure socket connect error)") + command = f'AT+UHTTPER={aircarto_profile_id}\r' + ser_sara.write(command.encode('utf-8')) + response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) + print('

') + print(response_SARA_9) + print("

", end="") + # Extract just the error code + error_code = extract_error_code(response_SARA_9) + if error_code is not None: + # Display interpretation based on error code + if error_code == 0: + print('

No error detected

') + elif error_code == 4: + print('

Error 4: Invalid server Hostname

') + elif error_code == 11: + print('

Error 11: Server connection error

') + elif error_code == 22: + print('

Error 22: PSD or CSD connection not established

') + elif error_code == 73: + print('

Error 73: Secure socket connect error

') + else: + print(f'

Unknown error code: {error_code}

') + else: + print('

Could not extract error code from response

') + + + # 2.2 code 1 (HHTP succeded) else: # Si la commande HTTP a réussi diff --git a/loop/SARA_send_data_v2.py b/loop/SARA_send_data_v2.py index 5aaf82f..b6f26ce 100755 --- a/loop/SARA_send_data_v2.py +++ b/loop/SARA_send_data_v2.py @@ -287,6 +287,24 @@ def read_complete_response(serial_connection, timeout=2, end_of_response_timeout return response.decode('utf-8', errors='replace') # Return the full response if no target line is found +def extract_error_code(response): + """ + Extract just the error code from AT+UHTTPER response + """ + for line in response.split('\n'): + if '+UHTTPER' in line: + try: + # Split the line and get the third value (error code) + parts = line.split(':')[1].strip().split(',') + if len(parts) >= 3: + error_code = int(parts[2]) + return error_code + except: + pass + + # Return None if we couldn't find the error code + return None + try: ''' _ ___ ___ ____ @@ -458,7 +476,7 @@ try: print("Verify SARA R4 connection") # Getting the LTE Signal - print("-> Getting LTE signal <-") + print("➡️Getting LTE signal") ser_sara.write(b'AT+CSQ\r') response2 = read_complete_response(ser_sara, wait_for_lines=["OK"]) @@ -606,7 +624,7 @@ try: led_thread.start() # Get error code - print("Getting error code (11->Server connection error, 73->Secure socket connect error)") + print("Getting error code") command = f'AT+UHTTPER={aircarto_profile_id}\r' ser_sara.write(command.encode('utf-8')) response_SARA_9 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) @@ -614,20 +632,25 @@ try: print(response_SARA_9) print("

", end="") - ''' - +UHTTPER: profile_id,error_class,error_code - - error_class - 0 OK, no error - 3 HTTP Protocol error class - 10 Wrong HTTP API USAGE - - error_code (for error_class 3 and 10) - 0 No error - 4 Invalid server Hostname - 11 Server connection error - 73 Secure socket connect error - ''' + # Extract just the error code + error_code = extract_error_code(response_SARA_9) + if error_code is not None: + # Display interpretation based on error code + if error_code == 0: + print('

No error detected

') + elif error_code == 4: + print('

Error 4: Invalid server Hostname

') + elif error_code == 11: + print('

Error 11: Server connection error

') + elif error_code == 22: + print('

Error 22: PSD or CSD connection not established

') + elif error_code == 73: + print('

Error 73: Secure socket connect error

') + else: + print(f'

Unknown error code: {error_code}

') + else: + print('

Could not extract error code from response

') + #Reboot du SARA R4 #ATTENTION: sur le SARA R5 la commande renvoie une erreur @@ -939,28 +962,31 @@ try: led_thread.start() # Get error code - print("Getting error code (4-> Invalid server Hostname, 11->Server connection error, 73->Secure socket connect error)") + print("Getting error code") command = f'AT+UHTTPER={uSpot_profile_id}\r' ser_sara.write(command.encode('utf-8')) response_SARA_9b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False) print('

') print(response_SARA_9b) print("

", end="") - - ''' - +UHTTPER: profile_id,error_class,error_code - - error_class - 0 OK, no error - 3 HTTP Protocol error class - 10 Wrong HTTP API USAGE - - error_code (for error_class 3) - 0 No error - 4 Invalid server Hostname - 11 Server connection error - 73 Secure socket connect error - ''' + # Extract just the error code + error_code = extract_error_code(response_SARA_9b) + if error_code is not None: + # Display interpretation based on error code + if error_code == 0: + print('

No error detected

') + elif error_code == 4: + print('

Error 4: Invalid server Hostname

') + elif error_code == 11: + print('

Error 11: Server connection error

') + elif error_code == 22: + print('

Error 22: PSD or CSD connection not established

') + elif error_code == 73: + print('

Error 73: Secure socket connect error

') + else: + print(f'

Unknown error code: {error_code}

') + else: + print('

Could not extract error code from response

') #Pas forcément un moyen de résoudre le soucis