# <===
# 2025-11-01 09:02:12
#version rp-baro-gps
import gc
import time
import board
import busio
import digitalio
import TM1637
import adafruit_ds1307
from adafruit_bme280 import basic as adafruit_bme280
from lcd.lcd import LCD, CursorMode
from lcd.i2c_pcf8574_interface import I2CPCF8574Interface
from myalarms import alarms
import microcontroller
import struct
# Fallback RTC drift correction (used by check_time_adjustment()).
# NOTE: despite its name, SECONDS_TO_ADD_PER_WEEK is applied once per
# DAYS_BEFORE_ADJUSTMENT days, not once per calendar week.
SECONDS_TO_ADD_PER_WEEK = 2
DAYS_BEFORE_ADJUSTMENT = 5
# Global settings
# NOTE(review): the two I2C pins are passed positionally as
# busio.I2C(I2C_SDA_PIN, I2C_SCL_PIN), and busio.I2C's parameter order is
# (scl, sda) -- so the names look swapped relative to the role each pin
# ends up with. Confirm against the wiring before renaming or reordering.
I2C_SDA_PIN = board.GP15
I2C_SCL_PIN = board.GP14
TM1637_CLK_PIN = board.GP2 # CHANGED: GP2/GP3 (NOTE(review): original note mentions the GPS UART, which uses GP0/GP1 below -- confirm)
TM1637_DIO_PIN = board.GP3
BEEPER_PIN = board.GP12
BME280_ADDRESS = 0x76
LCD_ADDRESS = 0x27
RTC_ADDRESS = 0x68
CHECK_ADJUSTMENT_INTERVAL = 18000 # 5 h, in seconds
# Hourly chime is played when START <= hour < END
HOURLY_BEEP_START_HOUR = 8
HOURLY_BEEP_END_HOUR = 22
MORSE_WPM = 18
# Added to the raw BME280 pressure reading before it is shown
PRESSURE_CORRECTION = 32.5
# Sleep at the end of every main-loop iteration, in seconds
MAIN_LOOP_DELAY = 0.2
# GPS UART (GP0 RX, GP1 TX)
# NOTE(review): these are passed positionally to busio.UART, whose order is
# (tx, rx), so GP0 becomes the board's TX -- presumably the names describe
# the GPS module's side of the link; confirm against the wiring.
GPS_UART_RX = board.GP0
GPS_UART_TX = board.GP1
GPS_BUFFER_SIZE = 2048
# I2C device presence check
def wait_for_i2c_device(i2c, address, timeout=5, poll_interval=1):
    """Wait until a device answers at ``address`` on the I2C bus.

    The bus is scanned repeatedly until the address shows up or ``timeout``
    seconds have elapsed.

    Args:
        i2c: Bus object providing ``try_lock()``, ``scan()`` and ``unlock()``.
        address: I2C address to look for in the scan result.
        timeout: Maximum time to keep trying, in seconds.
        poll_interval: Pause between two scan attempts, in seconds.

    Returns:
        True if the address was seen in a scan, False on timeout.
    """
    start_time = time.monotonic()
    while time.monotonic() - start_time < timeout:
        locked = False
        try:
            # scan() is only valid while holding the bus lock, so skip the
            # scan when the lock could not be taken this round.
            locked = i2c.try_lock()
            if locked and address in i2c.scan():
                return True
        except Exception as e:
            # Best effort: a bus glitch must not abort start-up, just retry.
            print(f"I2C scan error: {e}")
        finally:
            # Always give the lock back, including on the early return and
            # when scan() raised.
            if locked:
                i2c.unlock()
        time.sleep(poll_interval)
    return False
# Initialisation
# Every peripheral is optional: when one cannot be set up its handle is left
# as None and the reason is printed, so the rest of the clock keeps working.
# NOTE(review): busio.I2C's positional order is (scl, sda); the constant
# names look swapped relative to that order -- confirm the wiring before
# changing this call.
try:
    i2c = busio.I2C(I2C_SDA_PIN, I2C_SCL_PIN)
except Exception as e:
    print(f"I2C error: {e}")
    i2c = None

