r"""
  ____  _____ _   _ ____   ___  ____  ____
 / ___|| ____| \ | / ___| / _ \|  _ \/ ___|
 \___ \|  _| |  \| \___ \| | | | |_) \___ \
  ___) | |___| |\  |___) | |_| |  _ < ___) |
 |____/|_____|_| \_|____/ \___/|_| \_\____/

Script to get SENSORS values
And store them inside sqlite database
Uses RTC module for timing
/usr/bin/python3 /var/www/nebuleair_pro_4g/NPM/get_data_v2.py

One run performs one measurement: it sends the "data60s" command to the NPM
sensor on the configured serial port, validates and decodes the answer,
timestamps it with the DS3231 RTC (or a fixed fallback date when the RTC is
unavailable) and inserts one row into the ``data`` table.
"""

import json
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime

import requests
import serial
import smbus2

# Files used by the script
DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
CONFIG_FILE = '/var/www/nebuleair_pro_4g/config.json'

# RTC module
DS3231_ADDR = 0x68
REG_TIME = 0x00
I2C_BUS_NUMBER = 1
FALLBACK_TIMESTAMP = "1970-01-01 00:00:00"  # Used when the RTC cannot be read

# NPM serial protocol
NPM_ADDR = 0x81
NPM_CMD_DATA10S = 0x11  # Kept for reference; not used by this script
NPM_CMD_DATA60S = 0x12
NPM_REQUEST_DATA60S = b'\x81\x12\x6D'
# NOTE(review): 16 bytes = address, command, state, 6 x 16-bit values and a
# checksum. The code only proves that indexes 0..14 are used; confirm the
# total length against the sensor datasheet.
NPM_FRAME_LEN = 16


def bcd_to_dec(bcd):
    """Convert one packed BCD byte (two decimal digits) to an integer."""
    return (bcd // 16 * 10) + (bcd % 16)


def read_time(bus):
    """Try to read and decode time from the RTC module (DS3231).

    Args:
        bus: an open SMBus-like object exposing ``read_i2c_block_data``.

    Returns:
        A naive ``datetime`` built from the RTC registers, or ``None`` when
        the module cannot be read or its registers do not form a valid date.
    """
    try:
        data = bus.read_i2c_block_data(DS3231_ADDR, REG_TIME, 7)
    except OSError:
        return None  # RTC module not connected

    try:
        return datetime(
            bcd_to_dec(data[6]) + 2000,   # year
            bcd_to_dec(data[5]),          # month
            bcd_to_dec(data[4]),          # day of month (data[3] is skipped)
            bcd_to_dec(data[2] & 0x3F),   # hours
            bcd_to_dec(data[1]),          # minutes
            bcd_to_dec(data[0] & 0x7F),   # seconds
        )
    except ValueError:
        # Registers hold something that is not a calendar date; let the
        # caller use its fallback instead of aborting the whole measurement.
        return None


def load_config(config_file):
    """Load the JSON configuration file.

    Returns:
        The parsed configuration, or an empty dict when the file cannot be
        read or parsed (the error is printed).
    """
    try:
        with open(config_file, 'r') as file:
            return json.load(file)
    except Exception as e:
        print(f"Error loading config file: {e}")
        return {}


def request_npm_frame(ser):
    """Send the data60s request and return the raw answer bytes.

    A fixed-size read is used on purpose: the answer is binary, so a
    line-oriented read would stop early whenever a payload byte happens to
    be 0x0A. Fewer than ``NPM_FRAME_LEN`` bytes are returned if the serial
    timeout expires first.
    """
    ser.write(NPM_REQUEST_DATA60S)
    return ser.read(NPM_FRAME_LEN)


def parse_npm_frame(frame):
    """Validate a data60s answer and extract the three PM values.

    Args:
        frame: raw bytes returned by the sensor.

    Returns:
        Tuple ``(PM1, PM25, PM10)``; each is the big-endian 16-bit field at
        bytes 9-10, 11-12 and 13-14 divided by 10 (presumably µg/m³ — confirm
        against the sensor datasheet).

    Raises:
        ValueError: the frame is incomplete, is not a data60s answer, or
            fails the checksum. Without these checks a silent sensor would
            be stored as a measurement of 0.0.
    """
    if len(frame) != NPM_FRAME_LEN:
        raise ValueError(
            f"Incomplete NPM frame: expected {NPM_FRAME_LEN} bytes, "
            f"got {len(frame)}"
        )
    if frame[0] != NPM_ADDR or frame[1] != NPM_CMD_DATA60S:
        raise ValueError(f"Unexpected NPM frame header: {frame[:2].hex()}")
    # Same rule as the request frames (0x81 + 0x12 + 0x6D == 0x100): all
    # bytes of a valid frame sum to 0 modulo 256.
    if sum(frame) % 256 != 0:
        raise ValueError("NPM frame checksum mismatch")

    pm1 = int.from_bytes(frame[9:11], byteorder='big') / 10
    pm25 = int.from_bytes(frame[11:13], byteorder='big') / 10
    pm10 = int.from_bytes(frame[13:15], byteorder='big') / 10
    return pm1, pm25, pm10


def get_timestamp():
    """Return the RTC time as ``YYYY-MM-DD HH:MM:SS`` or the fallback date."""
    with smbus2.SMBus(I2C_BUS_NUMBER) as bus:
        rtc_time = read_time(bus)

    if rtc_time:
        return rtc_time.strftime('%Y-%m-%d %H:%M:%S')

    print("Error! RTC module not connected")
    return FALLBACK_TIMESTAMP  # Default fallback time


def save_measurement(timestamp, pm1, pm25, pm10):
    """Insert one row into the ``data`` table and commit it."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            INSERT INTO data (timestamp,PM1, PM25, PM10) VALUES (?,?,?,?)''',
                     (timestamp, pm1, pm25, pm10))
        conn.commit()


def main():
    """Run one acquisition cycle.

    Returns:
        Process exit status: 0 on success, 1 on any error, 130 when
        interrupted by the user.
    """
    config = load_config(CONFIG_FILE)
    npm_solo_port = config.get('NPM_solo_port', '')  # serial port of the NPM solo sensor

    try:
        with serial.Serial(
            port=npm_solo_port,
            baudrate=115200,
            parity=serial.PARITY_EVEN,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=0.5,
        ) as ser:
            frame = request_npm_frame(ser)

        pm1, pm25, pm10 = parse_npm_frame(frame)
        rtc_time_str = get_timestamp()
        save_measurement(rtc_time_str, pm1, pm25, pm10)
    except KeyboardInterrupt:
        print("User interrupt encountered. Exiting...")
        time.sleep(3)
        return 130
    except Exception as e:
        # Top-level boundary: report the failure and exit with an error code.
        print(f"Error: {e}")  # Show actual error
        # Pause kept from the original script; presumably lets the message
        # be read / throttles quick restarts — confirm before removing.
        time.sleep(3)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())