import serial import time import sys parameter = sys.argv[1:] # Exclude the script name port='/dev/'+parameter[0] # Mapping dictionaries COMPOUND_MAP = { 'A': 'Ammonia', 'B': 'Benzene', 'C': 'Carbon Monoxide', 'D': 'Hydrogen Sulfide', 'E': 'Ethylene', 'F': 'Formaldehyde', 'G': 'Gasoline', 'H': 'Hydrogen', 'I': 'Isobutylene', 'J': 'Jet Fuel', 'K': 'Kerosene', 'L': 'Liquified Petroleum Gas', 'M': 'Methane', 'N': 'Nitrogen Dioxide', 'O': 'Ozone', 'P': 'Propane', 'Q': 'Quinoline', 'R': 'Refrigerant', 'S': 'Sulfur Dioxide', 'T': 'Toluene', 'U': 'Uranium Hexafluoride', 'V': 'Vinyl Chloride', 'W': 'Water Vapor', 'X': 'Xylene', 'Y': 'Yttrium', 'Z': 'Zinc' } RANGE_MAP = { 'A': '0-10 ppm', 'B': '0-250 ppb', 'C': '0-1000 ppm', 'D': '0-50 ppm', 'E': '0-100 ppm', 'F': '0-5 ppm', 'G': '0-500 ppm', 'H': '0-2000 ppm', 'I': '0-200 ppm', 'J': '0-300 ppm', 'K': '0-400 ppm', 'L': '0-600 ppm', 'M': '0-800 ppm', 'N': '0-20 ppm', 'O': '0-1 ppm', 'P': '0-5000 ppm', 'Q': '0-150 ppm', 'R': '0-750 ppm', 'S': '0-25 ppm', 'T': '0-350 ppm', 'U': '0-450 ppm', 'V': '0-550 ppm', 'W': '0-650 ppm', 'X': '0-850 ppm', 'Y': '0-950 ppm', 'Z': '0-1500 ppm' } INTERFACE_MAP = { 0x01: 'USB', 0x02: 'UART', 0x03: 'I2C', 0x04: 'SPI' } def parse_cairsens_data(hex_data): """ Parse the extracted hex data from CAIRSENS sensor. :param hex_data: Hexadecimal string of extracted data (indices 11-28) :return: Dictionary with parsed information """ # Convert hex to bytes for easier processing raw_bytes = bytes.fromhex(hex_data) # Initialize result dictionary result = { 'device_type': 'Unknown', 'compound': 'Unknown', 'range': 'Unknown', 'interface': 'Unknown', 'raw_data': hex_data } if len(raw_bytes) >= 4: # Ensure we have at least 4 bytes # First byte: Device type check first_char = chr(raw_bytes[0]) if 0x20 <= raw_bytes[0] <= 0x7E else '?' if first_char == 'C': result['device_type'] = 'CAIRCLIP' else: result['device_type'] = f'Unknown ({first_char})' # Second byte: Compound mapping second_char = chr(raw_bytes[1]) if 0x20 <= raw_bytes[1] <= 0x7E else '?' result['compound'] = COMPOUND_MAP.get(second_char, f'Unknown ({second_char})') # Third byte: Range mapping third_char = chr(raw_bytes[2]) if 0x20 <= raw_bytes[2] <= 0x7E else '?' result['range'] = RANGE_MAP.get(third_char, f'Unknown ({third_char})') # Fourth byte: Interface (raw byte value) interface_byte = raw_bytes[3] result['interface'] = INTERFACE_MAP.get(interface_byte, f'Unknown (0x{interface_byte:02X})') result['interface_raw'] = f'0x{interface_byte:02X}' return result def read_cairsens(port, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, databits=serial.EIGHTBITS, timeout=1): """ Lit les données de la sonde CAIRSENS via UART. /usr/bin/python3 /var/www/nebuleair_pro_4g/envea/read_ref_v2.py ttyAMA4 :param port: Le port série utilisé (ex: 'COM1' ou '/dev/ttyAMA0'). :param baudrate: Le débit en bauds (ex: 9600). :param parity: Le bit de parité (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD). :param stopbits: Le nombre de bits de stop (serial.STOPBITS_ONE, serial.STOPBITS_TWO). :param databits: Le nombre de bits de données (serial.FIVEBITS, serial.SIXBITS, serial.SEVENBITS, serial.EIGHTBITS). :param timeout: Temps d'attente maximal pour la lecture (en secondes). :return: Les données reçues sous forme de chaîne de caractères. """ try: # Ouvrir la connexion série ser = serial.Serial( port=port, baudrate=baudrate, parity=parity, stopbits=stopbits, bytesize=databits, timeout=timeout ) print(f"Connexion ouverte sur {port} à {baudrate} bauds.") # Attendre un instant pour stabiliser la connexion time.sleep(2) # Envoyer une commande à la sonde (si nécessaire) # Adapter cette ligne selon la documentation de la sonde #ser.write(b'\r\n') ser.write(b'\xFF\x02\x13\x30\x01\x02\x03\x04\x05\x06\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x1C\xD1\x61\x03') # Lire les données reçues data = ser.readline() print(f"Données reçues brutes : {data}") # Convertir les données en hexadécimal hex_data = data.hex() # Convertit en chaîne hexadécimale formatted_hex = ' '.join(hex_data[i:i+2] for i in range(0, len(hex_data), 2)) # Formate avec des espaces print(f"Données reçues en hexadécimal : {formatted_hex}") # Extraire les valeurs de l'index 11 à 28 (indices 22 à 56 en hex string) extracted_hex = hex_data[22:56] # Each byte is 2 hex chars, so 11*2=22 to 28*2=56 print(f"Valeurs hexadécimales extraites (11 à 28) : {extracted_hex}") # Parse the extracted data parsed_data = parse_cairsens_data(extracted_hex) # Display parsed information print("\n=== CAIRSENS SENSOR INFORMATION ===") print(f"Device Type: {parsed_data['device_type']}") print(f"Compound: {parsed_data['compound']}") print(f"Range: {parsed_data['range']}") print(f"Interface: {parsed_data['interface']} ({parsed_data.get('interface_raw', 'N/A')})") print(f"Raw Data: {parsed_data['raw_data']}") print("=====================================") # Convertir en ASCII et en valeurs numériques (pour debug) if extracted_hex: raw_bytes = bytes.fromhex(extracted_hex) ascii_data = ''.join(chr(b) if 0x20 <= b <= 0x7E else '.' for b in raw_bytes) print(f"Valeurs converties en ASCII : {ascii_data}") numeric_values = [b for b in raw_bytes] print(f"Valeurs numériques : {numeric_values}") # Fermer la connexion ser.close() print("Connexion fermée.") return parsed_data except serial.SerialException as e: print(f"Erreur de connexion série : {e}") return None except Exception as e: print(f"Erreur générale : {e}") return None # Exemple d'utilisation if __name__ == "__main__": port = port # Remplacez par votre port série (ex: /dev/ttyAMA0 sur Raspberry Pi) baudrate = 9600 # Débit en bauds (à vérifier dans la documentation) parity = serial.PARITY_NONE # Parité (NONE, EVEN, ODD) stopbits = serial.STOPBITS_ONE # Bits de stop (ONE, TWO) databits = serial.EIGHTBITS # Bits de données (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS) data = read_cairsens(port, baudrate, parity, stopbits, databits) if data: print(f"\nRésultat final : {data}")