# 4-digit LED display
try:
    ldisplay = TM1637.TM1637(TM1637_CLK_PIN, TM1637_DIO_PIN)
    ldisplay.brightness(0)
except Exception as e:
    print(f"TM1637 error: {e}")
    ldisplay = None

# RTC
rtc = None
if i2c and wait_for_i2c_device(i2c, RTC_ADDRESS):
    try:
        rtc = adafruit_ds1307.DS1307(i2c)
    except Exception as e:
        print(f"RTC error: {e}")
else:
    print("No RTC")

# BME280
bme280 = None
if i2c and wait_for_i2c_device(i2c, BME280_ADDRESS):
    try:
        bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c, address=BME280_ADDRESS)
    except Exception as e:
        print(f"BME280 error: {e}")

# LCD (I2CPCF8574Interface is already imported at the top of the file)
lcd = None
if i2c and wait_for_i2c_device(i2c, LCD_ADDRESS):
    try:
        interface = I2CPCF8574Interface(i2c, LCD_ADDRESS)
        lcd = LCD(interface, num_rows=2, num_cols=16)
        lcd.set_cursor_mode(CursorMode.HIDE)
        lcd.clear()
    except Exception as e:
        print(f"LCD error: {e}")
        # Do not keep a display object whose setup did not complete.
        lcd = None

# Beeper
beep = None
try:
    beep = digitalio.DigitalInOut(BEEPER_PIN)
    beep.direction = digitalio.Direction.OUTPUT
except Exception as e:
    print(f"Beeper error: {e}")
    # A pin that could not be switched to output must not be driven later.
    beep = None
# GPS UART
# busio.UART's positional order is (tx, rx).
# NOTE(review): GPS_UART_RX is therefore used as the board's TX pin --
# presumably the constants are named from the GPS module's side; confirm.
# The receive buffer is enlarged from the library default so that sentences
# arriving while the main loop sleeps or beeps are not dropped.
gps_uart = busio.UART(
    GPS_UART_RX,
    GPS_UART_TX,
    baudrate=9600,
    timeout=0.1,
    receiver_buffer_size=GPS_BUFFER_SIZE,
)
gps_buffer = bytearray()   # raw NMEA bytes collected by poll_gps()
gps_last_time = None       # "HH:MM:SS" of the last successful RTC sync
gps_sats = 0               # strong-satellite count from the last parse
gps_hdop = 99              # HDOP from the last parse (99 = unknown)
gps_fix = 0                # GSA fix mode from the last parse (0 = none)
# PMTK314 output selection; the fields are GLL,RMC,VTG,GGA,GSA,GSV,...
# RMC and GGA carry the time, GSA carries fix mode and HDOP, GSV carries the
# per-satellite SNR -- all four are needed by the parser below. The checksum
# stays 28 because an even number of '0' fields was flipped to '1'.
gps_uart.write(b"$PMTK314,0,1,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0*28\r\n")
# PMTK101 is a hot restart (keeps stored almanac/ephemeris); optional.
gps_uart.write(b"$PMTK101*32\r\n")
# NVM helpers
def save_last_adjustment_date():
    """Store today's RTC date in NVM bytes 0..3 and refresh the cache.

    The date is packed big-endian as year (2 bytes), month, day. Nothing
    happens when no RTC is available.
    """
    global cached_last_adjustment, last_nvm_read_time
    if not rtc:
        return
    stamp = rtc.datetime
    microcontroller.nvm[0:4] = struct.pack(
        '>HBB', stamp.tm_year, stamp.tm_mon, stamp.tm_mday
    )
    # Keep the in-memory copy in step so the next load skips the NVM read.
    cached_last_adjustment = stamp
    last_nvm_read_time = time.time()
