<===

ProNotes

2025-04-10 14:49:33
#ok Программа wifi-uqu-get-tm-time.py (функциональный аналог eth-uqu-get-tm-time.py ) предназначена для работы на микроконтроллере ESP32-S3-Zero с использованием CircuitPython. Она выполняет следующие основные функции:

Назначение программы:
Сбор данных с аналоговых входов: Программа считывает значения с четырёх аналоговых входов (A10, A11, A12, A13) микроконтроллера.
Подключение к Wi-Fi: Устанавливает соединение с одной из заданных Wi-Fi сетей и поддерживает его в активном состоянии.
Отправка данных на сервер: Отправляет данные с аналоговых входов на сервер через HTTP GET-запросы.
Обработка ответа сервера: Получает команды от сервера для управления шестью цифровыми выходами (D1–D6), которые могут использоваться, например, для управления светодиодами.
Отображение информации: Выводит данные (время с сервера или статус) на четырёхзначный дисплей TM1637.
Основные особенности программы:
Асинхронное выполнение: Использует библиотеку asyncio для параллельного управления подключением к Wi-Fi и отправкой HTTP-запросов.
Автоматическое переподключение к Wi-Fi:
Сканирует доступные сети и подключается к одной из заранее заданных (F_2, mi5, prg).
При потере соединения программа сбрасывает цифровые выходы и пытается переподключиться.
Обработка аналоговых данных: Считывает значения с аналоговых входов (16-битные, 0–65535) и отправляет их в формате строки через GET-запрос.
Управление выходами: На основе ответа сервера (например, d1=1,d2=0,...) устанавливает состояния цифровых выходов (включено/выключено).
Дисплей TM1637:
Показывает время из ответа сервера (например, "2013" из "20-13-52").
Отображает статус подключения или ошибки (например, "----" при отсутствии Wi-Fi, "ERR " при сбое запроса).
Обработка ошибок: Программа устойчива к сбоям — при ошибках сбрасывает выходы и выводит сообщение об ошибке.
Регулярные запросы: HTTP-запросы отправляются каждые 10 секунд (настраиваемый интервал через await asyncio.sleep(10)).
Структура программы:
manage_wifi_connection(): Асинхронная функция для управления подключением к Wi-Fi. Сканирует сети, подключается и проверяет состояние каждые 5 секунд.
send_http_request(): Асинхронная функция для отправки данных с аналоговых входов и обработки ответа сервера.
process_server_response(): Парсит ответ сервера и управляет цифровыми выходами, а также выводит время на дисплей.
display_time_on_tm1637(): Отображает время запроса в миллисекундах (если требуется, сейчас закомментировано).
main(): Запускает обе задачи (wifi_task и http_task) параллельно.
Пример работы:
Микроконтроллер подключается к сети F_2.
Считывает значения с аналоговых входов, например: A10=34852, A11=37315, A12=52149, A13=15331.
Отправляет запрос: http://ur4uqu.com/?id=1&;get=34852,37315,52149,15331.
Получает ответ: :in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1,d5=0,d6=0.
Устанавливает D1=1, D2=1, D3=0, D4=1, D5=0, D6=0 и показывает "2013" на дисплее.
Примечания:
Программа требует библиотек: wifi, socketpool, adafruit_requests, digitalio, analogio и TM1637.
URL сервера (http://ur4uqu.com/...) и список Wi-Fi сетей должны быть настроены под конкретные условия.
Пины для выходов (D1–D6) нужно указать в соответствии с аппаратной конфигурацией.
Это решение подходит для IoT-приложений, где требуется удалённый мониторинг аналоговых сигналов и управление устройствами через интернет.

Image

$ cat wifi-uqu-get-tm-time.py 
#ESP32-S3-Zero : Отправка данных с аналоговых входов и управление светодиодами согласно ответу сервера
import wifi
import socketpool
import time
import asyncio
import board
import adafruit_requests as requests
import digitalio
import analogio
import TM1637  # https://github.com/bablokb/circuitpython-tm1637

# Настройка TM1637
display = TM1637.TM1637(board.D7, board.D8)  # clk, dio
display.brightness(1)
display.show("8888")  # Очищаем экран при старте

# Глобальные переменные
connected = False  # Флаг состояния подключения к Wi-Fi
wifi_pool = None   # Глобальная переменная для пула сокетов
session = None     # HTTP-сессия

# URL для GET-запроса
HTTP_URL = "http://ur4uqu.com/...?id=1&;get={}"  # Форматированная строка

# Список сетей Wi-Fi
networks = [
    {"ssid": 'F_2', "password": '888'},
    {"ssid": 'mi5', "password": '888'},
    {"ssid": 'prg', "password": '888'},
]

# Инициализация аналоговых входов
analog_in_a10 = analogio.AnalogIn(board.A10)
analog_in_a11 = analogio.AnalogIn(board.A11)
analog_in_a12 = analogio.AnalogIn(board.A12)
analog_in_a13 = analogio.AnalogIn(board.A13)

# Определение пинов для цифровых выходов
d1_pin = board.D1  # Замените на фактический пин
d2_pin = board.D2  # Замените на фактический пин
d3_pin = board.D3  # Замените на фактический пин
d4_pin = board.D4  # Замените на фактический пин
d5_pin = board.D5  # Замените на фактический пин
d6_pin = board.D6  # Замените на фактический пин

# Инициализация цифровых выходов
d1 = digitalio.DigitalInOut(d1_pin)
d1.direction = digitalio.Direction.OUTPUT
d2 = digitalio.DigitalInOut(d2_pin)
d2.direction = digitalio.Direction.OUTPUT
d3 = digitalio.DigitalInOut(d3_pin)
d3.direction = digitalio.Direction.OUTPUT
d4 = digitalio.DigitalInOut(d4_pin)
d4.direction = digitalio.Direction.OUTPUT
d5 = digitalio.DigitalInOut(d5_pin)
d5.direction = digitalio.Direction.OUTPUT
d6 = digitalio.DigitalInOut(d6_pin)
d6.direction = digitalio.Direction.OUTPUT

def reset_all_outputs():
    """Сбрасывает все цифровые выходы в состояние 0."""
    d1.value = False
    d2.value = False
    d3.value = False
    d4.value = False
    d5.value = False
    d6.value = False
    print("All digital outputs have been reset to 0.")

def scan_available_networks():
    """Сканирует доступные сети Wi-Fi."""
    available_networks = []
    try:
        print("Scanning for available networks...")
        for network in wifi.radio.start_scanning_networks():
            available_networks.append(network.ssid)
        wifi.radio.stop_scanning_networks()
        print(f"Available networks: {available_networks}")
    except Exception as e:
        print(f"Error scanning networks: {e}")
    return available_networks

def connect_to_wifi(ssid, password):
    """Подключается к Wi-Fi."""
    try:
        print(f"Trying to connect to {ssid}...")
        wifi.radio.connect(ssid, password)
        print(f"Connected to {ssid}.")
        print(f"IP address: {wifi.radio.ipv4_address}")
        return True
    except ConnectionError as e:
        print(f"Connection error for {ssid}: {e}")
    except Exception as e:
        print(f"Unexpected error connecting to {ssid}: {e}")
    return False

async def manage_wifi_connection():
    """Управляет Wi-Fi соединением."""
    global connected, wifi_pool, session
    while True:
        if not connected:
            reset_all_outputs()
            # Сканируем доступные сети
            available_networks = scan_available_networks()
            # Попытка подключения к доступным сетям Wi-Fi из нашего списка
            for network in networks:
                if network["ssid"] in available_networks:
                    if connect_to_wifi(network["ssid"], network["password"]):
                        wifi_pool = socketpool.SocketPool(wifi.radio)
                        session = requests.Session(wifi_pool)
                        connected = True
                        break
            if not connected:
                print("No available networks. Retrying in 15 seconds...")
                await asyncio.sleep(15)
        else:
            # Проверяем состояние Wi-Fi подключения
            if wifi.radio.ipv4_address is None:
                print("Wi-Fi connection lost. Attempting to reconnect...")
                connected = False
            await asyncio.sleep(5)  # Проверяем состояние каждые 5 секунд

async def send_http_request():
    """Отправляет HTTP-запрос каждую секунду и обрабатывает ответ."""
    global connected, session
    while True:
        if connected and session:
            try:
                # Получаем значения с аналоговых входов
                a10_value = analog_in_a10.value
                a11_value = analog_in_a11.value
                a12_value = analog_in_a12.value
                a13_value = analog_in_a13.value
                print(f"Analog inputs: A10={a10_value}, A11={a11_value}, A12={a12_value}, A13={a13_value}")
                
                # Формируем URL с данными для отправки
                url = HTTP_URL.format(f"{a10_value},{a11_value},{a12_value},{a13_value}")
                print(f"Sending request to: {url}")

                display.show("8888")

                # Отправляем GET-запрос
                start_time = time.monotonic()
                response = session.get(url)
                end_time = time.monotonic()
                request_time_ms = (end_time - start_time) * 1000
                print(f"HTTP request successful. Time: {request_time_ms:.2f} ms")
                print(f"Response: {response.status_code}, {response.text}")
                
                # Обработка ответа сервера
                process_server_response(response.text)
                response.close()  # Закрываем соединение
                
                # Выводим время запроса на TM1637
                #display_time_on_tm1637(request_time_ms)
            except Exception as e:
                print(f"Error sending HTTP request: {e}")
                reset_all_outputs()
                display.show("ERR ")  # Показываем ошибку на дисплее
        else:
            print("Wi-Fi not connected. Cannot send HTTP request.")
            display.show("----")  # Показываем, что запрос невозможен
            reset_all_outputs()
        await asyncio.sleep(10)  # Отправляем запрос кажд...

def process_server_response(response_text):
    """Обрабатывает ответ сервера."""
    try:
        # Парсим ответ сервера (например, ":in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1")
        parts = response_text.split(":")
        input_part = parts[1]  # Часть с входными данными
        output_part = parts[2]  # Часть с выходными данными
        
        # Обрабатываем входные данные (время)
        time_str_full = input_part.split(",")[2]  # Извлекаем "20-13-52"
        time_str = time_str_full.replace("-", "")[:4]  # Убираем дефисы и берём первые 4 символа ("2013")
        display.show(time_str)  # Выводим время на дисплей
        
        # Обрабатываем выходные данные (управление светодиодами)
        d1_value = int(output_part.split(",")[1].split("=")[1])
        d2_value = int(output_part.split(",")[2].split("=")[1])
        d3_value = int(output_part.split(",")[3].split("=")[1])
        d4_value = int(output_part.split(",")[4].split("=")[1])
        d5_value = int(output_part.split(",")[5].split("=")[1])
        d6_value = int(output_part.split(",")[6].split("=")[1])
        
        d1.value = bool(d1_value)
        d2.value = bool(d2_value)
        d3.value = bool(d3_value)
        d4.value = bool(d4_value)
        d5.value = bool(d5_value)
        d6.value = bool(d6_value)
        
        print(f"set to: {d1_value} {d2_value} {d3_value} {d4_value} {d5_value} {d6_value} \n")
    except Exception as e:
        print(f"Error parsing server response: {e}")

def display_time_on_tm1637(request_time_ms):
    """
    Отображает время запроса на TM1637.
    Формат: XXXX (миллисекунды, до 4 цифр).
    Если значение больше 9999, отображается "MAX".
    """
    if request_time_ms > 9999:
        display.show("MAX ")
    else:
        # Форматируем время запроса как целое число
        request_str = f"{int(request_time_ms):04}"
        display.show(request_str)

async def main():
    """Основная функция программы."""
    wifi_task = asyncio.create_task(manage_wifi_connection())
    http_task = asyncio.create_task(send_http_request())
    await asyncio.gather(wifi_task, http_task)

# Запуск программы
asyncio.run(main())




============  Adafruit CircuitPython 10.0.3 on 2025-10-17; Waveshare ESP32-S3-Zero with ESP32S3 ============

import wifi
import socketpool
import time
import board
import adafruit_requests as requests
import digitalio
import analogio
import TM1637

# TM1637 setup
display = TM1637.TM1637(board.D7, board.D8)
display.brightness(1)
display.show("8888")

# Wi-Fi setup
networks = [
    {"ssid": "F_2", "password": "***"},
    {"ssid": "mi5", "password": "***"},
    {"ssid": "prg", "password": "***"},
]
HTTP_URL = "http://ur4uqu.com/esp32/get.php?id=1&;get={}"

# Analog inputs
analog_in_a10 = analogio.AnalogIn(board.A10)
analog_in_a11 = analogio.AnalogIn(board.A11)
analog_in_a12 = analogio.AnalogIn(board.A12)
analog_in_a13 = analogio.AnalogIn(board.A13)

# Digital outputs
d1 = digitalio.DigitalInOut(board.D1)
d1.direction = digitalio.Direction.OUTPUT
d2 = digitalio.DigitalInOut(board.D2)
d2.direction = digitalio.Direction.OUTPUT
d3 = digitalio.DigitalInOut(board.D3)
d3.direction = digitalio.Direction.OUTPUT
d4 = digitalio.DigitalInOut(board.D4)
d4.direction = digitalio.Direction.OUTPUT
d5 = digitalio.DigitalInOut(board.D5)
d5.direction = digitalio.Direction.OUTPUT
d6 = digitalio.DigitalInOut(board.D6)
d6.direction = digitalio.Direction.OUTPUT

def reset_all_outputs():
    d1.value = d2.value = d3.value = d4.value = d5.value = d6.value = False
    print("All digital outputs reset to 0.")

def connect_to_wifi():
    for network in networks:
        try:
            print(f"Trying to connect to {network['ssid']}...")
            wifi.radio.connect(network["ssid"], network["password"])
            print(f"Connected to {network['ssid']}. IP: {wifi.radio.ipv4_address}")
            return True
        except Exception as e:
            print(f"Failed to connect to {network['ssid']}: {e}")
    return False

def process_server_response(response_text):
    try:
        parts = response_text.split(":")
        input_part = parts[1]
        output_part = parts[2]
        time_str = input_part.split(",")[2].replace("-", "")[:4]
        display.show(time_str)
        d1_value = int(output_part.split(",")[1].split("=")[1])
        d2_value = int(output_part.split(",")[2].split("=")[1])
        d3_value = int(output_part.split(",")[3].split("=")[1])
        d4_value = int(output_part.split(",")[4].split("=")[1])
        d5_value = int(output_part.split(",")[5].split("=")[1])
        d6_value = int(output_part.split(",")[6].split("=")[1])
        d1.value = bool(d1_value)
        d2.value = bool(d2_value)
        d3.value = bool(d3_value)
        d4.value = bool(d4_value)
        d5.value = bool(d5_value)
        d6.value = bool(d6_value)
        print(f"Set outputs: {d1_value} {d2_value} {d3_value} {d4_value} {d5_value} {d6_value}")
    except Exception as e:
        print(f"Error parsing response: {e}")
        reset_all_outputs()
        display.show("ERR ")

# Connect to Wi-Fi
if connect_to_wifi():
    pool = socketpool.SocketPool(wifi.radio)
    session = requests.Session(pool)
else:
    print("Wi-Fi connection failed.")
    display.show("----")
    reset_all_outputs()
    while True:
        time.sleep(10)

# Main loop
while True:
    try:
        a10_value = analog_in_a10.value
        a11_value = analog_in_a11.value
        a12_value = analog_in_a12.value
        a13_value = analog_in_a13.value
        print(f"Analog inputs: A10={a10_value}, A11={a11_value}, A12={a12_value}, A13={a13_value}")
        url = HTTP_URL.format(f"{a10_value},{a11_value},{a12_value},{a13_value}")
        print(f"Sending request to: {url}")
        start_time = time.monotonic()
        response = session.get(url)
        try:
            end_time = time.monotonic()
            request_time_ms = (end_time - start_time) * 1000
            print(f"HTTP request successful. Time: {request_time_ms:.2f} ms")
            print(f"Response: {response.status_code}, {response.text}")
            process_server_response(response.text)
        finally:
            response.close()
    except Exception as e:
        print(f"Error sending HTTP request: {e}")
        reset_all_outputs()
        display.show("ERR ")
    time.sleep(10)

------- asyncio ---------- 
import wifi
import socketpool
import time
import asyncio
import board
import adafruit_requests
import digitalio
import analogio
import TM1637
import gc

# Настройка дисплея TM1637
display = TM1637.TM1637(board.D7, board.D8)
display.brightness(1)
display.show("8888")

# Конфигурация
HTTP_URL = "http://ur4uqu.com/esp32/get.php?id=1&;get={}"
NETWORKS = [{"ssid": "F_2", "password": "***"}, {"ssid": "mi5", "password": "***"}, {"ssid": "prg", "password": "***"}]

# Аналоговые входы
analog_ins = [analogio.AnalogIn(pin) for pin in (board.A10, board.A11, board.A12, board.A13)]

# Цифровые выходы
digital_outs = [digitalio.DigitalInOut(pin) for pin in (board.D1, board.D2, board.D3, board.D4, board.D5, board.D6)]
for out in digital_outs:
    out.direction = digitalio.Direction.OUTPUT

def reset_outputs(error_code="ERR"):
    """Сбрасывает выходы и показывает код ошибки."""
    for out in digital_outs:
        out.value = False
    display.show(error_code + " ")

def connect_wifi():
    """Подключается к Wi-Fi, возвращает сессию или None."""
    display.show("WIFI")
    for net in NETWORKS:
        try:
            wifi.radio.connect(net["ssid"], net["password"])
            print(f"Подключено к {net['ssid']}. IP: {wifi.radio.ipv4_address}")
            display.show("CONN")
            return adafruit_requests.Session(socketpool.SocketPool(wifi.radio))
        except Exception as e:
            print(f"Ошибка подключения к {net['ssid']}: {e}")
            reset_outputs("W-ER")
    return None

async def manage_wifi():
    """Управляет Wi-Fi соединением."""
    session = None
    while True:
        if not session or not wifi.radio.ipv4_address:
            reset_outputs("W-ER")
            session = connect_wifi()
            if not session:
                print("Wi-Fi не подключен. Повтор через 15 сек...")
                display.show("----")
                await asyncio.sleep(15)
                continue
            await asyncio.sleep(2)  # Показываем CONN 2 секунды
        await asyncio.sleep(5)
        global current_session
        current_session = session

async def send_requests():
    """Отправляет HTTP-запросы и обрабатывает ответы."""
    global current_session
    while True:
        if current_session:
            for attempt in range(3):  # До 3 попыток
                try:
                    values = [pin.value for pin in analog_ins]
                    url = HTTP_URL.format(",".join(map(str, values)))
                    print(f"Входы: {values}, URL: {url}")
                    start = time.monotonic()
                    with current_session.get(url) as resp:
                        if resp.status_code != 200:
                            raise ValueError(f"HTTP {resp.status_code}")
                        end = time.monotonic()
                        request_ms = (end - start) * 1000
                        process_response(resp.text)
                    print(f"Запрос успешен: {resp.status_code}, Время: {request_ms:.2f} мс")
                    break
                except Exception as e:
                    print(f"Ошибка HTTP (попытка {attempt + 1}/3): {e}")
                    reset_outputs("H-ER")
                    if attempt == 2:
                        reset_outputs("H-ER")
                    await asyncio.sleep(1)
        else:
            display.show("----")
            reset_outputs("W-ER")
        gc.collect()
        print(f"Свободная память: {gc.mem_free()} байт")
        await asyncio.sleep(10)

async def check_memory_later():
    """Проверяет память через 2 минуты после старта."""
    await asyncio.sleep(120)
    gc.collect()
    print(f"Свободная память через 2 минуты: {gc.mem_free()} байт")

def process_response(text):
    """Обрабатывает ответ сервера."""
    try:
        parts = text.split(":")
        if len(parts) != 3 or not text.startswith(":in") or ":out" not in text:
            raise ValueError("Неверный формат ответа")
        time_str = parts[1].split(",")[2].replace("-", "")[:4]
        outs = [int(v.split("=")[1]) for v in parts[2].split(",")[1:7]]
        for out, val in zip(digital_outs, outs):
            out.value = bool(val)
        print(f"Выходы: {outs}")
        display.show(time_str)  # Статичное отображение времени сервера
    except Exception as e:
        print(f"Ошибка обработки ответа: {e}")
        reset_outputs("R-ER")

async def main():
    """Основная функция."""
    global current_session
    current_session = None
    gc.collect()
    print(f"Свободная память при старте: {gc.mem_free()} байт")
    try:
        await asyncio.gather(manage_wifi(), send_requests(), check_memory_later())
    except KeyboardInterrupt:
        print("Программа прервана.")
        reset_outputs("OFF")
        raise

asyncio.run(main())
← Previous Next →
Back to list