Source code for zafiaonline.transport.websocket.websocket_handler

# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2025 unelected
#
# This file is part of the zafiaonline project.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
WebSocketHandler module.

Provides a high-level wrapper around a WebSocket client connection with
automatic reconnection, background message listening, and lifecycle
management. This class is designed to integrate with the zafiaonline
framework and handle unstable network conditions gracefully.

Typical usage example:
    from zafiaonline.transport.websocket.websocket_handler import WebSocketHandler
    from zafiaonline.transport.websocket.websocket_module import Websocket

    ws = Websocket(client)
    handler = WebSocketHandler(ws)
    await handler._connect()
    await handler._post_connect_setup()
"""
import asyncio
import websockets

from typing import TYPE_CHECKING
from websockets.exceptions import ConnectionClosedOK, ConnectionClosed
from websockets.asyncio.client import connect

from zafiaonline.utils.proxy_store import store
from zafiaonline.transport.websocket.config import Config
from zafiaonline.utils.logging_config import logger
from zafiaonline.utils.exceptions import BanError
if TYPE_CHECKING:
    from zafiaonline.transport.websocket.websocket_module import Websocket


class WebSocketHandler:
    """Manages the lifecycle of a WebSocket client connection.

    Handles connection setup, graceful disconnection, reconnection with
    exponential backoff, and background listening for incoming messages.

    Attributes:
        alive (bool | None): ``None`` until the first successful
            `_connect`, then ``True``; `_handle_reconnect` sets it to
            ``False``.
        ws (websockets.ClientConnection | None): The active low-level
            connection, or ``None`` before `_connect` has succeeded.
        uri (str): The server URI, built from ``Config`` as
            ``<connect_type>://<address>:<port>``.
        ws_lock (asyncio.Lock): Lock held while disconnecting before a
            reconnection attempt.
        listener_task (asyncio.Task | None): Background task that receives
            incoming messages.
        websocket (Websocket): The wrapper passed to the constructor; its
            ``disconnect`` and ``create_connection`` coroutines are used
            for teardown and reconnection.
        data_queue (asyncio.Queue): Queue of received messages.
    """

    # Value sent as the User-Agent request header during the handshake.
    _USER_AGENT: str = "okhttp/4.12.0"
    # Upper bound on consecutive reconnection attempts in `_reconnect`.
    _MAX_RECONNECT_ATTEMPTS: int = 5
    # Cap, in seconds, for the exponential backoff delay.
    _MAX_BACKOFF_SECONDS: int = 30
    # Seconds to wait for `create_connection` before giving up an attempt.
    _CONNECT_TIMEOUT_SECONDS: int = 10

    def __init__(self, socket: "Websocket") -> None:
        """Initializes the handler state from configuration.

        Args:
            socket (Websocket): The WebSocket wrapper this handler serves.
        """
        config: Config = Config()
        self.alive: bool | None = None
        self.ws: websockets.ClientConnection | None = None
        self.data_queue: asyncio.Queue = asyncio.Queue()
        self.listener_task: asyncio.Task | None = None
        self.uri: str = (
            f"{config.connect_type}://{config.address}:{config.port}"
        )
        self.ws_lock: asyncio.Lock = asyncio.Lock()
        self.websocket: "Websocket" = socket
        # Strong references to fire-and-forget tasks. The event loop keeps
        # only weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    def _schedule_reconnect(self) -> asyncio.Task:
        """Starts `_reconnect` as a background task and keeps it referenced.

        Returns:
            asyncio.Task: The scheduled reconnection task.
        """
        task: asyncio.Task = asyncio.create_task(self._reconnect())
        self._background_tasks.add(task)
        # Drop the reference once the task is finished.
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def __listener(self) -> None:
        """Receives messages from the connection and enqueues them.

        Loops while `self.alive` is truthy, awaiting `self.ws.recv()` and
        putting every message into `self.data_queue`. The loop ends on a
        closed connection, on task cancellation, or on any other error
        (in which case the wrapper's `disconnect` is awaited first).

        Returns:
            None

        Raises:
            AttributeError: If an unexpected error occurs and
                `self.websocket` is ``None``. A missing `self.ws` is
                reported through the generic error path and not raised.
        """
        while self.alive:
            try:
                if not self.ws:
                    raise AttributeError("No active WebSocket connection")
                message: str | bytes = await self.ws.recv()
                await self.data_queue.put(message)
            except ConnectionClosedOK:
                logger.debug("Connection closed normally (1000).")
                break
            except websockets.exceptions.ConnectionClosedError as e:
                logger.warning(f"Connection closed unexpectedly: {e}")
                break
            except asyncio.CancelledError:
                # NOTE(review): cancellation is swallowed here, so the task
                # finishes normally instead of in the cancelled state.
                logger.debug("Listener task was cancelled.")
                break
            except websockets.ConnectionClosed:
                # NOTE(review): ConnectionClosedOK and ConnectionClosedError
                # are handled above, so this branch only runs for a bare
                # ConnectionClosed. Confirm whether unexpected closures
                # were meant to trigger a reconnect as well.
                logger.warning(
                    "WebSocket connection lost. Attempting to reconnect..."
                )
                self._schedule_reconnect()
                break
            except Exception as e:
                # KeyboardInterrupt is not an Exception subclass, so it
                # propagates without an explicit re-raise clause.
                logger.error(f"Unexpected error in __listener: {e}")
                if self.websocket is None:
                    raise AttributeError("No WebSocket") from e
                await self.websocket.disconnect()
                break

    async def __on_connect(self) -> None:
        """Sends the initial handshake message over the connection.

        Sends ``"Hello, World!"`` on `self.ws`. A closed connection, a
        missing connection, and any other error are logged and not raised.

        Returns:
            None
        """
        try:
            if not self.ws:
                raise AttributeError("No active WebSocket connection")
            await self.ws.send("Hello, World!")
            logger.debug("Sent initial handshake message.")
        except websockets.ConnectionClosed as e:
            logger.error(f"WebSocket closed before sending handshake: {e}")
        except Exception as e:
            logger.error(f"Unexpected error in __on_connect: {e}")

    async def _cancel_listener_task(self) -> None:
        """Cancels the background listener task if it is still running.

        Does nothing when there is no listener task or it is already done,
        so it is safe to call repeatedly.

        Returns:
            None
        """
        task = self.listener_task
        if task is None or task.done():
            return
        task.cancel()
        logger.debug("Listener task cancelled.")

    async def _connect(self) -> None:
        """Opens a WebSocket connection to `self.uri`.

        Connects through a proxy obtained from `store.get_random_proxy()`
        and sends `_USER_AGENT` as the User-Agent header. On success the
        connection is stored in `self.ws` and `self.alive` becomes True.

        Returns:
            None

        Raises:
            Exception: Any error raised by `connect` (for example an
                invalid URI or a failed handshake) propagates unchanged.
        """
        # `user_agent_header` expects the header *value*; passing the repr
        # of a headers dict would send "{'User-Agent': ...}" verbatim.
        self.ws = await connect(
            self.uri,
            user_agent_header=self._USER_AGENT,
            proxy=store.get_random_proxy(),
        )
        self.alive = True

    async def _post_connect_setup(self) -> None:
        """Runs the handshake and starts the background listener.

        Awaits `__on_connect` and then stores a new listener task in
        `self.listener_task`.

        Returns:
            None
        """
        await self.__on_connect()
        self.listener_task = asyncio.create_task(self.__listener())

    async def _reconnect(self) -> None:
        """Tries to re-establish the connection with exponential backoff.

        Makes up to `_MAX_RECONNECT_ATTEMPTS` attempts. Each attempt calls
        `_attempt_disconnect`, sleeps ``min(2 ** attempt,
        _MAX_BACKOFF_SECONDS)`` seconds (1, 2, 4, 8, 16 with the default
        limits) and then calls `_try_create_connection`. The method
        returns as soon as an attempt succeeds. After every failed attempt
        `_should_stop_reconnect` is consulted, and the loop ends early if
        it returns True. No exception is raised when all attempts fail.

        Returns:
            None
        """
        logger.warning("Attempting to reconnect...")
        for attempt in range(self._MAX_RECONNECT_ATTEMPTS):
            await self._attempt_disconnect()
            await asyncio.sleep(
                min(2 ** attempt, self._MAX_BACKOFF_SECONDS)
            )
            if await self._try_create_connection():
                logger.info("Reconnection successful.")
                return
            logger.error(f"Reconnection attempt {attempt + 1} failed.")
            if await self._should_stop_reconnect():
                return
        logger.critical("Max reconnection attempts reached. Giving up.")

    async def _handle_reconnect(self) -> None:
        """Marks the connection as down and starts reconnecting.

        Sets `self.alive` to False and schedules `_reconnect` as a
        background task without awaiting it.

        Returns:
            None
        """
        self.alive = False
        logger.info("Starting reconnection process.")
        self._schedule_reconnect()

    async def _close_websocket(self) -> None:
        """Closes the connection with the normal closure code 1000.

        Returns immediately when no connection has been created yet. An
        already-closed connection is logged at debug level and ignored.

        Returns:
            None

        Raises:
            Exception: Any other error raised while closing is logged and
                re-raised.
        """
        if self.ws is None:
            # Nothing to close; calling this before `_connect` is safe.
            return
        try:
            await self.ws.close(code=1000)
            logger.debug("WebSocket connection closed gracefully.")
        except ConnectionClosed as e:
            logger.debug(f"Connection already closed: {e}")
        except Exception as e:
            logger.error(f"Error while closing WebSocket connection: {e}")
            raise

    async def _should_stop_reconnect(self) -> bool:
        """Tells whether reconnection attempts should cease.

        Returns:
            bool: True if `self.alive` is falsy, otherwise False.
        """
        if self.alive:
            return False
        logger.info("WebSocket is inactive. Stopping reconnection.")
        return True

    async def _attempt_disconnect(self) -> None:
        """Disconnects the wrapper before a reconnection attempt.

        While holding `self.ws_lock`, awaits `self.websocket.disconnect()`
        if `self.alive` is truthy. Every error, including a missing
        wrapper, is logged and not raised.

        Returns:
            None
        """
        try:
            async with self.ws_lock:
                if self.alive:
                    if self.websocket is None:
                        raise AttributeError("No WebSocket")
                    await self.websocket.disconnect()
        except Exception as e:
            logger.error(f"Error during disconnect before reconnect: {e}")

    async def _try_create_connection(self) -> bool:
        """Attempts to create a new connection within a timeout.

        Awaits `self.websocket.create_connection()` for at most
        `_CONNECT_TIMEOUT_SECONDS` seconds.

        Returns:
            bool: True if the call completed in time without raising;
            False on timeout or any other error (which is logged).
        """
        try:
            if self.websocket is None:
                raise AttributeError("No WebSocket")
            await asyncio.wait_for(
                self.websocket.create_connection(),
                timeout=self._CONNECT_TIMEOUT_SECONDS,
            )
            return True
        except asyncio.TimeoutError:
            logger.error("Timeout while trying to reconnect.")
            return False
        except Exception as e:
            logger.error(f"Unexpected error in _try_create_connection: {e}")
            return False

    async def _possibility_of_sending(self) -> bool:
        """Checks whether data can currently be sent.

        When `self.alive` is falsy, awaits `_reconnect` and checks the
        flag again.

        Returns:
            bool: True if `self.alive` is truthy, either immediately or
            after reconnecting; False if it is still falsy afterwards or
            a `BanError` was raised while reconnecting.
        """
        if self.alive:
            return True
        logger.error(
            "WebSocket is not connected. Attempting to reconnect..."
        )
        try:
            await self._reconnect()
        except BanError:
            return False
        if not self.alive:
            logger.error("Reconnection failed. Dropping message.")
            return False
        return True