TrashNotes

2025-04-05 18:59:37
# variant (not tested)
import supervisor
import wifi
import socketpool
import time
import os
import analogio
import board
import adafruit_requests as requests
import digitalio

# Конфигурация
server_ip = "172.104.137.172"

# Список сетей
networks = [
    {"ssid": 'mi5', "password": '***'},
    {"ssid": 'prg', "password": '***'},
]

# URL для GET-запроса
HTTP_URL = "http://prg.in.ua/ard/ard-get.php?id=1&;get={}"  # Форматированная строка

# Инициализация цифровых выходов (пример)
d10 = digitalio.DigitalInOut(board.D10)
d10.direction = digitalio.Direction.OUTPUT

d11 = digitalio.DigitalInOut(board.D11)
d11.direction = digitalio.Direction.OUTPUT

d12 = digitalio.DigitalInOut(board.D12)
d12.direction = digitalio.Direction.OUTPUT

d5 = digitalio.DigitalInOut(board.D5)
d5.direction = digitalio.Direction.OUTPUT

def reset_all_outputs():
    """Сбрасывает все цифровые выходы в состояние 0."""
    for name, obj in globals().items():
        if isinstance(obj, digitalio.DigitalInOut) and hasattr(obj, 'direction'):
            if obj.direction == digitalio.Direction.OUTPUT:
                obj.value = False
    print("All digital outputs have been reset to 0.")

def scan_available_networks():
    """Сканирует доступные сети."""
    available_networks = []
    for network in wifi.radio.start_scanning_networks():
        available_networks.append(network)
    wifi.radio.stop_scanning_networks()
    return available_networks

def connect_to_wifi(ssid, password):
    """Подключается к Wi-Fi."""
    try:
        print(f"Trying to connect to {ssid}...")
        wifi.radio.connect(ssid, password)
        print(f"Connected to {ssid}.")
        print(f"IP address: {wifi.radio.ipv4_address}")
        return True
    except ConnectionError as e:
        print(f"Connection error for {ssid}: {e}")
    except Exception as e:
        print(f"Unexpected error connecting to {ssid}: {e}")
    return False

def ping_server(pool, server_ip):
    """Пингует сервер и возвращает время пинга в миллисекундах."""
    try:
        start_time = time.monotonic()
        socket = pool.socket()
        socket.connect((server_ip, 53))  # DNS порт для проверки подключения
        socket.close()
        end_time = time.monotonic()
        ping_time_ms = (end_time - start_time) * 1000
        print(f"Ping to {server_ip} successful. Time: {ping_time_ms:.2f} ms")
        return True, ping_time_ms
    except Exception as e:
        print(f"Error pinging {server_ip}: {e}")
        return False, None

def get_voltage(pin):
    """Преобразует значение аналогового входа в напряжение."""
    return (pin.value * 3.3) / 65536

def set_digital_outputs(response_text):
    """
    Устанавливает уровни на цифровых выходах на основе ответа сервера.
    Динамически извлекает все пары ключ=значение после маркера "out,".
    """
    try:
        # Разделяем строку по запятым
        parts = response_text.split(',')

        # Ищем индекс начала секции "out"
        try:
            out_index = parts.index("out") + 1  # Начинаем после "out"
        except ValueError:
            print("Error: 'out' section not found in response.")
            print(f"Response text: {response_text}")
            return

        # Формируем словарь значений из секции "out"
        output_values = {}
        for part in parts[out_index:]:
            if '=' in part:  # Проверяем, что это пара ключ=значение
                key, value = part.split('=', 1)  # Разделяем по первому знаку "="
                output_values[key] = int(value)

        # Перебираем все глобальные переменные, чтобы найти инициализированные пины
        for name, obj in globals().items():
            # Проверяем, является ли объект пином и инициализирован ли он как выход
            if isinstance(obj, digitalio.DigitalInOut) and hasattr(obj, 'direction'):
                if obj.direction == digitalio.Direction.OUTPUT:
                    # Если пин инициализирован как выход, устанавливаем значение
                    pin_value = output_values.get(name, 0)  # По умолчанию 0, если ключ отсутствует
                    obj.value = bool(pin_value)
                    print(f"{name} set to: {pin_value}")
                else:
                    print(f"Warning: Pin {name} is not initialized as an output.")
            else:
                # Игнорируем объекты, которые не являются пинами
                continue

    except Exception as e:
        # Обработка любых других ошибок
        print(f"Error parsing output values: {e}")

def main():
    # Инициализация аналоговых входов
    analog_in_a0 = analogio.AnalogIn(board.A0)
    analog_in_a1 = analogio.AnalogIn(board.A1)
    analog_in_a2 = analogio.AnalogIn(board.A2)
    analog_in_a3 = analogio.AnalogIn(board.A3)

    while True:
        available_networks = scan_available_networks()
        connected = False

        for network in networks:
            if any(net.ssid == network["ssid"] for net in available_networks):
                print(f"Network {network['ssid']} is available. Attempting to connect...")
                if connect_to_wifi(network["ssid"], network["password"]):
                    pool = socketpool.SocketPool(wifi.radio)
                    print(f"Using network: {network['ssid']}, IP: {wifi.radio.ipv4_address}")
                    
                    session = None

                    while True:
                        try:
                            if session is None:
                                session = requests.Session(pool)

                            reachable, ping_time = ping_server(pool, server_ip)
                            if reachable:
                                print(f"Server {server_ip} is reachable. Ping time: {ping_time:.2f} ms")
                                
                                # Получите значения с аналоговых входов
                                voltage_a0 = get_voltage(analog_in_a0)
                                voltage_a1 = get_voltage(analog_in_a1)
                                voltage_a2 = get_voltage(analog_in_a2)
                                voltage_a3 = get_voltage(analog_in_a3)

                                print("A0 Voltage:", voltage_a0)
                                print("A1 Voltage:", voltage_a1)
                                print("A2 Voltage:", voltage_a2)
                                print("A3 Voltage:", voltage_a3)

                                # Сформируйте URL с значениями всех аналоговых входов
                                url = HTTP_URL.format(f"{voltage_a0},{voltage_a1},{voltage_a2},{voltage_a3}")
                                print("Sending:", url)

                                # Отправьте GET-запрос
                                try:
                                    response = session.get(url)
                                    print("Response:", response.status_code, response.text)  # Выведите статус и содержимое ответа

                                    # Установите цифровые выходы
                                    set_digital_outputs(response.text)

                                    response.close()  # Закройте соединение
                                except Exception as e:
                                    print("Error sending request:", e)
                                    session = None  # Перезапустите сессию при ошибке

                            else:
                                print(f"Server {server_ip} is not reachable. Switching to next network.")
                                break
                            connected = True
                            time.sleep(10)  # Ждите 10 секунд
                        except Exception as e:
                            print(f"Unexpected error: {e}")
                            session = None  # Перезапустите сессию при ошибке
                            break
                else:
                    print(f"Failed to connect to {network['ssid']}. Trying next network...")
                    continue
            else:
                print(f"{network['ssid']} not available.")

        # Если ни одна сеть не доступна
        if not connected:
            print("No available networks. Resetting all digital outputs to 0.")
            reset_all_outputs()
            print("Trying again in 5 seconds.")
            time.sleep(5)

while True:
    main()
← Previous Next →
Back to list