TrashNotes

2025-04-07 22:03:26
#ok # ESP32-S3-Zero: Получение времени с http://ur4uqu.com/time/ и вывод в терминал каждую секунду
# Используется asyncio для параллельной обработки задач

import wifi
import socketpool
import adafruit_requests as requests
import time
import asyncio

# Список сетей Wi-Fi
networks = [
    {"ssid": 'F_2', "password": '888'},
    {"ssid": 'mi5', "password": '888'},
    {"ssid": 'prg', "password": '888'},
]

# URL для GET-запроса
TIME_URL = "http://ur4uqu.com/time/";

# Глобальные переменные
local_time = None
last_fetch_time = 0  # Время последнего запроса времени с сервера
connected = False  # Флаг состояния подключения к Wi-Fi
time_initialized = False  # Флаг, указывающий, было ли время хотя бы раз установлено
session = None  # HTTP-сессия

def connect_to_wifi(ssid, password):
    """Подключается к Wi-Fi."""
    try:
        print(f"Trying to connect to {ssid}...")
        wifi.radio.connect(ssid, password)
        print(f"Connected to {ssid}.")
        print(f"IP address: {wifi.radio.ipv4_address}")
        return True
    except ConnectionError as e:
        print(f"Connection error for {ssid}: {e}")
    except Exception as e:
        print(f"Unexpected error connecting to {ssid}: {e}")
    return False

def scan_available_networks():
    """Сканирует доступные сети Wi-Fi."""
    available_networks = []
    try:
        print("Scanning for available networks...")
        for network in wifi.radio.start_scanning_networks():
            available_networks.append(network.ssid)
        wifi.radio.stop_scanning_networks()
        print(f"Available networks: {available_networks}")
    except Exception as e:
        print(f"Error scanning networks: {e}")
    return available_networks

def fetch_time():
    """Получает время с указанного URL."""
    global session
    try:
        print(f"Fetching time from {TIME_URL}...")
        response = session.get(TIME_URL)
        if response.status_code == 200:
            time_data = response.text.strip()  # Убираем лишние пробелы и переносы строк
            print(f"Time fetched successfully: {time_data}")

            # Парсим строку времени (формат: "UTC: YYYY-MM-DD HH:MM:SS")
            if time_data.startswith("UTC: "):
                time_str = time_data[5:]  # Убираем префикс "UTC: "
                try:
                    # Разделяем строку на дату и время
                    date_part, time_part = time_str.split(" ")
                    year, month, day = map(int, date_part.split("-"))
                    hour, minute, second = map(int, time_part.split(":"))

                    # Создаем кортеж времени (year, month, day, hour, minute, second, weekday, yearday)
                    time_tuple = (year, month, day, hour, minute, second, -1, -1, -1)

                    # Преобразуем кортеж времени в Unix timestamp
                    timestamp = int(time.mktime(time_tuple))
                    return timestamp
                except ValueError as e:
                    print(f"Error parsing time string: {e}")
            else:
                print("Unexpected time format received.")
        else:
            print(f"Failed to fetch time. Status code: {response.status_code}")
    except Exception as e:
        print(f"Error fetching time: {e}")
    return None

def format_time(timestamp):
    """Форматирует Unix timestamp в читаемый формат HH:MM:SS."""
    return time.localtime(timestamp)

async def manage_wifi_connection():
    """Управляет Wi-Fi соединением."""
    global connected, session
    while True:
        if not connected:
            # Сканируем доступные сети
            available_networks = scan_available_networks()

            # Попытка подключения к доступным сетям Wi-Fi из нашего списка
            for network in networks:
                if network["ssid"] in available_networks:
                    if connect_to_wifi(network["ssid"], network["password"]):
                        pool = socketpool.SocketPool(wifi.radio)
                        session = requests.Session(pool)
                        connected = True
                        break

            if not connected:
                print("No available networks. Retrying in 30 seconds...")
                await asyncio.sleep(30)
        else:
            # Проверяем состояние Wi-Fi подключения
            if wifi.radio.ipv4_address is None:
                print("Wi-Fi connection lost. Attempting to reconnect...")
                connected = False
            await asyncio.sleep(5)  # Проверяем состояние каждые 5 секунд

async def synchronize_time():
    """Синхронизирует время с сервером."""
    global local_time, last_fetch_time, time_initialized
    while True:
        if connected:
            current_time = time.monotonic()  # Текущее время в секундах (для отслеживания интервалов)

            # Если локальное время не установлено или прошла минута с последнего запроса
            if local_time is None or (current_time - last_fetch_time) >= 60:
                fetched_time = fetch_time()
                if fetched_time:
                    # Проверяем разницу между локальным временем и полученным с сервера
                    if time_initialized and abs(fetched_time - local_time) < 5:
                        print("Time difference is small. Skipping synchronization.")
                    else:
                        local_time = fetched_time
                        last_fetch_time = current_time
                        time_initialized = True  # Устанавливаем флаг, что время было инициализировано
                else:
                    print("Failed to fetch time. Retrying...")

        await asyncio.sleep(10)  # Проверяем время каждые 10 секунд

async def display_time():
    """Выводит текущее время каждую секунду."""
    global local_time, time_initialized
    while True:
        if time_initialized:
            formatted_time = format_time(local_time)
            print(f"Current time: {formatted_time.tm_hour:02}:{formatted_time.tm_min:02}:{formatted_time.tm_sec:02}")
            local_time += 1  # Увеличиваем локальное время на одну секунду
        await asyncio.sleep(1)  # Ждём одну секунду перед следующим выводом

async def main():
    """Основная функция программы."""
    wifi_task = asyncio.create_task(manage_wifi_connection())
    time_sync_task = asyncio.create_task(synchronize_time())
    display_task = asyncio.create_task(display_time())

    await asyncio.gather(wifi_task, time_sync_task, display_task)

# Запуск программы
asyncio.run(main())
← Previous Next →
Back to list