# Cache of the date stored in NVM and when it was last read
cached_last_adjustment = None
last_nvm_read_time = 0
def load_last_adjustment_date():
    """Return the date of the last time adjustment as a ``struct_time``.

    The value is read from NVM bytes 0..3 (big-endian year, month, day) and
    cached; the cache is refreshed when it is empty or older than one hour.
    If the stored bytes do not form a plausible date (for example NVM that
    was never written), the current RTC time -- or ``time.localtime()``
    without an RTC -- is returned instead, so callers never see an
    impossible date.

    Returns:
        time.struct_time: midnight of the stored date, or the current time
        as a fallback.
    """
    global cached_last_adjustment, last_nvm_read_time
    current_time = time.time()
    if cached_last_adjustment is None or (current_time - last_nvm_read_time) > 3600:
        stored = None
        try:
            year, month, day = struct.unpack('>HBB', microcontroller.nvm[0:4])
            # Four arbitrary bytes always unpack, so the values themselves
            # have to be checked before they are trusted as a date.
            if 2000 <= year <= 2099 and 1 <= month <= 12 and 1 <= day <= 31:
                stored = time.struct_time((year, month, day, 0, 0, 0, 0, 0, -1))
        except Exception as e:
            print(f"NVM read error: {e}")
        if stored is None:
            stored = rtc.datetime if rtc else time.localtime()
        cached_last_adjustment = stored
        last_nvm_read_time = current_time
    return cached_last_adjustment
# Manual RTC correction
def adjust_time(seconds_to_add):
    """Shift the RTC by ``seconds_to_add`` seconds and record the date.

    Does nothing when no RTC is available.
    """
    if not rtc:
        return
    # Go through epoch seconds so minute/hour/day roll-over is handled.
    shifted_epoch = time.mktime(rtc.datetime) + seconds_to_add
    rtc.datetime = time.localtime(shifted_epoch)
    print(f"Manual +{seconds_to_add}s")
    save_last_adjustment_date()
def check_time_adjustment():
    """Apply the fallback drift correction when enough days have passed.

    Compares the RTC time with the stored last-adjustment date; once at
    least DAYS_BEFORE_ADJUSTMENT whole days have elapsed, the RTC is moved
    forward by SECONDS_TO_ADD_PER_WEEK for every full period.
    """
    if not rtc:
        return
    now = rtc.datetime
    baseline = load_last_adjustment_date()
    elapsed_days = (time.mktime(now) - time.mktime(baseline)) // 86400
    if elapsed_days < DAYS_BEFORE_ADJUSTMENT:
        return
    periods = elapsed_days // DAYS_BEFORE_ADJUSTMENT
    adjust_time(periods * SECONDS_TO_ADD_PER_WEEK)


check_time_adjustment()  # Boot adjust
# GPS NMEA parser (simplified)
def verify_checksum(line):
    """Check the checksum of one NMEA sentence.

    Args:
        line: Complete sentence including the leading ``$`` and the
            trailing ``\\r\\n``, e.g. ``"$PMTK101*32\\r\\n"``.

    Returns:
        True when the XOR of all characters between ``$`` and ``*`` equals
        the two hex digits after ``*``; False for anything malformed.
    """
    if not isinstance(line, str):
        return False
    if not line.startswith('$') or line[-2:] != '\r\n' or '*' not in line:
        return False
    data, chk = line[1:].split('*', 1)
    digits = chk[:2]
    # Exactly two hex digits are required; int() alone would also accept
    # whitespace or a sign.
    if len(digits) != 2 or not all(d in '0123456789ABCDEFabcdef' for d in digits):
        return False
    calc = 0
    for c in data:
        calc ^= ord(c)
    return calc == int(digits, 16)
def parse_gps_time(line):
    """Extract the time of day from a GGA or RMC sentence.

    Any two-letter talker prefix is accepted (``$GPGGA``, ``$GNRMC``, ...).
    The NMEA time field is UTC.

    Args:
        line: One NMEA sentence, with or without the ``*hh`` checksum.

    Returns:
        ``"HH:MM:SS"`` when the sentence carries a time of at least six
        digits, otherwise None (other sentence types, an empty or
        non-numeric time field, or a sentence with no fields).
    """
    start = line.find('$')
    if start < 0:
        return None
    fields = line[start:].split(',')
    if fields[0][3:] not in ('GGA', 'RMC') or len(fields) < 2:
        return None
    t = fields[1]
    # Receivers without a fix send an empty time field; fractional seconds
    # after the first six digits are ignored.
    if len(t) < 6 or not t[:6].isdigit():
        return None
    return f"{t[:2]}:{t[2:4]}:{t[4:6]}"
