TrashNotes

2025-04-16 07:30:56
# Grok ( Для подключения GPS-модуля HW-248-GPS6MV2 к микроконтроллеру ESP32-S3-Zero с использованием CircuitPython )

На основе анализа вашего кода я подготовил улучшенную версию с учетом рекомендаций. Основные изменения:
- Добавлена проверка контрольной суммы NMEA.
- Улучшено декодирование строк с использованием `decode('ascii', errors='ignore')`.
- Добавлено ограничение размера буфера.
- Реализована функция для преобразования координат в числовой формат (градусы).
- Валидация данных перед использованием (проверка статуса фикса и наличия полей).
- Структурирован парсинг с помощью отдельных функций для каждого типа сообщения.
- Использование `logging` вместо `print`.
- Оптимизирован вывод в производственном режиме с использованием словаря.

Код остается совместимым с вашим окружением (CircuitPython, UART), сохраняет оба режима (отладочный и производственный) и обрабатывает те же NMEA-сообщения.

```python
import board
import busio
import time
import logging

# Настройка логирования
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Переменная для управления режимом продакшн
production_mode = False

# Инициализация UART
uart = busio.UART(board.RX, board.TX, baudrate=9600, timeout=1)

# Буфер для хранения сырых данных
raw_buffer = bytearray()
MAX_BUFFER_SIZE = 1024  # Максимальный размер буфера

41# Хранилище для GPS-данных
gps_data = {
    'gpgga': {'time': '', 'latitude': None, 'longitude': None, 'altitude': None},
    'gprmc': {'time': '', 'date': '', 'latitude': None, 'longitude': None, 'speed': None, 'course': None, 'status': ''},
    'gpgll': {'time': '', 'latitude': None, 'longitude': None}
}

# Время последнего вывода в режиме продакшн
last_output_time = time.monotonic()

def validate_nmea_checksum(line):
    """Проверка контрольной суммы NMEA-сообщения."""
    if b'*' not in line:
        return False
    try:
        data, checksum = line.split(b'*')
        calculated = 0
        for char in data[1:]:  # Пропускаем '$'
            calculated ^= char
        return calculated == int(checksum, 16)
    except ValueError:
        return False

def convert_to_degrees(raw, direction):
    """Преобразование координат в градусы."""
    if not raw or not direction:
        return None
    try:
        degrees = float(raw[:2]) + float(raw[2:]) / 60
        return degrees if direction in ('N', 'E') else -degrees
    except (ValueError, IndexError):
        return None

def parse_gpgga(parts):
    """Парсинг сообщения $GPGGA."""
    if len(parts) < 10 or not parts[1]:
        return None
    try:
        return {
            'time': parts[1],
            'latitude': convert_to_degrees(parts[2], parts[3]),
            'longitude': convert_to_degrees(parts[4], parts[5]),
            'altitude': float(parts[9]) if parts[9] else None
        }
    except (ValueError, IndexError):
        return None

def parse_gprmc(parts):
    """Парсинг сообщения $GPRMC."""
    if len(parts) < 10 or parts[2] != 'A':  # Проверяем статус фикса
        return None
    try:
        return {
            'time': parts[1],
            'date': parts[9],
            'latitude': convert_to_degrees(parts[3], parts[4]),
            'longitude': convert_to_degrees(parts[5], parts[6]),
            'speed': float(parts[7]) if parts[7] else None,
            'course': float(parts[8]) if parts[8] else None,
            'status': parts[2]
        }
    except (ValueError, IndexError):
        return None

def parse_gpgll(parts):
    """Парсинг сообщения $GPGLL."""
    if len(parts) < 6 or not parts[5]:
        return None
    try:
        return {
            'time': parts[5],
            'latitude': convert_to_degrees(parts[1], parts[2]),
            'longitude': convert_to_degrees(parts[3], parts[4])
        }
    except (ValueError, IndexError):
        return None

def debug_print(parts, decoded_data):
    """Вывод отладочной информации."""
    if parts[0] == '$GPGGA' and len(parts) > 9:
        logger.debug(f"Время: {parts[1]}")
        logger.debug(f"Широта: {parts[2]}, {parts[3]}")
        logger.debug(f"Долгота: {parts[4]}, {parts[5]}")
        logger.debug(f"Количество спутников: {parts[7]}")
        logger.debug(f"Точность (HDOP): {parts[8]}")
        logger.debug(f"Высота: {parts[9]} метров")
    elif parts[0] == '$GPGSA' and len(parts) > 3:
        logger.debug(f"Режим: {parts[1]}")
        logger.debug(f"Тип фиксации: {parts[2]}")
        logger.debug(f"Количество спутников: {len(parts) - 4}")
        logger.debug(f"PDOP: {parts[-3]}")
        logger.debug(f"HDOP: {parts[-2]}")
        logger.debug(f"VDOP: {parts[-1][:-1]}")
    elif parts[0] == '$GPGSV' and len(parts) > 3:
        logger.debug(f"Количество сообщений: {parts[1]}")
        logger.debug(f"Номер сообщения: {parts[2]}")
        logger.debug(f"Количество спутников: {parts[3]}")
        for i in range(4, len(parts), 4):
            if i + 3 < len(parts):
                logger.debug(f"Спутник {parts[i]}: {parts[i+1]} градусов, {parts[i+2]}")
    elif parts[0] == '$GPGLL' and len(parts) > 5:
        logger.debug(f"Широта: {parts[1]}, {parts[2]}")
        logger.debug(f"Долгота: {parts[3]}, {parts[4]}")
        logger.debug(f"Время: {parts[5]}")
    elif parts[0] == '$GPRMC' and len(parts) > 9:
        logger.debug(f"Время: {parts[1]}")
        logger.debug(f"Статус: {parts[2]}")
        logger.debug(f"Широта: {parts[3]}, {parts[4]}")
        logger.debug(f"Долгота: {parts[5]}, {parts[6]}")
        logger.debug(f"Скорость: {parts[7]} узлов")
        logger.debug(f"Курс: {parts[8]} градусов")
    elif parts[0] == '$GPVTG' and len(parts) > 5:
        logger.debug(f"Курс: {parts[1]} градусов")
        logger.debug(f"Скорость: {parts[5]} узлов")
    logger.debug(f"Raw NMEA: {decoded_data}")

def production_print():
    """Вывод данных в производственном режиме."""
    if gps_data['gprmc']['time'] and gps_data['gprmc']['date']:
        hours = gps_data['gprmc']['time'][:2]
        minutes = gps_data['gprmc']['time'][2:4]
        seconds = gps_data['gprmc']['time'][4:]
        day = gps_data['gprmc']['date'][:2]
        month = gps_data['gprmc']['date'][2:4]
        year = "20" + gps_data['gprmc']['date'][4:]
        logger.info(f"Дата и время: {year}-{month}-{day} {hours}:{minutes}:{seconds}")
    else:
        logger.info("Дата и время не доступны")

    if gps_data['gpgga']['latitude'] is not None and gps_data['gpgga']['longitude'] is not None:
        logger.info(f"Широта (GPGGA): {gps_data['gpgga']['latitude']:.6f} градусов")
        logger.info(f"Долгота (GPGGA): {gps_data['gpgga']['longitude']:.6f} градусов")
        logger.info(f"Высота (GPGGA): {gps_data['gpgga']['altitude'] or 'N/A'} метров")
    else:
        logger.info("Координаты GPGGA не доступны")

    if gps_data['gprmc']['latitude'] is not None and gps_data['gprmc']['longitude'] is not None:
        logger.info(f"Широта (GPRMC): {gps_data['gprmc']['latitude']:.6f} градусов")
        logger.info(f"Долгота (GPRMC): {gps_data['gprmc']['longitude']:.6f} градусов")
        logger.info(f"Скорость (GPRMC): {gps_data['gprmc']['speed'] or 'N/A'} узлов")
        logger.info(f"Курс (GPRMC): {gps_data['gprmc']['course'] or 'N/A'} градусов")
    else:
        logger.info("Координаты GPRMC не доступны")

    if gps_data['gpgll']['latitude'] is not None and gps_data['gpgll']['longitude'] is not None:
        logger.info(f"Широта (GPGLL): {gps_data['gpgll']['latitude']:.6f} градусов")
        logger.info(f"Долгота (GPGLL): {gps_data['gpgll']['longitude']:.6f} градусов")
    else:
        logger.info("Координаты GPGLL не доступны")

while True:
    # Чтение данных из UART
    raw_data = uart.read(100)
    if raw_data:
        raw_buffer.extend(raw_data)

        # Ограничение размера буфера
        if len(raw_buffer) > MAX_BUFFER_SIZE:
            logger.warning("Буфер переполнен, очищен")
            raw_buffer = bytearray()

        # Обработка буфера
        while b'\n' in raw_buffer:
            index = raw_buffer.find(b'\n')
            line = raw_buffer[:index]
            raw_buffer = raw_buffer[index + 1:]

            # Проверка контрольной суммы
            if not validate_nmea_checksum(line):
                logger.warning("Неверная контрольная сумма, строка пропущена")
                continue

            try:
                # Декодирование строки
                decoded_data = line.decode('ascii', errors='ignore')
                parts = decoded_data.split(',')

                # Парсинг данных
                if parts[0] == '$GPGGA':
                    parsed = parse_gpgga(parts)
                    if parsed:
                        gps_data['gpgga'].update(parsed)
                elif parts[0] == '$GPRMC':
                    parsed = parse_gprmc(parts)
                    if parsed:
                        gps_data['gprmc'].update(parsed)
                elif parts[0] == '$GPGLL':
                    parsed = parse_gpgll(parts)
                    if parsed:
                        gps_data['gpgll'].update(parsed)

                # Режим дебага
                if not production_mode:
                    debug_print(parts, decoded_data)

                # Режим продакшн
                if production_mode:
                    current_time = time.monotonic()
                    if current_time - last_output_time >= 1:
                        last_output_time = current_time
                        production_print()

            except Exception as e:
                logger.error(f"Ошибка обработки данных: {e}")
```
← Previous Next →
Back to list