2025-04-24 18:43:17
на микроконтроллере ESP32-S3-Zero с использованием CircuitPython
Adafruit CircuitPython 9.2.6 on 2025-03-23
Waveshare ESP32-S3-Zero with ESP32S3
Board ID:waveshare_esp32_s3_zero
$ cat wifi_manager.py
# wifi_manager.py
import wifi
import socketpool
import time
import asyncio
import adafruit_requests
class WiFiManager:
def __init__(self, networks, server_ip):
"""
Инициализация менеджера Wi-Fi.
:param networks: Список сетей Wi-Fi для подключения (с SSID и паролями).
:param server_ip: IP-адрес сервера для пинга.
"""
self.networks = networks
self.server_ip = server_ip
self.connected = False
self.wifi_pool = None
self.requests_session = None
self.last_response = None # Глобальная переменная для хранения последнего ответа
def connect_to_wifi(self, ssid, password):
"""Подключается к Wi-Fi."""
try:
print(f"Trying to connect to {ssid}...")
wifi.radio.connect(ssid, password)
print(f"Connected to {ssid}.")
print(f"IP address: {wifi.radio.ipv4_address}")
self.wifi_pool = socketpool.SocketPool(wifi.radio)
self.requests_session = adafruit_requests.Session(self.wifi_pool)
self.connected = True
return True
except ConnectionError as e:
print(f"Connection error for {ssid}: {e}")
except Exception as e:
print(f"Unexpected error connecting to {ssid}: {e}")
return False
def scan_available_networks(self):
"""Сканирует доступные сети Wi-Fi."""
available_networks = []
try:
print("Scanning for available networks...")
for network in wifi.radio.start_scanning_networks():
available_networks.append(network.ssid)
wifi.radio.stop_scanning_networks()
print(f"Available networks: {available_networks}")
except Exception as e:
print(f"Error scanning networks: {e}")
return available_networks
async def manage_wifi_connection(self):
"""Управляет Wi-Fi соединением."""
while True:
if not self.connected:
# Сканируем доступные сети
available_networks = self.scan_available_networks()
# Попытка подключения к доступным сетям Wi-Fi из нашего списка
for network in self.networks:
if network["ssid"] in available_networks:
if self.connect_to_wifi(network["ssid"], network["password"]):
break
if not self.connected:
print("No available networks. Retrying in 15 seconds...")
await asyncio.sleep(15)
else:
# Проверяем состояние Wi-Fi подключения
if wifi.radio.ipv4_address is None:
print("Wi-Fi connection lost. Attempting to reconnect...")
self.connected = False
await asyncio.sleep(5) # Проверяем состояние каждые 5 секунд
async def ping_server(self):
"""Пингует сервер и выводит время пинга в терминал."""
while True:
if self.connected and self.wifi_pool:
try:
start_time = time.monotonic()
socket = self.wifi_pool.socket()
socket.connect((self.server_ip, 80)) # DNS порт для проверки подключения
socket.close()
end_time = time.monotonic()
ping_time_ms = (end_time - start_time) * 1000
print(f"Ping to {self.server_ip} successful. Time: {ping_time_ms:.2f} ms")
except Exception as e:
print(f"Error pinging {self.server_ip}: {e}")
else:
print("Wi-Fi not connected. Cannot ping server.")
await asyncio.sleep(20) # Пинг каждую...
async def fetch_url(self, url):
"""
Выполняет HTTP GET запрос к указанному URL.
Возвращает содержимое ответа или None в случае ошибки.
Также сохраняет ответ в глобальную переменную self.last_response.
"""
if not self.connected or not self.requests_session:
print("Wi-Fi not connected. Cannot fetch URL.")
return None
try:
response = self.requests_session.get(url)
if response.status_code == 200:
self.last_response = response.text # Сохраняем ответ в глобальную переменную
print(f"Response from {url}:")
print(self.last_response)
return self.last_response # Возвращаем содержимое ответа
else:
print(f"Failed to fetch {url}. Status code: {response.status_code}")
return None
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
def get_last_response(self):
"""Возвращает последний успешный ответ."""
return self.last_response
async def run(self):
"""Запускает задачи управления Wi-Fi и пинга."""
wifi_task = asyncio.create_task(self.manage_wifi_connection())
ping_task = asyncio.create_task(self.ping_server())
await asyncio.gather(wifi_task, ping_task)
'''
$ cat wifi_manager_ex.py
# code.py
from wifi_manager import WiFiManager
import asyncio
# Конфигурация
networks = [
{"ssid": 'F_2', "password": '***'},
{"ssid": 'mi5', "password": '***'},
{"ssid": 'prg', "password": '***'},
]
server_ip = "ur4uqu.com" # IP-адрес для пинга
time_url = "http://ur4uqu.com/time/" # URL для получения времени
# Инициализация менеджера Wi-Fi
wifi_manager = WiFiManager(networks=networks, server_ip=server_ip)
async def fetch_time_periodically():
"""Периодически запрашивает время с сервера."""
while True:
# Выполняем запрос и получаем ответ
response = await wifi_manager.fetch_url(time_url)
if response:
# Обрабатываем ответ (например, парсим время)
print(f"Response: \"{response[:25]}\"") # Печатаем первые 50 символов
await asyncio.sleep(1) # Запрос каждую секунду
async def main():
"""Основная функция программы."""
# Запускаем задачи
wifi_task = asyncio.create_task(wifi_manager.run())
time_task = asyncio.create_task(fetch_time_periodically())
await asyncio.gather(wifi_task, time_task)
# Запуск программы
asyncio.run(main())
'''
Back to list