def parse_gps_sats(line, min_snr=20):
    """Count the strong satellites listed in one GSV sentence.

    A GSV sentence carries up to four satellites as (PRN, elevation,
    azimuth, SNR) groups starting at field 4, so the SNR values sit at
    fields 7, 11, 15 and 19. Any two-letter talker prefix is accepted.

    Args:
        line: One NMEA sentence, with or without the ``*hh`` checksum.
        min_snr: A satellite counts as strong when its SNR is greater than
            this value.

    Returns:
        Number of satellites in this sentence whose SNR exceeds ``min_snr``;
        0 for other sentence types.
    """
    start = line.find('$')
    if start < 0:
        return 0
    # Drop the checksum first, otherwise the last SNR field reads like
    # "45*75" and is never recognised as a number.
    fields = line[start:].split('*', 1)[0].split(',')
    if fields[0][3:] != 'GSV':
        return 0
    strong = 0
    for i in range(7, len(fields), 4):
        snr = fields[i]
        # The SNR field is empty for satellites that are not being tracked.
        if snr.isdigit() and int(snr) > min_snr:
            strong += 1
    return strong
def parse_gps_fix(line):
    """Read fix mode and HDOP from a GSA sentence.

    In a GSA sentence field 2 is the fix mode (1 = none, 2 = 2D, 3 = 3D)
    and field 16 is the HDOP. Any two-letter talker prefix is accepted.

    Args:
        line: One NMEA sentence, with or without the ``*hh`` checksum.

    Returns:
        Tuple ``(fix, hdop)``. ``fix`` is 0 and ``hdop`` is 99 whenever the
        value is missing, empty, or the sentence is not GSA.
    """
    start = line.find('$')
    if start < 0:
        return 0, 99
    fields = line[start:].split('*', 1)[0].split(',')
    if fields[0][3:] != 'GSA' or len(fields) < 3:
        return 0, 99
    fix = int(fields[2]) if fields[2].isdigit() else 0
    hdop = 99
    if len(fields) > 16:
        try:
            hdop = float(fields[16])
        except ValueError:
            # Without a fix the DOP fields are sent empty; keep "unknown".
            pass
    return fix, hdop
# GPS -> RTC synchronisation (the main feature)
def gps_sync_rtc():
    """Set the RTC time of day from GPS when the reception is good enough.

    The last ten lines of ``gps_buffer`` are checksum-verified and parsed.
    The globals ``gps_sats``, ``gps_fix`` and ``gps_hdop`` are always
    updated with what was found. The RTC is written only when a time was
    seen together with a 3D fix (``fix >= 3``), ``hdop < 2`` and at least
    four strong satellites. Only hour, minute and second are replaced; the
    RTC keeps its date and weekday.

    NOTE(review): the NMEA time is UTC and is written unmodified -- confirm
    that the RTC is meant to run on UTC.

    Returns:
        True when the RTC was updated, False otherwise (including when no
        RTC is present).
    """
    global gps_last_time, gps_sats, gps_hdop, gps_fix
    if not rtc:
        return False

    gps_time_str = None
    sats = 0
    fix, hdop = 0, 99
    # bytes(...).split and positional str(bytes, encoding) are used because
    # they behave the same on CPython and on reduced Python runtimes.
    for raw in bytes(gps_buffer).split(b'\n')[-10:]:
        try:
            line = str(raw, 'ascii').strip()
        except UnicodeError:
            continue  # line noise; skip the sentence
        if not line or not verify_checksum(line + '\r\n'):
            continue
        gps_time_str = parse_gps_time(line) or gps_time_str
        sats += parse_gps_sats(line)
        line_fix, line_hdop = parse_gps_fix(line)
        if line_fix > fix:
            fix, hdop = line_fix, line_hdop

    gps_sats = sats
    gps_fix = fix
    gps_hdop = hdop

    if not (gps_time_str and fix >= 3 and hdop < 2 and sats >= 4):
        print(f"GPS weak: fix={fix} hdop={hdop} sats={sats}")
        return False

    try:
        hour, minute, second = [int(part) for part in gps_time_str.split(':')]
    except ValueError:
        print(f"GPS time invalid: {gps_time_str}")
        return False
    # Never write an out-of-range time of day into the RTC.
    if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
        print(f"GPS time invalid: {gps_time_str}")
        return False

    # GPS time -> RTC
    now = rtc.datetime
    rtc.datetime = time.struct_time(
        (now.tm_year, now.tm_mon, now.tm_mday, hour, minute, second, now.tm_wday, 0, 0)
    )
    gps_last_time = gps_time_str
    beeper(3, 0.05, 0.05)  # three short beeps: sync OK
    print(f"**GPS SYNC: {gps_time_str} | {sats}sats HDOP:{hdop:.1f}**")
    # A fresh sync restarts the fallback drift-correction period.
    save_last_adjustment_date()
    return True
