2025-04-07 22:03:26
#ok # ESP32-S3-Zero: Получение времени с http://ur4uqu.com/time/ и вывод в терминал каждую секунду
# Используется asyncio для параллельной обработки задач
import wifi
import socketpool
import adafruit_requests as requests
import time
import asyncio
# Список сетей Wi-Fi
networks = [
{"ssid": 'F_2', "password": '888'},
{"ssid": 'mi5', "password": '888'},
{"ssid": 'prg', "password": '888'},
]
# URL для GET-запроса
TIME_URL = "http://ur4uqu.com/time/"
# Глобальные переменные
local_time = None
last_fetch_time = 0 # Время последнего запроса времени с сервера
connected = False # Флаг состояния подключения к Wi-Fi
time_initialized = False # Флаг, указывающий, было ли время хотя бы раз установлено
session = None # HTTP-сессия
def connect_to_wifi(ssid, password):
"""Подключается к Wi-Fi."""
try:
print(f"Trying to connect to {ssid}...")
wifi.radio.connect(ssid, password)
print(f"Connected to {ssid}.")
print(f"IP address: {wifi.radio.ipv4_address}")
return True
except ConnectionError as e:
print(f"Connection error for {ssid}: {e}")
except Exception as e:
print(f"Unexpected error connecting to {ssid}: {e}")
return False
def scan_available_networks():
"""Сканирует доступные сети Wi-Fi."""
available_networks = []
try:
print("Scanning for available networks...")
for network in wifi.radio.start_scanning_networks():
available_networks.append(network.ssid)
wifi.radio.stop_scanning_networks()
print(f"Available networks: {available_networks}")
except Exception as e:
print(f"Error scanning networks: {e}")
return available_networks
def fetch_time():
"""Получает время с указанного URL."""
global session
try:
print(f"Fetching time from {TIME_URL}...")
response = session.get(TIME_URL)
if response.status_code == 200:
time_data = response.text.strip() # Убираем лишние пробелы и переносы строк
print(f"Time fetched successfully: {time_data}")
# Парсим строку времени (формат: "UTC: YYYY-MM-DD HH:MM:SS")
if time_data.startswith("UTC: "):
time_str = time_data[5:] # Убираем префикс "UTC: "
try:
# Разделяем строку на дату и время
date_part, time_part = time_str.split(" ")
year, month, day = map(int, date_part.split("-"))
hour, minute, second = map(int, time_part.split(":"))
# Создаем кортеж времени (year, month, day, hour, minute, second, weekday, yearday)
time_tuple = (year, month, day, hour, minute, second, -1, -1, -1)
# Преобразуем кортеж времени в Unix timestamp
timestamp = int(time.mktime(time_tuple))
return timestamp
except ValueError as e:
print(f"Error parsing time string: {e}")
else:
print("Unexpected time format received.")
else:
print(f"Failed to fetch time. Status code: {response.status_code}")
except Exception as e:
print(f"Error fetching time: {e}")
return None
def format_time(timestamp):
"""Форматирует Unix timestamp в читаемый формат HH:MM:SS."""
return time.localtime(timestamp)
async def manage_wifi_connection():
"""Управляет Wi-Fi соединением."""
global connected, session
while True:
if not connected:
# Сканируем доступные сети
available_networks = scan_available_networks()
# Попытка подключения к доступным сетям Wi-Fi из нашего списка
for network in networks:
if network["ssid"] in available_networks:
if connect_to_wifi(network["ssid"], network["password"]):
pool = socketpool.SocketPool(wifi.radio)
session = requests.Session(pool)
connected = True
break
if not connected:
print("No available networks. Retrying in 30 seconds...")
await asyncio.sleep(30)
else:
# Проверяем состояние Wi-Fi подключения
if wifi.radio.ipv4_address is None:
print("Wi-Fi connection lost. Attempting to reconnect...")
connected = False
await asyncio.sleep(5) # Проверяем состояние каждые 5 секунд
async def synchronize_time():
"""Синхронизирует время с сервером."""
global local_time, last_fetch_time, time_initialized
while True:
if connected:
current_time = time.monotonic() # Текущее время в секундах (для отслеживания интервалов)
# Если локальное время не установлено или прошла минута с последнего запроса
if local_time is None or (current_time - last_fetch_time) >= 60:
fetched_time = fetch_time()
if fetched_time:
# Проверяем разницу между локальным временем и полученным с сервера
if time_initialized and abs(fetched_time - local_time) < 5:
print("Time difference is small. Skipping synchronization.")
else:
local_time = fetched_time
last_fetch_time = current_time
time_initialized = True # Устанавливаем флаг, что время было инициализировано
else:
print("Failed to fetch time. Retrying...")
await asyncio.sleep(10) # Проверяем время каждые 10 секунд
async def display_time():
"""Выводит текущее время каждую секунду."""
global local_time, time_initialized
while True:
if time_initialized:
formatted_time = format_time(local_time)
print(f"Current time: {formatted_time.tm_hour:02}:{formatted_time.tm_min:02}:{formatted_time.tm_sec:02}")
local_time += 1 # Увеличиваем локальное время на одну секунду
await asyncio.sleep(1) # Ждём одну секунду перед следующим выводом
async def main():
"""Основная функция программы."""
wifi_task = asyncio.create_task(manage_wifi_connection())
time_sync_task = asyncio.create_task(synchronize_time())
display_task = asyncio.create_task(display_time())
await asyncio.gather(wifi_task, time_sync_task, display_task)
# Запуск программы
asyncio.run(main())
Back to list