TrashNotes

2025-04-24 18:43:17
на микроконтроллере ESP32-S3-Zero с использованием CircuitPython

Adafruit CircuitPython 9.2.6 on 2025-03-23
Waveshare ESP32-S3-Zero with ESP32S3
Board ID:waveshare_esp32_s3_zero


$ cat wifi_manager.py 
# wifi_manager.py
import wifi
import socketpool
import time
import asyncio
import adafruit_requests

class WiFiManager:
    def __init__(self, networks, server_ip):
        """
        Инициализация менеджера Wi-Fi.
        :param networks: Список сетей Wi-Fi для подключения (с SSID и паролями).
        :param server_ip: IP-адрес сервера для пинга.
        """
        self.networks = networks
        self.server_ip = server_ip
        self.connected = False
        self.wifi_pool = None
        self.requests_session = None
        self.last_response = None  # Глобальная переменная для хранения последнего ответа

    def connect_to_wifi(self, ssid, password):
        """Подключается к Wi-Fi."""
        try:
            print(f"Trying to connect to {ssid}...")
            wifi.radio.connect(ssid, password)
            print(f"Connected to {ssid}.")
            print(f"IP address: {wifi.radio.ipv4_address}")
            self.wifi_pool = socketpool.SocketPool(wifi.radio)
            self.requests_session = adafruit_requests.Session(self.wifi_pool)
            self.connected = True
            return True
        except ConnectionError as e:
            print(f"Connection error for {ssid}: {e}")
        except Exception as e:
            print(f"Unexpected error connecting to {ssid}: {e}")
        return False

    def scan_available_networks(self):
        """Сканирует доступные сети Wi-Fi."""
        available_networks = []
        try:
            print("Scanning for available networks...")
            for network in wifi.radio.start_scanning_networks():
                available_networks.append(network.ssid)
            wifi.radio.stop_scanning_networks()
            print(f"Available networks: {available_networks}")
        except Exception as e:
            print(f"Error scanning networks: {e}")
        return available_networks

    async def manage_wifi_connection(self):
        """Управляет Wi-Fi соединением."""
        while True:
            if not self.connected:
                # Сканируем доступные сети
                available_networks = self.scan_available_networks()
                # Попытка подключения к доступным сетям Wi-Fi из нашего списка
                for network in self.networks:
                    if network["ssid"] in available_networks:
                        if self.connect_to_wifi(network["ssid"], network["password"]):
                            break
                if not self.connected:
                    print("No available networks. Retrying in 15 seconds...")
                    await asyncio.sleep(15)
            else:
                # Проверяем состояние Wi-Fi подключения
                if wifi.radio.ipv4_address is None:
                    print("Wi-Fi connection lost. Attempting to reconnect...")
                    self.connected = False
                await asyncio.sleep(5)  # Проверяем состояние каждые 5 секунд

    async def ping_server(self):
        """Пингует сервер и выводит время пинга в терминал."""
        while True:
            if self.connected and self.wifi_pool:
                try:
                    start_time = time.monotonic()
                    socket = self.wifi_pool.socket()
                    socket.connect((self.server_ip, 80))  # DNS порт для проверки подключения
                    socket.close()
                    end_time = time.monotonic()
                    ping_time_ms = (end_time - start_time) * 1000
                    print(f"Ping to {self.server_ip} successful. Time: {ping_time_ms:.2f} ms")
                except Exception as e:
                    print(f"Error pinging {self.server_ip}: {e}")
            else:
                print("Wi-Fi not connected. Cannot ping server.")
            await asyncio.sleep(20)  # Пинг каждую...

    async def fetch_url(self, url):
        """
        Выполняет HTTP GET запрос к указанному URL.
        Возвращает содержимое ответа или None в случае ошибки.
        Также сохраняет ответ в глобальную переменную self.last_response.
        """
        if not self.connected or not self.requests_session:
            print("Wi-Fi not connected. Cannot fetch URL.")
            return None

        try:
            response = self.requests_session.get(url)
            if response.status_code == 200:
                self.last_response = response.text  # Сохраняем ответ в глобальную переменную
                print(f"Response from {url}:")
                print(self.last_response)
                return self.last_response  # Возвращаем содержимое ответа
            else:
                print(f"Failed to fetch {url}. Status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    def get_last_response(self):
        """Возвращает последний успешный ответ."""
        return self.last_response

    async def run(self):
        """Запускает задачи управления Wi-Fi и пинга."""
        wifi_task = asyncio.create_task(self.manage_wifi_connection())
        ping_task = asyncio.create_task(self.ping_server())
        await asyncio.gather(wifi_task, ping_task)


'''
$ cat wifi_manager_ex.py 
# code.py
from wifi_manager import WiFiManager
import asyncio

# Конфигурация
networks = [
    {"ssid": 'F_2', "password": '***'},
    {"ssid": 'mi5', "password": '***'},
    {"ssid": 'prg', "password": '***'},
]

server_ip = "ur4uqu.com"  # IP-адрес для пинга
time_url = "http://ur4uqu.com/time/";  # URL для получения времени

# Инициализация менеджера Wi-Fi
wifi_manager = WiFiManager(networks=networks, server_ip=server_ip)

async def fetch_time_periodically():
    """Периодически запрашивает время с сервера."""
    while True:
        # Выполняем запрос и получаем ответ
        response = await wifi_manager.fetch_url(time_url)
        if response:
            # Обрабатываем ответ (например, парсим время)
            print(f"Response: \"{response[:25]}\"")  # Печатаем первые 50 символов
        await asyncio.sleep(1)  # Запрос каждую секунду

async def main():
    """Основная функция программы."""
    # Запускаем задачи
    wifi_task = asyncio.create_task(wifi_manager.run())
    time_task = asyncio.create_task(fetch_time_periodically())
    await asyncio.gather(wifi_task, time_task)

# Запуск программы
asyncio.run(main())



'''
← Previous Next →
Back to list