LogNotes

2025-06-06 11:58:46
Часы в автомобиль ( rp2040-zero ) — компактное устройство для точного отображения времени в машине с использованием GPS.


Ключевые особенности:


Синхронизация времени по GPS (GPGGA/GPRMC) с отключением модуля для экономии энергии.
Автономная работа с точностью <2 секунд за сутки без GPS.
4-значный LED-дисплей (TM1637) с динамическим отображением времени в зависимости от качества сигнала.
Циклическое управление GPS (включение каждые 600 секунд).
Минимальное энергопотребление: <10 мА (без GPS), 30–50 мА (с GPS).
Индикация качества сигнала через яркость дисплея и отладочный лог.
Устойчивость к помехам и слабому сигналу.

Применение: Идеально для автомобилей, особенно в условиях переменного GPS-сигнала (город, тоннели). Работает автономно, с возможностью отладки через последовательный порт.

----------------

$ cat gps-clock.py 
import board
import busio
import time
import TM1637
import asyncio

IS_PRODUCTION = False  # Для отладки
MAX_BUFFER_SIZE = 1000
DATA_TIMEOUT = 20
SYNC_THRESHOLD = 5  # Дельта синхронизации

# Инициализация UART и дисплея
uart = busio.UART(board.GP0, board.GP1, baudrate=9600, timeout=0.1)
display = TM1637.TM1637(board.GP15, board.GP14)
display.brightness(0)
display.show("----")

# Глобальные переменные
raw_buffer = bytearray()
latitude = longitude = altitude = date = gps_time = speed = heading = None
strong_satellites = 0
fix_type = hdop = None
gpgsv_block = []
error_count = 0
last_error_line = None

# Внутренний таймер
base_time = 0  # Секунды с полуночи
base_monotonic = time.monotonic()
last_date = None
last_time_str = "00:00:00"

# Инициализация GPS
uart.write(b"$PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0*28\r\n")

def seconds_since_midnight(time_str):
    try:
        time_str = time_str.replace(':', '').split('.')[0]
        if len(time_str) != 6:
            raise ValueError("Неверная длина времени")
        hours = int(time_str[:2])
        minutes = int(time_str[2:4])
        seconds = int(time_str[4:6])
        return hours * 3600 + minutes * 60 + seconds
    except (ValueError, IndexError) as e:
        if not IS_PRODUCTION:
            print(f"Ошибка парсинга времени: {time_str}, {e}")
        return None

