Files
moduleair_pro_4g/MH-Z19/write_data.py
Your Name d0e1ad18e9 update
2025-02-26 10:39:00 +01:00

127 lines
3.2 KiB
Python
Executable File

'''
____ ___ ____
/ ___/ _ \___ \
| | | | | |__) |
| |__| |_| / __/
\____\___/_____|
Script to read CO2 values and write them to the database
/usr/bin/python3 /var/www/moduleair_pro_4g/MH-Z19/write_data.py
'''
import serial
import requests
import json
import sys
import subprocess
import time
import sqlite3
# Module-level SQLite handle and cursor for the sensor database; main()
# uses both to read the RTC timestamp and to insert the CO2 reading.
SENSORS_DB_PATH = "/var/www/moduleair_pro_4g/sqlite/sensors.db"
conn = sqlite3.connect(SENSORS_DB_PATH)
cursor = conn.cursor()
def load_config(config_file):
    """Load the JSON configuration file and return it as a dict.

    Args:
        config_file: Path of the JSON configuration file.

    Returns:
        The parsed configuration mapping. An empty dict is returned (and the
        problem is printed) when the file cannot be read, does not contain
        valid JSON, or its top-level value is not a JSON object, so callers
        can always use ``config.get(key, default)``.
    """
    try:
        # JSON text is UTF-8; do not depend on the system locale encoding.
        with open(config_file, 'r', encoding='utf-8') as file:
            config_data = json.load(file)
    except (OSError, ValueError) as e:
        # OSError: missing or unreadable file.
        # ValueError: malformed JSON (json.JSONDecodeError) or bytes that are
        # not valid UTF-8 (UnicodeDecodeError).
        print(f"Error loading config file: {e}")
        return {}

    if not isinstance(config_data, dict):
        # A list/number/string at the top level would break config.get(...).
        print(
            "Error loading config file: expected a JSON object, "
            f"got {type(config_data).__name__}"
        )
        return {}
    return config_data
# Load the configuration data
config_file = '/var/www/moduleair_pro_4g/config.json'
config = load_config(config_file)

# Serial device the MH-Z19 CO2 sensor is attached to ('MH-Z19_port' key).
mh_z19_port = config.get('MH-Z19_port', '')
if not mh_z19_port:
    # Without a port name the serial layer only fails later with an opaque
    # error (or yields an unopened port); stop here with a clear message.
    raise SystemExit(f"Error: 'MH-Z19_port' is not set in {config_file}")

# 9600 baud, 8 data bits, no parity, 1 stop bit; reads give up after 1 second.
ser = serial.Serial(
    port=mh_z19_port,
    baudrate=9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=1,
)
# Request frame that asks the MH-Z19 for its CO2 concentration.
# The frame is 9 bytes long (byte 0 ~ 8):
#   byte 0     start byte, fixed to 0xFF
#   byte 1     sensor number (factory default is 0x01)
#   byte 2     command, 0x86 = gas concentration
#   bytes 3-7  0x00
#   byte 8     checksum of the frame
READ_CO2_COMMAND = bytes((0xFF, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79))

# NOTE(review): not referenced by the code visible in this script - confirm
# whether it is still needed.
file_path = "/var/www/moduleair_pro_4g/matrix/input.txt"
def read_co2():
    """Request one CO2 reading from the MH-Z19 and return it in ppm.

    Sends READ_CO2_COMMAND on the module-level serial port ``ser``, waits for
    the sensor, then reads and validates the 9-byte reply frame.

    Returns:
        The CO2 concentration (reply byte 2 * 256 + reply byte 3), or None
        when the reply is missing, truncated, has an unexpected header, or
        fails the checksum. The reason is printed in each failure case.
    """
    # Drop bytes left over from earlier traffic so the 9 bytes read below
    # belong to the request sent now.
    ser.reset_input_buffer()

    # Send the read command to the MH-Z19
    ser.write(READ_CO2_COMMAND)

    # Give the sensor time to answer before reading.
    time.sleep(2)

    # The reply frame is 9 bytes; fewer means a timeout occurred.
    response = ser.read(9)

    # Print the raw frame to help debugging.
    print(f"Response: {response}")

    if len(response) < 9:
        print("Error: No data or incomplete data received.")
        return None

    # A valid reply starts with 0xFF followed by the echoed command 0x86.
    if response[0] != 0xFF or response[1] != 0x86:
        print("Error reading data from sensor.")
        return None

    # Checksum = two's complement of the sum of bytes 1..7, on 8 bits
    # (the same rule that gives 0x79 for READ_CO2_COMMAND).
    expected_checksum = (-sum(response[1:8])) & 0xFF
    if response[8] != expected_checksum:
        print("Error: checksum mismatch in sensor response.")
        return None

    # Concentration is big-endian in bytes 2 (high) and 3 (low).
    return response[2] * 256 + response[3]
def main():
    """Take one CO2 reading, write it to the matrix file and store it in SQLite.

    Steps:
      1. Read the concentration with read_co2().
      2. Overwrite the matrix input file with the value.
      3. Read the current RTC timestamp from ``timestamp_table``.
      4. Insert (timestamp, CO2) into ``data_CO2`` and commit.

    The serial port and the database connection are always closed on exit,
    whatever happened before.
    """
    try:
        co2 = read_co2()
        if co2 is None:
            print("Failed to get CO2 data.")
            return

        print(f"CO2 Concentration: {co2} ppm")

        # Save to file (the file is overwritten on every run).
        output_file = "/var/www/moduleair_pro_4g/matrix/input_MHZ16.txt"
        with open(output_file, "w") as file:
            file.write(f"{co2} \n")
        print(f"Data written to {output_file}")

        # Get the RTC time from SQLite: first row of timestamp_table.
        cursor.execute("SELECT * FROM timestamp_table LIMIT 1")
        row = cursor.fetchone()
        if row is None:
            # An empty table would otherwise crash on row[1] with a TypeError.
            print("Error: timestamp_table is empty; CO2 value not saved to the database.")
            return
        # Second column holds the timestamp string, e.g. '2025-02-07 12:30:45'.
        rtc_time_str = row[1]

        # Save to SQLite
        cursor.execute(
            '''
            INSERT INTO data_CO2 (timestamp,CO2) VALUES (?,?)''',
            (rtc_time_str, co2),
        )
        conn.commit()
    except KeyboardInterrupt:
        print("Program terminated.")
    finally:
        # Close the database even if closing the serial port fails.
        try:
            ser.close()
        finally:
            conn.close()


if __name__ == '__main__':
    main()