import serial import time import sys import sqlite3 import datetime SERIAL_PORT = "COM3" #COM port pro ESP32 (každy bude mít jiný) BAUD_RATE = 115200 DB_FILE = "dht_data.sqlite" TABLE_NAME = "readings" def setup_database(): conn = None try: conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute(f''' CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, temperature REAL, humidity REAL) ''') conn.commit() print(f"Databaze {DB_FILE} a tabulka {TABLE_NAME} jsou ready") except sqlite3.Error as e: print(f"Chyba při práci s db: {e}") finally: if conn: conn.close() def insert_data(temp, hum): conn = None try: conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute(f''' INSERT INTO {TABLE_NAME} (temperature, humidity) VALUES (?, ?)''',(temp,hum)) conn.commit() print(f"Data uložena: T:{temp} & H:{hum}") except sqlite3.Error as e: print(f"Chyba při práci s db: {e}") finally: if conn: conn.close() def read_from_esp32(): setup_database() ser = None print(f"čtu {SERIAL_PORT} port s rychloti {BAUD_RATE}") try: ser = serial.Serial(SERIAL_PORT,BAUD_RATE,timeout=1) print("Připojeno") time.sleep(2) #po přípojení začneme do "nekonečna" číst řádky while True: try: line_bytes = ser.readline() if line_bytes: line_str = line_bytes.decode("utf-8").strip() print(f"Přijato: {line_str}") if line_str.startswith("T:") and ",H:" in line_str: parts = line_str.split(",") #-> [0]:"T:XX.X", [1]:"H:YY.Y" temp_str = parts[0].split(":")[1] #->[0]:"T" [1]:"XX.X" hum_str = parts[1].split(":")[1] #->[0]:"H" [1]:"YY.Y" try: temperature = float(temp_str) humidity = float(hum_str) insert_data(temperature,humidity) except ValueError: print(f"chyba při převedení -> T:{temp_str}, H:{hum_str}") else: if line_str: print(f"Neznámy format pro DB: {line_str}") except UnicodeDecodeError: print("Chyba: Přijatá data nelze dekodovat") except serial.SerialException as e: print(f"Chyba seriove komunikace: {e} \n END") break except Exception as e: print(f"neznámá chyba: {e}") time.sleep(1) except serial.SerialException as e: print(f"Chyba: nepodařilo se otevřít port {SERIAL_PORT}") except KeyboardInterrupt: print("Program ukončen uživatelem (CTRL+C)") finally: #čast programu která se provede vždy normálním ukončení nebo chybou if ser and ser.is_open: ser.close() print("Seriovy prot byl uzavřen") if __name__ == "__main__": read_from_esp32()