def get_current_time():
    global last_time_str
    elapsed = int(time.monotonic() - base_monotonic)
    current_seconds = base_time + elapsed
    hours = (current_seconds // 3600) % 24
    minutes = (current_seconds // 60) % 60
    seconds = current_seconds % 60
    last_time_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    return last_time_str, seconds

def sync_internal_clock(gps_time_str):
    global base_time, base_monotonic, last_time_str
    gps_seconds = seconds_since_midnight(gps_time_str)
    if gps_seconds is None:
        return
    
    current_seconds = seconds_since_midnight(get_current_time()[0])
    if current_seconds is None or (gps_seconds > current_seconds and abs(gps_seconds - current_seconds) > SYNC_THRESHOLD):
        base_time = gps_seconds
        base_monotonic = time.monotonic()
        last_time_str = gps_time_str
        if not IS_PRODUCTION:
            print(f"Синхронизация времени: GPS={gps_time_str}, разница={abs(gps_seconds - current_seconds) if current_seconds else 'N/A'}s")

def verify_nmea_checksum(line):
    try:
        if line[-2:] == '\r\n':
            line = line[:-2]
        if len(line) < 4 or line[-3] != '*' or line.count('*') != 1 or len(line.split(',')) < 2:
            return False
        data, checksum = line[1:].split('*')
        calc_checksum = 0
        for char in data:
            calc_checksum ^= ord(char)
        return calc_checksum == int(checksum, 16)
    except Exception as e:
        if not IS_PRODUCTION:
            print(f"Ошибка в verify_nmea_checksum: {e}")
        return False

def parse_gpgga(line):
    try:
        fields = line.split(',')
        if fields[0] != '$GPGGA' or len(fields) < 10:
            return {'time': None}
        
        t = fields[1]
        time_str = f"{t[:2]}:{t[2:4]}:{t[4:6]}" if t and len(t) >= 6 else None
        
        if fields[6] not in ('1', '2') or not fields[2] or not fields[3] or not fields[4] or not fields[5]:
            return {'time': time_str}
        
        lat = float(fields[2]) / 100
        lat_deg = int(lat)
        lat_min = (lat - lat_deg) * 100 / 60
        lat = lat_deg + lat_min
        if fields[3] == 'S':
            lat = -lat

        lon = float(fields[4]) / 100
        lon_deg = int(lon)
        lon_min = (lon - lon_deg) * 100 / 60
        lon = lon_deg + lon_min
        if fields[5] == 'W':
            lon = -lon

        alt = float(fields[9]) if fields[9] else None

        return {'latitude': lat, 'longitude': lon, 'altitude': alt, 'time': time_str}
    except Exception as e:
        if not IS_PRODUCTION:
            print(f"Ошибка в parse_gpgga: {e}")
        return {'time': None}

def parse_gprmc(line):
    try:
        fields = line.split(',')
        if fields[0] != '$GPRMC' or len(fields) < 10:
            return {'time': None, 'date': None}
        
        t = fields[1]
        time_str = f"{t[:2]}:{t[2:4]}:{t[4:6]}" if t and len(t) >= 6 else None
        d = fields[9]
        date_str = f"20{d[4:6]}-{d[2:4]}-{d[:2]}" if d and len(d) >= 6 else None
        spd = None
        hdg = None
        if fields[2] == 'A' and fields[7] and fields[7].replace('.', '').isdigit():
            spd = float(fields[7]) * 1.852
            if spd < 0.1:
                spd = 0.0
            hdg = float(fields[8]) if fields[8] else None

        return {'date': date_str, 'time': time_str, 'speed': spd, 'heading': hdg}
    except Exception as e:
        if not IS_PRODUCTION:
            print(f"Ошибка в parse_gprmc: {e}")
        return {'time': None, 'date': None}

def parse_gpgsv_block(block):
    try:
        satellite_count = 0
        satellite_info = []
        total_msgs = int(block[-1].split(',')[1])
        if len(block) != total_msgs:
            return 0
        for i, line in enumerate(block):
            fields = line.split(',')
            if fields[0] != '$GPGSV' or int(fields[2]) != i + 1:
                return 0
            for j in range(4, len(fields) - 1, 4):
                snr = fields[j + 3] if j + 3 < len(fields) else ''
                if snr and snr.isdigit() and int(snr) > 10:
                    satellite_count += 1
                    if not IS_PRODUCTION:
                        elev = fields[j + 1] if fields[j + 1] else '?'
                        azim = fields[j + 2] if fields[j + 2] else '?'
                        satellite_info.append(f"{fields[j]}:SNR={snr},Elev={elev},Azim={azim}")
        if not IS_PRODUCTION:
            print(f"Спутники с SNR > 10: {satellite_count} ({satellite_info})")
        else:
            print(f"Спутники с SNR > 10: {satellite_count}")
        return satellite_count
    except Exception as e:
        if not IS_PRODUCTION:
            print(f"Ошибка в parse_gpgsv_block: {e}")
        return 0

def parse_gpgsa(line):
    try:
        fields = line.split(',')
        if fields[0] != '$GPGSA' or len(fields) < 18:
            return False
        fix_type = fields[2]
        pdop = float(fields[15]) if fields[15] else None
        hdop = float(fields[16]) if fields[16] else None
        vdop = float(fields[17].split('*')[0]) if fields[17] else None
        return {'fix_type': fix_type, 'pdop': pdop, 'hdop': hdop, 'vdop': vdop}
    except Exception as e:
        if not IS_PRODUCTION:
            print(f"Ошибка в parse_gpgsa: {e}")
        return False

async def read_gps():
    global raw_buffer, latitude, longitude, altitude, gps_time, speed, date, heading
    global strong_satellites, fix_type, hdop, gpgsv_block, error_count, last_error_line
    global last_date
    last_update = time.monotonic()
    
    while True:
        current_time = time.monotonic()
        if current_time - last_update > DATA_TIMEOUT:
            latitude = longitude = altitude = gps_time = date = speed = heading = None
            strong_satellites = None
            fix_type = hdop = None
            last_update = current_time
        
        raw_data = uart.read(200)
        if not raw_data:
            if strong_satellites is not None and strong_satellites < 2:
                if not IS_PRODUCTION:
                    print("Switching off GPS")
                uart.write(b"$PMTK161,0*28\r\n")
                await asyncio.sleep(15)
            await asyncio.sleep(0.1)
            continue

        raw_buffer.extend(raw_data)
        if len(raw_buffer) > MAX_BUFFER_SIZE:
            raw_buffer = raw_buffer[-MAX_BUFFER_SIZE:]

        while b'\n' in raw_buffer:
            index = raw_buffer.find(b'\n')
            line = raw_buffer[:index]
            raw_buffer = raw_buffer[index + 1:]

            try:
                ascii_line = ''.join(chr(b) for b in line if 32 <= b <= 126)
                if ascii_line and ascii_line.startswith('$') and len(ascii_line) > 10:
                    if ascii_line.startswith('$GPTXT'):
                        if not IS_PRODUCTION:
                            print(f"Raw NMEA: {ascii_line}")
                        continue
                    if not verify_nmea_checksum(ascii_line):
                        last_error_line = ascii_line
                        if not IS_PRODUCTION:
                            print(f"Некорректная контрольная сумма: {last_error_line}")
                        continue
                    if not IS_PRODUCTION:
                        print(f"Raw NMEA: {ascii_line}")

                    if ascii_line.startswith('$GPGLL'):
                        continue

                    elif ascii_line.startswith('$GPGGA'):
                        gpgga_data = parse_gpgga(ascii_line)
                        if gpgga_data:
                            if 'latitude' in gpgga_data:
                                latitude = gpgga_data['latitude']
                                longitude = gpgga_data['longitude']
                                altitude = gpgga_data['altitude']
                            if gpgga_data['time']:
                                gps_time = gpgga_data['time']
                                sync_internal_clock(gps_time)
                            last_update = time.monotonic()

                    elif ascii_line.startswith('$GPRMC'):
                        gprmc_data = parse_gprmc(ascii_line)
                        if gprmc_data:
                            last_date = gprmc_data['date']
                            if gprmc_data['time']:
                                gps_time = gprmc_data['time']
                                sync_internal_clock(gps_time)
                            if gprmc_data['speed'] is not None:
                                speed = gprmc_data['speed']
                                heading = gprmc_data['heading']
                            last_update = time.monotonic()

                    elif ascii_line.startswith('$GPGSV'):
                        fields = ascii_line.split(',')
                        total_msgs = int(fields[1])
                        current_msg = int(fields[2])
                        if current_msg == 1:
                            gpgsv_block = [ascii_line]
                        else:
                            gpgsv_block.append(ascii_line)
                        if current_msg == total_msgs:
                            strong_satellites = parse_gpgsv_block(gpgsv_block)

                    elif ascii_line.startswith('$GPGSA'):
                        gpgsa_data = parse_gpgsa(ascii_line)
                        if gpgsa_data:
                            fix_type = gpgsa_data['fix_type']
                            hdop = gpgsa_data['hdop']
                            if not IS_PRODUCTION:
                                print(f"GPGSA: Fix={gpgsa_data['fix_type']}, PDOP={gpgsa_data['pdop']}, HDOP={gpgsa_data['hdop']}, VDOP={gpgsa_data['vdop']}")
                            if strong_satellites is None:
                                strong_satellites = 0

            except Exception as e:
                error_count += 1
                if not IS_PRODUCTION:
                    print(f"Ошибка обработки #{error_count}: {e}")
                last_error_line = ascii_line

        await asyncio.sleep(0.01)

async def update_display():
    while True:
        current_time, seconds = get_current_time()
        if current_time:
            if last_date:
                print(f"{last_date} {current_time}", end="")
            else:
                print(f"---- {current_time}", end="")
            display.brightness(0)
            if strong_satellites is not None and strong_satellites < 4:
                display.show(current_time.replace(":", "")[:4])  # HHMM
                await asyncio.sleep(1)
                display.show(f"  {seconds:02d}")  # Секунды: SS с двумя пробелами
                await asyncio.sleep(1)
            else:
                display.show(current_time.replace(":", "")[:4])  # HHMM
                await asyncio.sleep(2)
            
            if fix_type == '3' and hdop is not None and hdop <= 8:
                lon_str = f"Lon: {longitude:.6f}" if longitude is not None else "Lon: --"
                lat_str = f"Lat: {latitude:.6f}" if latitude is not None else "Lat: --"
                alt_str = f"Alt: {altitude:.1f}m" if altitude is not None else "Alt: --"
                spd_str = f"Spd: {speed:.1f}km/h" if speed is not None else "Spd: --"
                fix_str = f"Fix: {fix_type}" if fix_type else "Fix: --"
                hdop_str = f"HDOP: {hdop:.2f}" if hdop else "HDOP: --"
                hdg_str = f"Hdg: {heading:.1f}°" if heading is not None else "Hdg: --"
                print(f" | {lat_str} | {lon_str} | {alt_str} | {spd_str} | {fix_str} | {hdop_str} | {hdg_str} | Sat: {strong_satellites or 0}")
            else:
                print(" | ---")
        await asyncio.sleep(0.1)

async def main():
    try:
        tasks = [
            asyncio.create_task(read_gps()),
            asyncio.create_task(update_display())
        ]
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print("Программа завершена пользователем")
        display.show("----")
        display.brightness(0)

# Запуск программы
asyncio.run(main())
← Previous
Back to list