Source code for bleak.backends.winrt.util

import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    if sys.platform != "win32":
        assert False, "This backend is only available on Windows"

import asyncio
import ctypes
from ctypes import wintypes
from enum import IntEnum

from bleak.exc import BleakError

if sys.version_info < (3, 11):
    from async_timeout import timeout as async_timeout
else:
    from asyncio import timeout as async_timeout


def _check_result(result: int, func, args):
    if not result:
        raise ctypes.WinError()

    return args


def _check_hresult(result: int, func, args):
    if result:
        raise ctypes.WinError(result)

    return args


# not defined in wintypes
_UINT_PTR = wintypes.WPARAM

# https://learn.microsoft.com/en-us/windows/win32/api/winuser/nc-winuser-timerproc
_TIMERPROC = ctypes.WINFUNCTYPE(
    None, wintypes.HWND, _UINT_PTR, wintypes.UINT, wintypes.DWORD
)

# https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-settimer
_SET_TIMER_PROTOTYPE = ctypes.WINFUNCTYPE(
    _UINT_PTR, wintypes.HWND, _UINT_PTR, wintypes.UINT, _TIMERPROC
)
_SET_TIMER_PARAM_FLAGS = (
    (1, "hwnd", None),
    (1, "nidevent"),
    (1, "uelapse"),
    (1, "lptimerfunc", None),
)
_SetTimer = _SET_TIMER_PROTOTYPE(
    ("SetTimer", ctypes.windll.user32), _SET_TIMER_PARAM_FLAGS
)
_SetTimer.errcheck = _check_result

# https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-killtimer
_KILL_TIMER_PROTOTYPE = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, _UINT_PTR)
_KILL_TIMER_PARAM_FLAGS = (
    (1, "hwnd", None),
    (1, "uidevent"),
)
_KillTimer = _KILL_TIMER_PROTOTYPE(
    ("KillTimer", ctypes.windll.user32), _KILL_TIMER_PARAM_FLAGS
)

# https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-cogetapartmenttype
_CO_GET_APARTMENT_TYPE_PROTOTYPE = ctypes.WINFUNCTYPE(
    ctypes.c_int,
    ctypes.POINTER(ctypes.c_int),
    ctypes.POINTER(ctypes.c_int),
)
_CO_GET_APARTMENT_TYPE_PARAM_FLAGS = (
    (1, "papttype", None),
    (1, "paptqualifier", None),
)
_CoGetApartmentType = _CO_GET_APARTMENT_TYPE_PROTOTYPE(
    ("CoGetApartmentType", ctypes.windll.ole32), _CO_GET_APARTMENT_TYPE_PARAM_FLAGS
)
_CoGetApartmentType.errcheck = _check_hresult

_CO_E_NOTINITIALIZED = -2147221008


# https://learn.microsoft.com/en-us/windows/win32/api/objidl/ne-objidl-apttype
class _AptType(IntEnum):
    CURRENT = -1
    STA = 0
    MTA = 1
    NA = 2
    MAIN_STA = 3


# https://learn.microsoft.com/en-us/windows/win32/api/objidl/ne-objidl-apttypequalifier
class _AptQualifierType(IntEnum):
    NONE = 0
    IMPLICIT_MTA = 1
    NA_ON_MTA = 2
    NA_ON_STA = 3
    NA_ON_IMPLICIT_STA = 4
    NA_ON_MAIN_STA = 5
    APPLICATION_STA = 6
    RESERVED_1 = 7


def _get_apartment_type() -> tuple[_AptType, _AptQualifierType]:
    """
    Calls CoGetApartmentType to get the current apartment type and qualifier.

    Returns:
        The current apartment type and qualifier.
    Raises:
        OSError: If the call to CoGetApartmentType fails.
    """
    api_type = ctypes.c_int()
    api_type_qualifier = ctypes.c_int()
    _CoGetApartmentType(ctypes.byref(api_type), ctypes.byref(api_type_qualifier))
    return _AptType(api_type.value), _AptQualifierType(api_type_qualifier.value)


[docs] async def assert_mta() -> None: """ Asserts that the current apartment type is MTA. Raises: BleakError: If the current apartment type is not MTA and there is no Windows message loop running. .. versionadded:: 0.22 .. versionchanged:: 0.22.2 Function is now async and will not raise if the current apartment type is STA and the Windows message loop is running. """ if hasattr(allow_sta, "_allowed"): return try: apt_type, _ = _get_apartment_type() except OSError as e: # All is OK if not initialized yet. WinRT will initialize it. if e.winerror == _CO_E_NOTINITIALIZED: return raise if apt_type == _AptType.MTA: # if we get here, WinRT probably set the apartment type to MTA and all # is well, we don't need to check again setattr(allow_sta, "_allowed", True) return event = asyncio.Event() def wait_event(*_): event.set() # have to keep a reference to the callback or it will be garbage collected # before it is called callback = _TIMERPROC(wait_event) # set a timer to see if we get a callback to ensure the windows event loop # is running timer = _SetTimer(None, 1, 0, callback) try: async with async_timeout(0.5): await event.wait() except asyncio.TimeoutError: raise BleakError( "Thread is configured for Windows GUI but callbacks are not working." + ( " Suspect unwanted side effects from importing 'pythoncom'." if "pythoncom" in sys.modules else "" ) ) else: # if the windows event loop is running, we assume it is going to keep # running and we don't need to check again setattr(allow_sta, "_allowed", True) finally: _KillTimer(None, timer)
[docs] def allow_sta(): """ Suppress check for MTA thread type and allow STA. Bleak will hang forever if the current thread is not MTA - unless there is a Windows event loop running that is properly integrated with asyncio in Python. If your program meets that condition, you must call this function do disable the check for MTA. If your program doesn't have a graphical user interface you probably shouldn't call this function. and use ``uninitialize_sta()`` instead. .. versionadded:: 0.22.1 """ allow_sta._allowed = True
[docs] def uninitialize_sta(): """ Uninitialize the COM library on the current thread if it was not initialized as MTA. This is intended to undo the implicit initialization of the COM library as STA by packages like pywin32. It should be called as early as possible in your application after the offending package has been imported. .. versionadded:: 0.22 """ try: _get_apartment_type() except OSError as e: # All is OK if not initialized yet. WinRT will initialize it. if e.winerror == _CO_E_NOTINITIALIZED: return else: ctypes.windll.ole32.CoUninitialize()