# GPS poll (called from the main loop)
def poll_gps():
    """Move pending UART bytes into ``gps_buffer``.

    Reads up to 256 bytes per call and keeps only the newest
    GPS_BUFFER_SIZE bytes of the accumulated data.
    """
    global gps_buffer
    chunk = gps_uart.read(256)
    if not chunk:
        return  # nothing arrived within the UART timeout
    gps_buffer.extend(chunk)
    if len(gps_buffer) > GPS_BUFFER_SIZE:
        # Drop the oldest bytes; only recent sentences are of interest.
        gps_buffer = gps_buffer[-GPS_BUFFER_SIZE:]
# Display refresh
print_counter = 0
def myshow():
    """Refresh the LED display, the LCD and (throttled) the console log.

    The LED display shows HHMM of ``curent_time``. When a BME280 is present
    its readings are formatted for the two LCD rows, the GPS status is
    written over the end of the second row, and every 25th call a full
    status line is printed. Any error is caught and reported at the same
    throttled rate. ``print_counter`` is incremented on every call.
    """
    global print_counter, curent_time
    t = curent_time
    hour = f"{t.tm_hour:02d}"
    minute = f"{t.tm_min:02d}"
    if ldisplay:
        ldisplay.show(hour + minute)
    try:
        if bme280:
            temp = bme280.temperature
            hum = bme280.relative_humidity
            press = bme280.pressure + PRESSURE_CORRECTION
            P0 = 1013.25
            alt = 44330 * (1 - (press / P0) ** (1/5.255)) + 330
            st1 = f"{temp:5.1f}C {hum:4.1f}% {t.tm_sec:02d}"
            st2 = f"{press:5.1f}hP {t.tm_year-2000:02d}{t.tm_mon:02d}{t.tm_mday:02d}"
            if lcd:
                for row, text in ((0, st1), (1, st2)):
                    lcd.set_cursor_pos(row, 0)
                    lcd.print(text[:16])
                # GPS status overwrites the last five columns of row 1
                gps_st = f"GPS:{gps_sats}s{gps_fix}"
                lcd.set_cursor_pos(1, 11)
                lcd.print(gps_st[:5])
            if print_counter % 25 == 0:  # 5 s throttle at the nominal loop rate
                last_adj = load_last_adjustment_date()
                print(f"{t.tm_year:04d}-{t.tm_mon:02d}-{t.tm_mday:02d} {hour}:{t.tm_min:02d}:{t.tm_sec:02d} "
                      f"{temp:.1f}C {hum:.1f}% {press:.1f}hPa {alt:.0f}m GPS:{gps_sats}sats/{gps_hdop:.1f} "
                      f"NVM:{last_adj.tm_year:04d}-{last_adj.tm_mon:02d}-{last_adj.tm_mday:02d}")
    except Exception as e:
        if print_counter % 25 == 0:
            print(f"Show err: {e}")
    print_counter += 1
# Beeper / Morse
def beeper(bcount=1, tbeep=0.1, tdelay=0):
    """Sound the beeper ``bcount`` times.

    Each beep lasts ``tbeep`` seconds and is followed by a pause of
    ``tdelay`` seconds. Does nothing when no beeper pin is available.
    """
    if not beep:
        return
    for _ in range(bcount):
        beep.value = True
        time.sleep(tbeep)
        beep.value = False
        time.sleep(tdelay)
