2025-04-16 07:30:56
# Grok ( Для подключения GPS-модуля HW-248-GPS6MV2 к микроконтроллеру ESP32-S3-Zero с использованием CircuitPython )
На основе анализа вашего кода я подготовил улучшенную версию с учетом рекомендаций. Основные изменения:
- Добавлена проверка контрольной суммы NMEA.
- Улучшено декодирование строк с использованием `decode('ascii', errors='ignore')`.
- Добавлено ограничение размера буфера.
- Реализована функция для преобразования координат в числовой формат (градусы).
- Валидация данных перед использованием (проверка статуса фикса и наличия полей).
- Структурирован парсинг с помощью отдельных функций для каждого типа сообщения.
- Использование `logging` вместо `print`.
- Оптимизирован вывод в производственном режиме с использованием словаря.
Код остается совместимым с вашим окружением (CircuitPython, UART), сохраняет оба режима (отладочный и производственный) и обрабатывает те же NMEA-сообщения.
```python
import board
import busio
import time
import logging
# Настройка логирования
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Переменная для управления режимом продакшн
production_mode = False
# Инициализация UART
uart = busio.UART(board.RX, board.TX, baudrate=9600, timeout=1)
# Буфер для хранения сырых данных
raw_buffer = bytearray()
MAX_BUFFER_SIZE = 1024 # Максимальный размер буфера
41# Хранилище для GPS-данных
gps_data = {
'gpgga': {'time': '', 'latitude': None, 'longitude': None, 'altitude': None},
'gprmc': {'time': '', 'date': '', 'latitude': None, 'longitude': None, 'speed': None, 'course': None, 'status': ''},
'gpgll': {'time': '', 'latitude': None, 'longitude': None}
}
# Время последнего вывода в режиме продакшн
last_output_time = time.monotonic()
def validate_nmea_checksum(line):
"""Проверка контрольной суммы NMEA-сообщения."""
if b'*' not in line:
return False
try:
data, checksum = line.split(b'*')
calculated = 0
for char in data[1:]: # Пропускаем '$'
calculated ^= char
return calculated == int(checksum, 16)
except ValueError:
return False
def convert_to_degrees(raw, direction):
"""Преобразование координат в градусы."""
if not raw or not direction:
return None
try:
degrees = float(raw[:2]) + float(raw[2:]) / 60
return degrees if direction in ('N', 'E') else -degrees
except (ValueError, IndexError):
return None
def parse_gpgga(parts):
"""Парсинг сообщения $GPGGA."""
if len(parts) < 10 or not parts[1]:
return None
try:
return {
'time': parts[1],
'latitude': convert_to_degrees(parts[2], parts[3]),
'longitude': convert_to_degrees(parts[4], parts[5]),
'altitude': float(parts[9]) if parts[9] else None
}
except (ValueError, IndexError):
return None
def parse_gprmc(parts):
"""Парсинг сообщения $GPRMC."""
if len(parts) < 10 or parts[2] != 'A': # Проверяем статус фикса
return None
try:
return {
'time': parts[1],
'date': parts[9],
'latitude': convert_to_degrees(parts[3], parts[4]),
'longitude': convert_to_degrees(parts[5], parts[6]),
'speed': float(parts[7]) if parts[7] else None,
'course': float(parts[8]) if parts[8] else None,
'status': parts[2]
}
except (ValueError, IndexError):
return None
def parse_gpgll(parts):
"""Парсинг сообщения $GPGLL."""
if len(parts) < 6 or not parts[5]:
return None
try:
return {
'time': parts[5],
'latitude': convert_to_degrees(parts[1], parts[2]),
'longitude': convert_to_degrees(parts[3], parts[4])
}
except (ValueError, IndexError):
return None
def debug_print(parts, decoded_data):
"""Вывод отладочной информации."""
if parts[0] == '$GPGGA' and len(parts) > 9:
logger.debug(f"Время: {parts[1]}")
logger.debug(f"Широта: {parts[2]}, {parts[3]}")
logger.debug(f"Долгота: {parts[4]}, {parts[5]}")
logger.debug(f"Количество спутников: {parts[7]}")
logger.debug(f"Точность (HDOP): {parts[8]}")
logger.debug(f"Высота: {parts[9]} метров")
elif parts[0] == '$GPGSA' and len(parts) > 3:
logger.debug(f"Режим: {parts[1]}")
logger.debug(f"Тип фиксации: {parts[2]}")
logger.debug(f"Количество спутников: {len(parts) - 4}")
logger.debug(f"PDOP: {parts[-3]}")
logger.debug(f"HDOP: {parts[-2]}")
logger.debug(f"VDOP: {parts[-1][:-1]}")
elif parts[0] == '$GPGSV' and len(parts) > 3:
logger.debug(f"Количество сообщений: {parts[1]}")
logger.debug(f"Номер сообщения: {parts[2]}")
logger.debug(f"Количество спутников: {parts[3]}")
for i in range(4, len(parts), 4):
if i + 3 < len(parts):
logger.debug(f"Спутник {parts[i]}: {parts[i+1]} градусов, {parts[i+2]}")
elif parts[0] == '$GPGLL' and len(parts) > 5:
logger.debug(f"Широта: {parts[1]}, {parts[2]}")
logger.debug(f"Долгота: {parts[3]}, {parts[4]}")
logger.debug(f"Время: {parts[5]}")
elif parts[0] == '$GPRMC' and len(parts) > 9:
logger.debug(f"Время: {parts[1]}")
logger.debug(f"Статус: {parts[2]}")
logger.debug(f"Широта: {parts[3]}, {parts[4]}")
logger.debug(f"Долгота: {parts[5]}, {parts[6]}")
logger.debug(f"Скорость: {parts[7]} узлов")
logger.debug(f"Курс: {parts[8]} градусов")
elif parts[0] == '$GPVTG' and len(parts) > 5:
logger.debug(f"Курс: {parts[1]} градусов")
logger.debug(f"Скорость: {parts[5]} узлов")
logger.debug(f"Raw NMEA: {decoded_data}")
def production_print():
"""Вывод данных в производственном режиме."""
if gps_data['gprmc']['time'] and gps_data['gprmc']['date']:
hours = gps_data['gprmc']['time'][:2]
minutes = gps_data['gprmc']['time'][2:4]
seconds = gps_data['gprmc']['time'][4:]
day = gps_data['gprmc']['date'][:2]
month = gps_data['gprmc']['date'][2:4]
year = "20" + gps_data['gprmc']['date'][4:]
logger.info(f"Дата и время: {year}-{month}-{day} {hours}:{minutes}:{seconds}")
else:
logger.info("Дата и время не доступны")
if gps_data['gpgga']['latitude'] is not None and gps_data['gpgga']['longitude'] is not None:
logger.info(f"Широта (GPGGA): {gps_data['gpgga']['latitude']:.6f} градусов")
logger.info(f"Долгота (GPGGA): {gps_data['gpgga']['longitude']:.6f} градусов")
logger.info(f"Высота (GPGGA): {gps_data['gpgga']['altitude'] or 'N/A'} метров")
else:
logger.info("Координаты GPGGA не доступны")
if gps_data['gprmc']['latitude'] is not None and gps_data['gprmc']['longitude'] is not None:
logger.info(f"Широта (GPRMC): {gps_data['gprmc']['latitude']:.6f} градусов")
logger.info(f"Долгота (GPRMC): {gps_data['gprmc']['longitude']:.6f} градусов")
logger.info(f"Скорость (GPRMC): {gps_data['gprmc']['speed'] or 'N/A'} узлов")
logger.info(f"Курс (GPRMC): {gps_data['gprmc']['course'] or 'N/A'} градусов")
else:
logger.info("Координаты GPRMC не доступны")
if gps_data['gpgll']['latitude'] is not None and gps_data['gpgll']['longitude'] is not None:
logger.info(f"Широта (GPGLL): {gps_data['gpgll']['latitude']:.6f} градусов")
logger.info(f"Долгота (GPGLL): {gps_data['gpgll']['longitude']:.6f} градусов")
else:
logger.info("Координаты GPGLL не доступны")
while True:
# Чтение данных из UART
raw_data = uart.read(100)
if raw_data:
raw_buffer.extend(raw_data)
# Ограничение размера буфера
if len(raw_buffer) > MAX_BUFFER_SIZE:
logger.warning("Буфер переполнен, очищен")
raw_buffer = bytearray()
# Обработка буфера
while b'\n' in raw_buffer:
index = raw_buffer.find(b'\n')
line = raw_buffer[:index]
raw_buffer = raw_buffer[index + 1:]
# Проверка контрольной суммы
if not validate_nmea_checksum(line):
logger.warning("Неверная контрольная сумма, строка пропущена")
continue
try:
# Декодирование строки
decoded_data = line.decode('ascii', errors='ignore')
parts = decoded_data.split(',')
# Парсинг данных
if parts[0] == '$GPGGA':
parsed = parse_gpgga(parts)
if parsed:
gps_data['gpgga'].update(parsed)
elif parts[0] == '$GPRMC':
parsed = parse_gprmc(parts)
if parsed:
gps_data['gprmc'].update(parsed)
elif parts[0] == '$GPGLL':
parsed = parse_gpgll(parts)
if parsed:
gps_data['gpgll'].update(parsed)
# Режим дебага
if not production_mode:
debug_print(parts, decoded_data)
# Режим продакшн
if production_mode:
current_time = time.monotonic()
if current_time - last_output_time >= 1:
last_output_time = current_time
production_print()
except Exception as e:
logger.error(f"Ошибка обработки данных: {e}")
```
Back to list