2025-04-20 07:51:27
#ok # ESP32 TTGO : lilygo_ttgo_tdisplay_esp32_16m : Отправка данных с аналоговых входов и управление светодиодами согласно ответу сервера
# LOAD: ampy --port /dev/ttyACM0 put ~/esp32/lib # ampy --port /dev/ttyACM0 put ~/esp32/code.py
import wifi
import socketpool
import time
import asyncio
import board
import adafruit_requests as requests
import digitalio
import analogio
import TM1637 # https://github.com/bablokb/circuitpython-tm1637
# Настройка TM1637
display = TM1637.TM1637(board.IO21, board.IO22) # clk, dio
display.brightness(1)
display.show("8888") # Очищаем экран при старте
# Глобальные переменные
connected = False # Флаг состояния подключения к Wi-Fi
wifi_pool = None # Глобальная переменная для пула сокетов
session = None # HTTP-сессия
# URL для GET-запроса
HTTP_URL = "http://172.104.137.172/esp32/get.php?id=2&get={}" # Форматированная строка
# Список сетей Wi-Fi
networks = [
{"ssid": 'F_2', "password": '***'},
{"ssid": 'mi5', "password": '***'},
{"ssid": 'prg', "password": '***'},
]
# Инициализация аналоговых входов
analog_in_a10 = analogio.AnalogIn(board.IO12)
analog_in_a11 = analogio.AnalogIn(board.IO13)
analog_in_a12 = analogio.AnalogIn(board.IO15)
analog_in_a13 = analogio.AnalogIn(board.IO25)
# Определение пинов для цифровых выходов
d1_pin = board.IO32 # Замените на фактический пин
d2_pin = board.IO33 # Замените на фактический пин
# Инициализация цифровых выходов
d1 = digitalio.DigitalInOut(d1_pin)
d1.direction = digitalio.Direction.OUTPUT
d2 = digitalio.DigitalInOut(d2_pin)
d2.direction = digitalio.Direction.OUTPUT
def reset_all_outputs():
"""Сбрасывает все цифровые выходы в состояние 0."""
d1.value = False
d2.value = False
print("All digital outputs have been reset to 0.")
def scan_available_networks():
"""Сканирует доступные сети Wi-Fi."""
available_networks = []
try:
print("Scanning for available networks...")
for network in wifi.radio.start_scanning_networks():
available_networks.append(network.ssid)
wifi.radio.stop_scanning_networks()
print(f"Available networks: {available_networks}")
except Exception as e:
print(f"Error scanning networks: {e}")
return available_networks
def connect_to_wifi(ssid, password):
"""Подключается к Wi-Fi."""
try:
print(f"Trying to connect to {ssid}...")
wifi.radio.connect(ssid, password)
print(f"Connected to {ssid}.")
print(f"IP address: {wifi.radio.ipv4_address}")
return True
except ConnectionError as e:
print(f"Connection error for {ssid}: {e}")
except Exception as e:
print(f"Unexpected error connecting to {ssid}: {e}")
return False
async def manage_wifi_connection():
"""Управляет Wi-Fi соединением."""
global connected, wifi_pool, session
while True:
if not connected:
reset_all_outputs()
# Сканируем доступные сети
available_networks = scan_available_networks()
# Попытка подключения к доступным сетям Wi-Fi из нашего списка
for network in networks:
if network["ssid"] in available_networks:
if connect_to_wifi(network["ssid"], network["password"]):
wifi_pool = socketpool.SocketPool(wifi.radio)
session = requests.Session(wifi_pool)
connected = True
break
if not connected:
print("No available networks. Retrying in 15 seconds...")
await asyncio.sleep(15)
else:
# Проверяем состояние Wi-Fi подключения
if wifi.radio.ipv4_address is None:
print("Wi-Fi connection lost. Attempting to reconnect...")
connected = False
await asyncio.sleep(5) # Проверяем состояние каждые 5 секунд
async def send_http_request():
"""Отправляет HTTP-запрос каждую секунду и обрабатывает ответ."""
global connected, session
while True:
if connected and session:
try:
# Получаем значения с аналоговых входов
# a10_value = analog_in_a10.value
# a11_value = analog_in_a11.value
# a12_value = analog_in_a12.value
# a13_value = analog_in_a13.value
a10_value = 0
a11_value = 0
a12_value = 0
a13_value = 0
print(f"Analog inputs: A10={a10_value}, A11={a11_value}, A12={a12_value}, A13={a13_value}")
# Формируем URL с данными для отправки
url = HTTP_URL.format(f"{a10_value},{a11_value},{a12_value},{a13_value}")
print(f"Sending request to: {url}")
display.show("8888")
# Отправляем GET-запрос
start_time = time.monotonic()
response = session.get(url)
end_time = time.monotonic()
request_time_ms = (end_time - start_time) * 1000
print(f"HTTP request successful. Time: {request_time_ms:.2f} ms")
print(f"Response: {response.status_code}, {response.text}")
# Обработка ответа сервера
process_server_response(response.text)
response.close() # Закрываем соединение
# Выводим время запроса на TM1637
#display_time_on_tm1637(request_time_ms)
except Exception as e:
print(f"Error sending HTTP request: {e}")
reset_all_outputs()
display.show("ERR ") # Показываем ошибку на дисплее
else:
print("Wi-Fi not connected. Cannot send HTTP request.")
display.show("----") # Показываем, что запрос невозможен
reset_all_outputs()
await asyncio.sleep(10) # Отправляем запрос кажд...
def process_server_response(response_text):
"""Обрабатывает ответ сервера."""
try:
# Парсим ответ сервера (например, ":in,2025-04-09,20-13-52,34852,37315,52149,15331:out,d1=1,d2=1,d3=0,d4=1")
parts = response_text.split(":")
input_part = parts[1] # Часть с входными данными
output_part = parts[2] # Часть с выходными данными
# Обрабатываем входные данные (время)
time_str_full = input_part.split(",")[2] # Извлекаем "20-13-52"
time_str = time_str_full.replace("-", "")[:4] # Убираем дефисы и берём первые 4 символа ("2013")
display.show(time_str) # Выводим время на дисплей
# Обрабатываем выходные данные (управление светодиодами)
d1_value = int(output_part.split(",")[1].split("=")[1])
d2_value = int(output_part.split(",")[2].split("=")[1])
d1.value = bool(d1_value)
d2.value = bool(d2_value)
print(f"set to: {d1_value} {d2_value} \n")
except Exception as e:
print(f"Error parsing server response: {e}")
def display_time_on_tm1637(request_time_ms):
"""
Отображает время запроса на TM1637.
Формат: XXXX (миллисекунды, до 4 цифр).
Если значение больше 9999, отображается "MAX".
"""
if request_time_ms > 9999:
display.show("MAX ")
else:
# Форматируем время запроса как целое число
request_str = f"{int(request_time_ms):04}"
display.show(request_str)
async def main():
"""Основная функция программы."""
wifi_task = asyncio.create_task(manage_wifi_connection())
http_task = asyncio.create_task(send_http_request())
await asyncio.gather(wifi_task, http_task)
# Запуск программы
asyncio.run(main())
Back to list