# Morse code table: character -> string of '.' (dot) and '-' (dash).
# Every key appears exactly once; a repeated key would silently replace the
# earlier code.
morse_dict = {
    'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.', 'F': '..-.',
    'G': '--.', 'H': '....', 'I': '..', 'J': '.---', 'K': '-.-', 'L': '.-..',
    'M': '--', 'N': '-.', 'O': '---', 'P': '.--.', 'Q': '--.-', 'R': '.-.',
    'S': '...', 'T': '-', 'U': '..-', 'V': '...-', 'W': '.--', 'X': '-..-',
    'Y': '-.--', 'Z': '--..', '0': '-----', '1': '.----', '2': '..---',
    '3': '...--', '4': '....-', '5': '.....', '6': '-....', '7': '--...',
    '8': '---..', '9': '----.',
}
def morse_transmitter(msg, wpm=MORSE_WPM):
    """Send ``msg`` as Morse code on the beeper.

    The dot length is ``1.2 / wpm`` seconds. A dot is one unit on, a dash
    three units on, each followed by one unit of silence; after every
    character there is a further pause of three units. Characters without
    a table entry produce only that pause.
    """
    dot = 1.2 / wpm
    for ch in msg:
        for sym in morse_dict.get(ch.upper(), ''):
            if sym == '.':
                beeper(1, dot, dot)
            elif sym == '-':
                beeper(1, 3*dot, dot)
            else:
                time.sleep(3*dot)
        # gap that closes every character
        time.sleep(3*dot)
# Alarms and hourly chime
hourly_beep_played = False
def check_alarms():
    """Beep for every configured alarm that matches the current minute.

    An alarm matches when its ``"time"`` equals the current (hour, minute)
    and its ``"days"`` is None or contains the current weekday.
    """
    t = curent_time
    now_hm = (t.tm_hour, t.tm_min)
    for alarm in alarms:
        if now_hm != alarm["time"]:
            continue
        days = alarm["days"]
        if days is not None and t.tm_wday not in days:
            continue
        beeper(1, 0.05)
        print(f"ALARM {alarm['time']}")
def check_hourly_beep():
    """Run the hourly GPS sync and chime once at the top of each hour.

    At minute 0, between HOURLY_BEEP_START_HOUR (inclusive) and
    HOURLY_BEEP_END_HOUR (exclusive), a GPS sync is attempted. "73" is sent
    in Morse when that attempt succeeded, otherwise the two-digit hour.
    ``hourly_beep_played`` ensures this happens once per hour and is
    re-armed as soon as the minute is no longer 0.
    """
    global hourly_beep_played, gps_last_time
    t = curent_time
    if t.tm_min != 0:
        hourly_beep_played = False
        return
    if hourly_beep_played:
        return
    if HOURLY_BEEP_START_HOUR <= t.tm_hour < HOURLY_BEEP_END_HOUR:
        # Forget the previous sync first: gps_last_time is only set on
        # success, so a stale value would make every later hour report OK.
        gps_last_time = None
        gps_sync_rtc()
        if gps_last_time:  # this attempt succeeded
            morse_transmitter("73")  # GPS OK
        else:
            morse_transmitter(f"{t.tm_hour:02d}")
    hourly_beep_played = True
# Init
curent_time = rtc.datetime if rtc else time.localtime()
beeper()
# Start of the current drift-check period, on the monotonic clock
last_adjustment_check = time.monotonic()
# GPS test (uncomment for debugging)
# gps_sync_rtc()
# morse_transmitter("GPS OK")
# MAIN LOOP
while True:
    # Time source: the RTC when present, otherwise the system clock
    curent_time = rtc.datetime if rtc else time.localtime()
    poll_gps()
    myshow()
    check_hourly_beep()
    check_alarms()  # before the sleep
    time.sleep(MAIN_LOOP_DELAY)
    # Measure the interval in elapsed time: one iteration takes longer than
    # MAIN_LOOP_DELAY (UART read, display updates, beeps), so counting
    # iterations would stretch the interval.
    if time.monotonic() - last_adjustment_check >= CHECK_ADJUSTMENT_INTERVAL:
        check_time_adjustment()
        last_adjustment_check = time.monotonic()
    gc.collect()