TrashNotes

2025-04-20 07:51:27
#ok # ESP32 TTGO : lilygo_ttgo_tdisplay_esp32_16m :  Отправка данных с аналоговых входов и управление светодиодами согласно ответу сервера
# LOAD: ampy --port /dev/ttyACM0 put ~/esp32/lib # ampy --port /dev/ttyACM0 put ~/esp32/code.py

import wifi
import socketpool
import time
import asyncio
import board
import adafruit_requests as requests
import digitalio
import analogio
import TM1637  # https://github.com/bablokb/circuitpython-tm1637

# Настройка TM1637
display = TM1637.TM1637(board.IO21, board.IO22)  # clk, dio
display.brightness(1)
display.show("8888")  # Очищаем экран при старте

# Глобальные переменные
connected = False  # Флаг состояния подключения к Wi-Fi
wifi_pool = None   # Глобальная переменная для пула сокетов
session = None     # HTTP-сессия

# URL для GET-запроса
HTTP_URL = "http://172.104.137.172/esp32/get.php?id=2&get={}"  # Форматированная строка

# Список сетей Wi-Fi
networks = [
    {"ssid": 'F_2', "password": '***'},
    {"ssid": 'mi5', "password": '***'},
    {"ssid": 'prg', "password": '***'},
]

# Инициализация аналоговых входов
analog_in_a10 = analogio.AnalogIn(board.IO12)
analog_in_a11 = analogio.AnalogIn(board.IO13)
analog_in_a12 = analogio.AnalogIn(board.IO15)
analog_in_a13 = analogio.AnalogIn(board.IO25)

# Определение пинов для цифровых выходов
d1_pin = board.IO32  # Замените на фактический пин
d2_pin = board.IO33  # Замените на фактический пин

# Инициализация цифровых выходов
d1 = digitalio.DigitalInOut(d1_pin)
d1.direction = digitalio.Direction.OUTPUT
d2 = digitalio.DigitalInOut(d2_pin)
d2.direction = digitalio.Direction.OUTPUT

def reset_all_outputs():
    """Сбрасывает все цифровые выходы в состояние 0."""
    d1.value = False
    d2.value = False
    print("All digital outputs have been reset to 0.")

def scan_available_networks():
    """Сканирует доступные сети Wi-Fi."""
    available_networks = []
    try:
        print("Scanning for available networks...")
        for network in wifi.radio.start_scanning_networks():
            available_networks.append(network.ssid)
        wifi.radio.stop_scanning_networks()
        print(f"Available networks: {available_networks}")
    except Exception as e:
        print(f"Error scanning networks: {e}")
    return available_networks

def connect_to_wifi(ssid, password):
    """Подключается к Wi-Fi."""
    try:
        print(f"Trying to connect to {ssid}...")
        wifi.radio.connect(ssid, password)
        print(f"Connected to {ssid}.")
        print(f"IP address: {wifi.radio.ipv4_address}")
        return True
    except ConnectionError as e:
        print(f"Connection error for {ssid}: {e}")
    except Exception as e:
        print(f"Unexpected error connecting to {ssid}: {e}")
    return False

async def manage_wifi_connection():
    """Управляет Wi-Fi соединением."""
    global connected, wifi_pool, session
    while True:
        if not connected:
            reset_all_outputs()
            # Сканируем доступные сети
            available_networks = scan_available_networks()
            # Попытка подключения к доступным сетям Wi-Fi из нашего списка
            for network in networks:
                if network["ssid"] in available_networks:
                    if connect_to_wifi(network["ssid"], network["password"]):
                        wifi_pool = socketpool.SocketPool(wifi.radio)
                        session = requests.Session(wifi_pool)
                        connected = True
                        break
            if not connected:
                print("No available networks. Retrying in 15 seconds...")
                await asyncio.sleep(15)
        else:
            # Проверяем состояние Wi-Fi подключения
            if wifi.radio.ipv4_address is None:
                print("Wi-Fi connection lost. Attempting to reconnect...")
                connected = False
            await asyncio.sleep(5)  # Проверяем состояние каждые 5 секунд

async def send_http_request():
    """Отправляет HTTP-запрос каждую секунду и обрабатывает ответ."""
    global connected, session
    while True:
        if connected and session:
            try:
                # Получаем значения с аналоговых входов
#                a10_value = analog_in_a10.value
#                a11_value = analog_in_a11.value
#                a12_value = analog_in_a12.value
#                a13_value = analog_in_a13.value

                a10_value = 0
                a11_value = 0
                a12_value = 0
                a13_value = 0
                print(f"Analog inputs: A10={a10_value}, A11={a11_value}, A12={a12_value}, A13={a13_value}")

                # Формируем URL с данными для отправки
                url = HTTP_URL.format(f"{a10_value},{a11_value},{a12_value},{a13_value}")
                print(f"Sending request to: {url}")

                display.show("8888")

                # Отправляем GET-запрос
                start_time = time.monotonic()
                response = session.get(url)
                end_time = time.monotonic()
                request_time_ms = (end_time - start_time) * 1000
                print(f"HTTP request successful. Time: {request_time_ms:.2f} ms")
                print(f"Response: {response.status_code}, {response.text}")

                # Обработка ответа сервера
                process_server_response(response.text)
                response.close()  # Закрываем соединение

                # Выводим время запроса на TM1637
                #display_time_on_tm1637(request_time_ms)
            except Exception as e:
                print(f"Error sending HTTP request: {e}")
                reset_all_outputs()
                display.show("ERR ")  # Показываем ошибку на дисплее
        else:
            print("Wi-Fi not connected. Cannot send HTTP request.")
            display.show("----")  # Показываем, что запрос невозможен
            reset_all_outputs()
        await asyncio.sleep(10)  # Отправляем запрос кажд...

def process_server_response(response_text):
    """Обрабатывает ответ сервера."""
    try:
        # Парсим ответ сервера (например, ":in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1")
        parts = response_text.split(":")
        input_part = parts[1]  # Часть с входными данными
        output_part = parts[2]  # Часть с выходными данными

        # Обрабатываем входные данные (время)
        time_str_full = input_part.split(",")[2]  # Извлекаем "20-13-52"
        time_str = time_str_full.replace("-", "")[:4]  # Убираем дефисы и берём первые 4 символа ("2013")
        display.show(time_str)  # Выводим время на дисплей

        # Обрабатываем выходные данные (управление светодиодами)
        d1_value = int(output_part.split(",")[1].split("=")[1])
        d2_value = int(output_part.split(",")[2].split("=")[1])

        d1.value = bool(d1_value)
        d2.value = bool(d2_value)

        print(f"set to: {d1_value} {d2_value} \n")
    except Exception as e:
        print(f"Error parsing server response: {e}")

def display_time_on_tm1637(request_time_ms):
    """
    Отображает время запроса на TM1637.
    Формат: XXXX (миллисекунды, до 4 цифр).
    Если значение больше 9999, отображается "MAX".
    """
    if request_time_ms > 9999:
        display.show("MAX ")
    else:
        # Форматируем время запроса как целое число
        request_str = f"{int(request_time_ms):04}"
        display.show(request_str)

async def main():
    """Основная функция программы."""
    wifi_task = asyncio.create_task(manage_wifi_connection())
    http_task = asyncio.create_task(send_http_request())
    await asyncio.gather(wifi_task, http_task)

# Запуск программы
asyncio.run(main())
← Previous Next →
Back to list