2025-04-10 14:49:33
#ok Программа wifi-uqu-get-tm-time.py (функциональный аналог eth-uqu-get-tm-time.py ) предназначена для работы на микроконтроллере ESP32-S3-Zero с использованием CircuitPython. Она выполняет следующие основные функции:
Назначение программы:
Сбор данных с аналоговых входов: Программа считывает значения с четырёх аналоговых входов (A10, A11, A12, A13) микроконтроллера.
Подключение к Wi-Fi: Устанавливает соединение с одной из заданных Wi-Fi сетей и поддерживает его в активном состоянии.
Отправка данных на сервер: Отправляет данные с аналоговых входов на сервер через HTTP GET-запросы.
Обработка ответа сервера: Получает команды от сервера для управления шестью цифровыми выходами (D1–D6), которые могут использоваться, например, для управления светодиодами.
Отображение информации: Выводит данные (время с сервера или статус) на четырёхзначный дисплей TM1637.
Основные особенности программы:
Асинхронное выполнение: Использует библиотеку asyncio для параллельного управления подключением к Wi-Fi и отправкой HTTP-запросов.
Автоматическое переподключение к Wi-Fi:
Сканирует доступные сети и подключается к одной из заранее заданных (F_2, mi5, prg).
При потере соединения программа сбрасывает цифровые выходы и пытается переподключиться.
Обработка аналоговых данных: Считывает значения с аналоговых входов (16-битные, 0–65535) и отправляет их в формате строки через GET-запрос.
Управление выходами: На основе ответа сервера (например, d1=1,d2=0,...) устанавливает состояния цифровых выходов (включено/выключено).
Дисплей TM1637:
Показывает время из ответа сервера (например, "2013" из "20-13-52").
Отображает статус подключения или ошибки (например, "----" при отсутствии Wi-Fi, "ERR " при сбое запроса).
Обработка ошибок: Программа устойчива к сбоям — при ошибках сбрасывает выходы и выводит сообщение об ошибке.
Регулярные запросы: HTTP-запросы отправляются каждые 10 секунд (настраиваемый интервал через await asyncio.sleep(10)).
Структура программы:
manage_wifi_connection(): Асинхронная функция для управления подключением к Wi-Fi. Сканирует сети, подключается и проверяет состояние каждые 5 секунд.
send_http_request(): Асинхронная функция для отправки данных с аналоговых входов и обработки ответа сервера.
process_server_response(): Парсит ответ сервера и управляет цифровыми выходами, а также выводит время на дисплей.
display_time_on_tm1637(): Отображает время запроса в миллисекундах (если требуется, сейчас закомментировано).
main(): Запускает обе задачи (wifi_task и http_task) параллельно.
Пример работы:
Микроконтроллер подключается к сети F_2.
Считывает значения с аналоговых входов, например: A10=34852, A11=37315, A12=52149, A13=15331.
Отправляет запрос: http://ur4uqu.com/?id=1&get=34852,37315,52149,15331.
Получает ответ: :in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1,d5=0,d6=0.
Устанавливает D1=1, D2=1, D3=0, D4=1, D5=0, D6=0 и показывает "2013" на дисплее.
Примечания:
Программа требует библиотек: wifi, socketpool, adafruit_requests, digitalio, analogio и TM1637.
URL сервера (http://ur4uqu.com/...) и список Wi-Fi сетей должны быть настроены под конкретные условия.
Пины для выходов (D1–D6) нужно указать в соответствии с аппаратной конфигурацией.
Это решение подходит для IoT-приложений, где требуется удалённый мониторинг аналоговых сигналов и управление устройствами через интернет.
$ cat wifi-uqu-get-tm-time.py
#ESP32-S3-Zero : Отправка данных с аналоговых входов и управление светодиодами согласно ответу сервера
import wifi
import socketpool
import time
import asyncio
import board
import adafruit_requests as requests
import digitalio
import analogio
import TM1637 # https://github.com/bablokb/circuitpython-tm1637
# Настройка TM1637
display = TM1637.TM1637(board.D7, board.D8) # clk, dio
display.brightness(1)
display.show("8888") # Очищаем экран при старте
# Глобальные переменные
connected = False # Флаг состояния подключения к Wi-Fi
wifi_pool = None # Глобальная переменная для пула сокетов
session = None # HTTP-сессия
# URL для GET-запроса
HTTP_URL = "http://ur4uqu.com/...?id=1&get={}" # Форматированная строка
# Список сетей Wi-Fi
networks = [
{"ssid": 'F_2', "password": '888'},
{"ssid": 'mi5', "password": '888'},
{"ssid": 'prg', "password": '888'},
]
# Инициализация аналоговых входов
analog_in_a10 = analogio.AnalogIn(board.A10)
analog_in_a11 = analogio.AnalogIn(board.A11)
analog_in_a12 = analogio.AnalogIn(board.A12)
analog_in_a13 = analogio.AnalogIn(board.A13)
# Определение пинов для цифровых выходов
d1_pin = board.D1 # Замените на фактический пин
d2_pin = board.D2 # Замените на фактический пин
d3_pin = board.D3 # Замените на фактический пин
d4_pin = board.D4 # Замените на фактический пин
d5_pin = board.D5 # Замените на фактический пин
d6_pin = board.D6 # Замените на фактический пин
# Инициализация цифровых выходов
d1 = digitalio.DigitalInOut(d1_pin)
d1.direction = digitalio.Direction.OUTPUT
d2 = digitalio.DigitalInOut(d2_pin)
d2.direction = digitalio.Direction.OUTPUT
d3 = digitalio.DigitalInOut(d3_pin)
d3.direction = digitalio.Direction.OUTPUT
d4 = digitalio.DigitalInOut(d4_pin)
d4.direction = digitalio.Direction.OUTPUT
d5 = digitalio.DigitalInOut(d5_pin)
d5.direction = digitalio.Direction.OUTPUT
d6 = digitalio.DigitalInOut(d6_pin)
d6.direction = digitalio.Direction.OUTPUT
def reset_all_outputs():
"""Сбрасывает все цифровые выходы в состояние 0."""
d1.value = False
d2.value = False
d3.value = False
d4.value = False
d5.value = False
d6.value = False
print("All digital outputs have been reset to 0.")
def scan_available_networks():
"""Сканирует доступные сети Wi-Fi."""
available_networks = []
try:
print("Scanning for available networks...")
for network in wifi.radio.start_scanning_networks():
available_networks.append(network.ssid)
wifi.radio.stop_scanning_networks()
print(f"Available networks: {available_networks}")
except Exception as e:
print(f"Error scanning networks: {e}")
return available_networks
def connect_to_wifi(ssid, password):
"""Подключается к Wi-Fi."""
try:
print(f"Trying to connect to {ssid}...")
wifi.radio.connect(ssid, password)
print(f"Connected to {ssid}.")
print(f"IP address: {wifi.radio.ipv4_address}")
return True
except ConnectionError as e:
print(f"Connection error for {ssid}: {e}")
except Exception as e:
print(f"Unexpected error connecting to {ssid}: {e}")
return False
async def manage_wifi_connection():
"""Управляет Wi-Fi соединением."""
global connected, wifi_pool, session
while True:
if not connected:
reset_all_outputs()
# Сканируем доступные сети
available_networks = scan_available_networks()
# Попытка подключения к доступным сетям Wi-Fi из нашего списка
for network in networks:
if network["ssid"] in available_networks:
if connect_to_wifi(network["ssid"], network["password"]):
wifi_pool = socketpool.SocketPool(wifi.radio)
session = requests.Session(wifi_pool)
connected = True
break
if not connected:
print("No available networks. Retrying in 15 seconds...")
await asyncio.sleep(15)
else:
# Проверяем состояние Wi-Fi подключения
if wifi.radio.ipv4_address is None:
print("Wi-Fi connection lost. Attempting to reconnect...")
connected = False
await asyncio.sleep(5) # Проверяем состояние каждые 5 секунд
async def send_http_request():
"""Отправляет HTTP-запрос каждую секунду и обрабатывает ответ."""
global connected, session
while True:
if connected and session:
try:
# Получаем значения с аналоговых входов
a10_value = analog_in_a10.value
a11_value = analog_in_a11.value
a12_value = analog_in_a12.value
a13_value = analog_in_a13.value
print(f"Analog inputs: A10={a10_value}, A11={a11_value}, A12={a12_value}, A13={a13_value}")
# Формируем URL с данными для отправки
url = HTTP_URL.format(f"{a10_value},{a11_value},{a12_value},{a13_value}")
print(f"Sending request to: {url}")
display.show("8888")
# Отправляем GET-запрос
start_time = time.monotonic()
response = session.get(url)
end_time = time.monotonic()
request_time_ms = (end_time - start_time) * 1000
print(f"HTTP request successful. Time: {request_time_ms:.2f} ms")
print(f"Response: {response.status_code}, {response.text}")
# Обработка ответа сервера
process_server_response(response.text)
response.close() # Закрываем соединение
# Выводим время запроса на TM1637
#display_time_on_tm1637(request_time_ms)
except Exception as e:
print(f"Error sending HTTP request: {e}")
reset_all_outputs()
display.show("ERR ") # Показываем ошибку на дисплее
else:
print("Wi-Fi not connected. Cannot send HTTP request.")
display.show("----") # Показываем, что запрос невозможен
reset_all_outputs()
await asyncio.sleep(10) # Отправляем запрос кажд...
def process_server_response(response_text):
"""Обрабатывает ответ сервера."""
try:
# Парсим ответ сервера (например, ":in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1")
parts = response_text.split(":")
input_part = parts[1] # Часть с входными данными
output_part = parts[2] # Часть с выходными данными
# Обрабатываем входные данные (время)
time_str_full = input_part.split(",")[2] # Извлекаем "20-13-52"
time_str = time_str_full.replace("-", "")[:4] # Убираем дефисы и берём первые 4 символа ("2013")
display.show(time_str) # Выводим время на дисплей
# Обрабатываем выходные данные (управление светодиодами)
d1_value = int(output_part.split(",")[1].split("=")[1])
d2_value = int(output_part.split(",")[2].split("=")[1])
d3_value = int(output_part.split(",")[3].split("=")[1])
d4_value = int(output_part.split(",")[4].split("=")[1])
d5_value = int(output_part.split(",")[5].split("=")[1])
d6_value = int(output_part.split(",")[6].split("=")[1])
d1.value = bool(d1_value)
d2.value = bool(d2_value)
d3.value = bool(d3_value)
d4.value = bool(d4_value)
d5.value = bool(d5_value)
d6.value = bool(d6_value)
print(f"set to: {d1_value} {d2_value} {d3_value} {d4_value} {d5_value} {d6_value} \n")
except Exception as e:
print(f"Error parsing server response: {e}")
def display_time_on_tm1637(request_time_ms):
"""
Отображает время запроса на TM1637.
Формат: XXXX (миллисекунды, до 4 цифр).
Если значение больше 9999, отображается "MAX".
"""
if request_time_ms > 9999:
display.show("MAX ")
else:
# Форматируем время запроса как целое число
request_str = f"{int(request_time_ms):04}"
display.show(request_str)
async def main():
"""Основная функция программы."""
wifi_task = asyncio.create_task(manage_wifi_connection())
http_task = asyncio.create_task(send_http_request())
await asyncio.gather(wifi_task, http_task)
# Запуск программы
asyncio.run(main())
Back to list