update
This commit is contained in:
@@ -509,12 +509,134 @@ def reset_server_hostname(profile_id):
|
|||||||
if not http_reset_success:
|
if not http_reset_success:
|
||||||
print("⚠️ AirCarto HTTP profile reset failed")
|
print("⚠️ AirCarto HTTP profile reset failed")
|
||||||
elif profile_id ==1:
|
elif profile_id ==1:
|
||||||
pass # TODO: implement handling for profile 1
|
pass # on utilise la fonction reset_server_hostname_https pour uSpot
|
||||||
else:
|
else:
|
||||||
print(f"❌ Unsupported profile ID: {profile_id}")
|
print(f"❌ Unsupported profile ID: {profile_id}")
|
||||||
http_reset_success = False
|
http_reset_success = False
|
||||||
return http_reset_success
|
return http_reset_success
|
||||||
|
|
||||||
|
def reset_server_hostname_https(profile_id):
|
||||||
|
"""
|
||||||
|
Function that reset server hostname (URL) connection for the SARA R5
|
||||||
|
returns true or false
|
||||||
|
"""
|
||||||
|
print("⚠️Reseting Server Hostname HTTS secure connection ")
|
||||||
|
http_reset_success = False # Default fallback
|
||||||
|
|
||||||
|
#Pour uSpot
|
||||||
|
if profile_id == 1:
|
||||||
|
print('<span style="color: orange;font-weight: bold;">🔧 Resetting uSpot HTTPs Profile</span>')
|
||||||
|
uSpot_url="api-prod.uspot.probesys.net"
|
||||||
|
security_profile_id = 1
|
||||||
|
|
||||||
|
#step 1: import the certificate
|
||||||
|
print("➡️ import certificate")
|
||||||
|
certificate_name = "e6"
|
||||||
|
with open("/var/www/moduleair_pro_4g/SARA/SSL/certificate/e6.pem", "rb") as cert_file:
|
||||||
|
certificate = cert_file.read()
|
||||||
|
size_of_string = len(certificate)
|
||||||
|
|
||||||
|
# AT+USECMNG=0,<type>,<internal_name>,<data_size>
|
||||||
|
# type-> 0 -> trusted root CA
|
||||||
|
command = f'AT+USECMNG=0,0,"{certificate_name}",{size_of_string}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_1 = read_complete_response(ser_sara, wait_for_lines=[">"])
|
||||||
|
print(response_SARA_1)
|
||||||
|
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
print("➡️ add certificate")
|
||||||
|
ser_sara.write(certificate)
|
||||||
|
response_SARA_2 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_2)
|
||||||
|
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# op_code: 0 -> certificate validation level
|
||||||
|
# param_val : 0 -> Level 0 No validation; 1-> Level 1 Root certificate validation
|
||||||
|
print("➡️Set the security profile (params)")
|
||||||
|
certification_level=0
|
||||||
|
command = f'AT+USECPRF={security_profile_id},0,{certification_level}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5b = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5b)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# op_code: 1 -> minimum SSL/TLS version
|
||||||
|
# param_val : 0 -> any; server can use any version for the connection; 1-> LSv1.0; 2->TLSv1.1; 3->TLSv1.2;
|
||||||
|
print("➡️Set the security profile (params)")
|
||||||
|
minimum_SSL_version = 0
|
||||||
|
command = f'AT+USECPRF={security_profile_id},1,{minimum_SSL_version}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5bb = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5bb)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
#op_code: 2 -> legacy cipher suite selection
|
||||||
|
# 0 (factory-programmed value): a list of default cipher suites is proposed at the beginning of handshake process, and a cipher suite will be negotiated among the cipher suites proposed in the list.
|
||||||
|
print("➡️Set cipher")
|
||||||
|
cipher_suite = 0
|
||||||
|
command = f'AT+USECPRF={security_profile_id},2,{cipher_suite}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5cc = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5cc)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# op_code: 3 -> trusted root certificate internal name
|
||||||
|
print("➡️Set the security profile (choose cert)")
|
||||||
|
command = f'AT+USECPRF={security_profile_id},3,"{certificate_name}"\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5c = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5c)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# op_code: 10 -> SNI (server name indication)
|
||||||
|
print("➡️Set the SNI")
|
||||||
|
command = f'AT+USECPRF={security_profile_id},10,"{uSpot_url}"\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5cf = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5cf)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
#step 4: set url (op_code = 1)
|
||||||
|
print("➡️SET URL")
|
||||||
|
command = f'AT+UHTTP={profile_id},1,"{uSpot_url}"\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_5 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
#step 4: set PORT (op_code = 5)
|
||||||
|
print("➡️SET PORT")
|
||||||
|
port = 443
|
||||||
|
command = f'AT+UHTTP={profile_id},5,{port}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_55 = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_55)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
#step 4: set url to SSL (op_code = 6) (http_secure = 1 for HTTPS)(USECMNG_PROFILE = 2)
|
||||||
|
print("➡️SET SSL")
|
||||||
|
http_secure = 1
|
||||||
|
command = f'AT+UHTTP={profile_id},6,{http_secure},{security_profile_id}\r'
|
||||||
|
|
||||||
|
ser_sara.write(command.encode('utf-8'))
|
||||||
|
response_SARA_5fg = read_complete_response(ser_sara, wait_for_lines=["OK"])
|
||||||
|
print(response_SARA_5fg)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
http_reset_success = response_SARA_5 is not None and "OK" in response_SARA_5
|
||||||
|
if not http_reset_success:
|
||||||
|
print("⚠️ AirCarto HTTP profile reset failed")
|
||||||
|
#Pour uSpot
|
||||||
|
elif profile_id ==1:
|
||||||
|
pass #on utilise la fonction reset_server_hostname_https pour uSpot
|
||||||
|
else:
|
||||||
|
print(f"❌ Unsupported profile ID: {profile_id}")
|
||||||
|
http_reset_success = False
|
||||||
|
return http_reset_success
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
'''
|
'''
|
||||||
_ ___ ___ ____
|
_ ___ ___ ____
|
||||||
@@ -654,7 +776,7 @@ try:
|
|||||||
payload_csv[20] = co2_average # Choose appropriate index
|
payload_csv[20] = co2_average # Choose appropriate index
|
||||||
|
|
||||||
# Add data to payload JSON
|
# Add data to payload JSON
|
||||||
payload_json["sensordatavalues"].append({"value_type": "CO2", "value": str(co2_average)})
|
payload_json["sensordatavalues"].append({"value_type": "MHZ16_CO2", "value": str(co2_average)})
|
||||||
|
|
||||||
print(f"CO2 average from {len(co2_values)} measurements: {co2_average}")
|
print(f"CO2 average from {len(co2_values)} measurements: {co2_average}")
|
||||||
else:
|
else:
|
||||||
@@ -972,12 +1094,12 @@ try:
|
|||||||
server_time_formatted = server_datetime.strftime('%Y-%m-%d %H:%M:%S')
|
server_time_formatted = server_datetime.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
#update RTC module do not wait for answer, non blocking
|
#update RTC module do not wait for answer, non blocking
|
||||||
#/usr/bin/python3 /var/www/nebuleair_pro_4g/RTC/set_with_browserTime.py '2024-01-30 12:48:39'
|
#/usr/bin/python3 /var/www/moduleair_pro_4g/RTC/set_with_browserTime.py '2024-01-30 12:48:39'
|
||||||
# Launch RTC update script as non-blocking subprocess
|
# Launch RTC update script as non-blocking subprocess
|
||||||
import subprocess
|
import subprocess
|
||||||
update_command = [
|
update_command = [
|
||||||
"/usr/bin/python3",
|
"/usr/bin/python3",
|
||||||
"/var/www/nebuleair_pro_4g/RTC/set_with_browserTime.py",
|
"/var/www/moduleair_pro_4g/RTC/set_with_browserTime.py",
|
||||||
server_time_formatted
|
server_time_formatted
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -1064,6 +1186,195 @@ try:
|
|||||||
if "+CME ERROR" in response_SARA_5:
|
if "+CME ERROR" in response_SARA_5:
|
||||||
print("⛔ Attention CME ERROR ⛔")
|
print("⛔ Attention CME ERROR ⛔")
|
||||||
|
|
||||||
|
'''
|
||||||
|
_ ____ _
|
||||||
|
___ ___ _ __ __| | _ _/ ___| _ __ ___ | |_
|
||||||
|
/ __|/ _ \ '_ \ / _` | | | | \___ \| '_ \ / _ \| __|
|
||||||
|
\__ \ __/ | | | (_| | | |_| |___) | |_) | (_) | |_
|
||||||
|
|___/\___|_| |_|\__,_| \__,_|____/| .__/ \___/ \__|
|
||||||
|
|_|
|
||||||
|
'''
|
||||||
|
if send_uSpot:
|
||||||
|
print('<p class="fw-bold">➡️SEND TO uSPOT SERVERS</p>', end="")
|
||||||
|
|
||||||
|
# 1. Open sensordata_json.json (with correct data size)
|
||||||
|
print("Open JSON:")
|
||||||
|
payload_string = json.dumps(payload_json) # Convert dict to JSON string
|
||||||
|
size_of_string = len(payload_string)
|
||||||
|
command = f'AT+UDWNFILE="sensordata_json.json",{size_of_string}\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_6 = read_complete_response(ser_sara, wait_for_lines=[">"], debug=False)
|
||||||
|
print(response_SARA_6)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
#2. Write to shell
|
||||||
|
print("Write to memory:")
|
||||||
|
ser_sara.write(payload_string.encode())
|
||||||
|
response_SARA_7 = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
|
||||||
|
print(response_SARA_7)
|
||||||
|
|
||||||
|
#step 4: trigger the request (http_command=1 for GET and http_command=1 for POST)
|
||||||
|
print("****")
|
||||||
|
print("Trigger POST REQUEST")
|
||||||
|
command = f'AT+UHTTPC={uSpot_profile_id},4,"/moduleair?token=2AFF6dQk68daFZ","uSpot_server_response.txt","sensordata_json.json",4\r'
|
||||||
|
ser_sara.write(command.encode('utf-8'))
|
||||||
|
|
||||||
|
response_SARA_8 = read_complete_response(ser_sara, timeout=5, end_of_response_timeout=120, wait_for_lines=["+UUHTTPCR", "+CME ERROR"], debug=True)
|
||||||
|
|
||||||
|
print('<p class="text-danger-emphasis">')
|
||||||
|
print(response_SARA_8)
|
||||||
|
print("</p>", end="")
|
||||||
|
|
||||||
|
# si on recoit la réponse UHTTPCR
|
||||||
|
if "+UUHTTPCR" in response_SARA_8:
|
||||||
|
print("✅ Received +UUHTTPCR response.")
|
||||||
|
lines = response_SARA_8.strip().splitlines()
|
||||||
|
# 1.Vérifier si la réponse contient un message d'erreur CME
|
||||||
|
if "+CME ERROR" in lines[-1]:
|
||||||
|
print("*****")
|
||||||
|
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: CME ERROR</span>')
|
||||||
|
print("error:", lines[-1])
|
||||||
|
print("*****")
|
||||||
|
#update status
|
||||||
|
|
||||||
|
# Gestion de l'erreur spécifique
|
||||||
|
if "No connection to phone" in lines[-1]:
|
||||||
|
print("No connection to the phone.")
|
||||||
|
|
||||||
|
elif "Operation not allowed" in lines[-1]:
|
||||||
|
print("Operation not allowed. This may require a different configuration.")
|
||||||
|
# Actions spécifiques pour ce type d'erreur
|
||||||
|
|
||||||
|
# Clignotement LED rouge en cas d'erreur
|
||||||
|
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
|
||||||
|
led_thread.start()
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 2.Si la réponse contient une réponse HTTP valide
|
||||||
|
# Extract HTTP response code from the last line
|
||||||
|
# ATTENTION: lines[-1] renvoie l'avant dernière ligne et il peut y avoir un soucis avec le OK
|
||||||
|
# rechercher plutot
|
||||||
|
http_response = lines[-1] # "+UUHTTPCR: 0,4,0"
|
||||||
|
parts = http_response.split(',')
|
||||||
|
|
||||||
|
# 2.1 code 0 (HTTP failed)
|
||||||
|
if len(parts) == 3 and parts[-1] == '0': # The third value indicates success
|
||||||
|
print("*****")
|
||||||
|
print('<span style="color: red;font-weight: bold;">⛔ATTENTION: HTTP operation failed</span>')
|
||||||
|
print("*****")
|
||||||
|
print("Blink red LED")
|
||||||
|
# Run LED blinking in a separate thread
|
||||||
|
led_thread = Thread(target=blink_led, args=(24, 5, 0.5))
|
||||||
|
led_thread.start()
|
||||||
|
|
||||||
|
# Get error code
|
||||||
|
print("Getting error code", end="")
|
||||||
|
command = f'AT+UHTTPER={uSpot_profile_id}\r'
|
||||||
|
ser_sara.write(command.encode('utf-8'))
|
||||||
|
response_SARA_9b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
|
||||||
|
print('<p class="text-danger-emphasis">')
|
||||||
|
print(response_SARA_9b)
|
||||||
|
print("</p>", end="")
|
||||||
|
# Extract just the error code
|
||||||
|
error_code = extract_error_code(response_SARA_9b)
|
||||||
|
if error_code is not None:
|
||||||
|
# Display interpretation based on error code
|
||||||
|
if error_code == 0:
|
||||||
|
print('<p class="text-success">No error detected</p>')
|
||||||
|
# INVALID SERVER HOSTNAME
|
||||||
|
elif error_code == 4:
|
||||||
|
print('<p class="text-danger">Error 4: Invalid server Hostname</p>', end="")
|
||||||
|
send_error_notification(device_id, "UHTTPER (4) uSpot Invalid server Hostname")
|
||||||
|
server_hostname_resets = reset_server_hostname_https(uSpot_profile_id)
|
||||||
|
if server_hostname_resets:
|
||||||
|
print("✅server hostname reset successfully")
|
||||||
|
else:
|
||||||
|
print("⛔There were issues with the modem server hostname reinitialize process")
|
||||||
|
# SERVER CONNECTION ERROR
|
||||||
|
elif error_code == 11:
|
||||||
|
print('<p class="text-danger">Error 11: Server connection error</p>', end="")
|
||||||
|
elif error_code == 22:
|
||||||
|
print('<p class="text-danger">Error 22: PSD or CSD connection not established</p>', end="")
|
||||||
|
elif error_code == 26:
|
||||||
|
print('<p class="text-danger">Error 26: Connection timed out</p>')
|
||||||
|
elif error_code == 44:
|
||||||
|
print('<p class="text-danger">Error 44: Connection lost</p>')
|
||||||
|
elif error_code == 73:
|
||||||
|
print('<p class="text-danger">Error 73: Secure socket connect error</p>', end="")
|
||||||
|
send_error_notification(device_id, "uSpot - Secure socket connect error")
|
||||||
|
#Software Reboot ??
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(f'<p class="text-danger">Unknown error code: {error_code}</p>',end="")
|
||||||
|
else:
|
||||||
|
print('<p class="text-danger">Could not extract error code from response</p>', end="")
|
||||||
|
|
||||||
|
#Pas forcément un moyen de résoudre le soucis
|
||||||
|
|
||||||
|
# 2.2 code 1 (✅✅HHTP / UUHTTPCR succeded✅✅)
|
||||||
|
else:
|
||||||
|
# Si la commande HTTP a réussi
|
||||||
|
print('<span style="font-weight: bold;">✅✅HTTP operation successful.</span>')
|
||||||
|
print("Blink blue LED")
|
||||||
|
led_thread = Thread(target=blink_led, args=(23, 5, 0.5))
|
||||||
|
led_thread.start()
|
||||||
|
|
||||||
|
#4. Read reply from server
|
||||||
|
print("Reply from server:")
|
||||||
|
command = f'AT+URDFILE="uSpot_server_response.txt"\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_4b = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
|
||||||
|
print('<p class="text-success">')
|
||||||
|
print(response_SARA_4b)
|
||||||
|
print("</p>", end="")
|
||||||
|
|
||||||
|
# Initialize http_response_code to 0 as a default value
|
||||||
|
http_response_code = 0
|
||||||
|
|
||||||
|
# Safely extract HTTP code
|
||||||
|
try:
|
||||||
|
http_prefix = "HTTP/"
|
||||||
|
# response_SARA_4b is a string, not a function - use .find() method
|
||||||
|
http_pos = response_SARA_4b.find(http_prefix)
|
||||||
|
|
||||||
|
if http_pos != -1:
|
||||||
|
# Find the space after the HTTP version
|
||||||
|
space_pos = response_SARA_4b.find(" ", http_pos)
|
||||||
|
if space_pos != -1:
|
||||||
|
# Extract the code after the space
|
||||||
|
code_start = space_pos + 1
|
||||||
|
code_end = response_SARA_4b.find(" ", code_start)
|
||||||
|
if code_end != -1:
|
||||||
|
# Extract and convert to integer
|
||||||
|
http_code_str = response_SARA_4b[code_start:code_end]
|
||||||
|
http_response_code = int(http_code_str)
|
||||||
|
print(f"HTTP response code: {http_response_code}")
|
||||||
|
if http_response_code == 201:
|
||||||
|
print('<span style="font-weight: bold;">✅✅HTTP 201 ressource created.</span>')
|
||||||
|
elif http_response_code == 308:
|
||||||
|
print('<span style="font-weight: bold;"> ⚠️⚠️HTTP 308 Redirect, need to set up HTTPS.</span>')
|
||||||
|
server_hostname_resets = reset_server_hostname_https(uSpot_profile_id)
|
||||||
|
if server_hostname_resets:
|
||||||
|
print("✅server hostname reset successfully")
|
||||||
|
else:
|
||||||
|
print("⛔There were issues with the modem server hostname reinitialize process")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# If any error occurs during parsing, keep the default value
|
||||||
|
print(f"Error parsing HTTP code: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#5. empty json
|
||||||
|
print("Empty SARA memory:")
|
||||||
|
command = f'AT+UDELFILE="sensordata_json.json"\r'
|
||||||
|
ser_sara.write((command + '\r').encode('utf-8'))
|
||||||
|
response_SARA_9t = read_complete_response(ser_sara, wait_for_lines=["OK"], debug=False)
|
||||||
|
print(response_SARA_9t)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Calculate and print the elapsed time
|
# Calculate and print the elapsed time
|
||||||
elapsed_time = time.time() - start_time_script
|
elapsed_time = time.time() - start_time_script
|
||||||
|
|||||||
Reference in New Issue
Block a user