Files
nebuleair_pro_4g/RTC/save_to_db.py
PaulVua 7ad133f446 v1.7.4: Source de verite unique pour les services + update self-healing
setup_services.sh devient la source unique pour les services systemd
(le service rtc_save_to_db etait auparavant cree inline dans
installation_part2.sh, en doublon avec un commentaire dans save_to_db.py).

update_firmware.sh appelle maintenant setup_services.sh apres le git
pull. Resultat: les capteurs deja deployes peuvent se reparer tout
seuls au prochain update firmware (services manquants, masques, ou
nouveaux services ajoutes au repo apres l'installation initiale).

Defensif: systemctl unmask sur rtc_save_to_db avant creation du
fichier .service, pour eviter d'ecrire dans /dev/null si le service
avait ete masque (cas observe sur un capteur en production).

Pas de risque sur les capteurs sains: reecriture des .service avec
le meme contenu, comportement inchange.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 17:33:54 +02:00

99 lines
3.1 KiB
Python
Executable File

'''
____ _____ ____
| _ \_ _/ ___|
| |_) || || |
| _ < | || |___
|_| \_\|_| \____|
Script to read time from RTC module and save it to DB
I2C connection - Address 0x68
Runs as a long-running systemd service (rtc_save_to_db.service).
The service file is created by services/setup_services.sh — single source of truth.
'''
import smbus2
import time
import json
from datetime import datetime
import sqlite3
# DS3231 I2C address
DS3231_ADDR = 0x68
# Registers for DS3231
REG_TIME = 0x00
# Connect to (or create if not existent) the database
DB_PATH = "/var/www/nebuleair_pro_4g/sqlite/sensors.db"
def bcd_to_dec(bcd):
return (bcd // 16 * 10) + (bcd % 16)
def read_time(bus):
"""Try to read and decode time from the RTC module (DS3231)."""
try:
data = bus.read_i2c_block_data(DS3231_ADDR, REG_TIME, 7)
seconds = bcd_to_dec(data[0] & 0x7F)
minutes = bcd_to_dec(data[1])
hours = bcd_to_dec(data[2] & 0x3F)
day = bcd_to_dec(data[4])
month = bcd_to_dec(data[5])
year = bcd_to_dec(data[6]) + 2000
return datetime(year, month, day, hours, minutes, seconds)
except OSError:
return None # RTC module not connected
def main():
# Read RTC time
bus = smbus2.SMBus(1)
while True:
# Open a new database connection inside the loop to prevent connection loss
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Try to read RTC time
rtc_time = read_time(bus)
# Get current system time
system_time = datetime.now() #local
utc_time = datetime.utcnow() #UTC
# If RTC is not connected, set default message
# Calculate time difference (in seconds) if RTC is connected
if rtc_time:
rtc_time_str = rtc_time.strftime('%Y-%m-%d %H:%M:%S')
time_difference = int((utc_time - rtc_time).total_seconds()) # Convert to int
else:
rtc_time_str = "not connected"
time_difference = "N/A" # Not applicable
# Print both times
#print(f"RTC module Time: {rtc_time.strftime('%Y-%m-%d %H:%M:%S')}")
#print(f"Sys local Time: {system_time.strftime('%Y-%m-%d %H:%M:%S')}")
#print(f"Sys UTC Time: {utc_time.strftime('%Y-%m-%d %H:%M:%S')}")
# Create JSON output
time_data = {
"rtc_module_time":rtc_time_str,
"system_local_time": system_time.strftime('%Y-%m-%d %H:%M:%S'),
"system_utc_time": utc_time.strftime('%Y-%m-%d %H:%M:%S'),
"time_difference_seconds": time_difference
}
#print(json.dumps(time_data, indent=4))
# Save to database
try:
cursor.execute("UPDATE timestamp_table SET last_updated = ? WHERE id = 1", (rtc_time_str,))
conn.commit()
#print("Sensor data saved successfully!")
except sqlite3.Error as e:
print(f"Database error: {e}")
conn.close() # Close connection to avoid database locking issues
time.sleep(1) # Wait for 1 second before reading again
if __name__ == "__main__":
main()