# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2022 Crisp Crow
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Air Raid Alert API client module."""
from __future__ import annotations
__all__: typing.Sequence[str] = ('APIClient', 'GatewayClient')
import asyncio
import typing
import aiohttp
from aiohttp_sse_client import client as sse_client
from alertapi.impl import http
from alertapi.impl import event_manager
from alertapi.impl import event_factory
from alertapi.impl import entity_factory
from alertapi.internal import converters
from alertapi.internal import routes
if typing.TYPE_CHECKING:
from alertapi.internal.converters import StateConverter
from alertapi.events import base_events
from alertapi import snowflakes
from alertapi import states
from alertapi import images
[docs]class APIClient:
"""Alert API client.
Api client for Air Raid Alert API.
Parameters
----------
access_token : builtins.str
An access token to the Air Raid Alert API.
Can be obtained `here <https://alerts.com.ua>`_
Example
-------
.. code-block:: python
import asyncio
import alertapi
async def main() -> None:
client = alertapi.APIClient(access_token='...')
print(await client.fetch_states())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
__slots__: typing.Sequence[str] = (
'_session',
'_access_token',
'_http',
'_state_converter'
)
def __init__(self, access_token: str) -> None:
self._access_token = access_token
self._session = aiohttp.ClientSession
self._http = http.HttpClientImpl(access_token, self._session)
self._state_converter = converters.StateConverter()
@property
def access_token(self) -> str:
return self._access_token
@typing.overload
async def fetch_states(self, state: snowflakes.Snowflake) -> states.State:
...
@typing.overload
async def fetch_states(self, with_alert: bool) -> tuple[states.State]:
...
@typing.overload
async def fetch_states(self, limit: int) -> tuple[states.State]:
...
[docs] async def fetch_states(
self,
state: typing.Optional[snowflakes.Snowflake] = None,
with_alert: typing.Optional[bool] = None,
limit: typing.Optional[int] = 25
) -> tuple[states.State]:
"""Fetch all state entities from Alert API.
Parameters
----------
state : typing.Optional[alertapi.snowflakes.Snowflake]
State for search. If specified,
returns state object with information.
with_alert : typing.Optional[builtins.bool]
Fetch states with active/inactive alert.
limit : typing.Optional[builtins.int]
Limit of states. Defaults to 25.
Returns
-------
alertapi.states.State
Deserialied state entity if state is specified.
builtins.tuple[alertapi.states.State]
Tuple of deserialised state entities.
Raises
------
alertapi.errors.StateNotFound
* If specified state does not exists.
"""
if state:
return await self.fetch_state(state)
return await self._http.fetch_states(with_alert=with_alert, limit=limit)
[docs] async def fetch_state(
self,
state: typing.Union[
typing.Literal[StateConverter.STATES], snowflakes.Snowflake
]
) -> states.State:
"""Fetch state entity from Alert API
Parameters
----------
state : typing.Union[typing.Literal[converters.StateConverter.STATES], snowflakes.Snowflake]
State for search.
Returns
-------
alertapi.states.State
Deserialied state entity.
Raises
------
alertapi.errors.StateNotFound
* If specified state does not exists.
"""
if isinstance(state, str):
state = self._state_converter.convert(state)
return await self._http.fetch_state(state=state)
[docs] async def is_alert(
self,
state: typing.Union[
typing.Literal[StateConverter.STATES], snowflakes.Snowflake
]
) -> bool:
"""Check whether active alert in specified state or not.
Parameters
----------
state : typing.Union[typing.Literal[converters.StateConverter.STATES], snowflakes.Snowflake]
State for search.
Returns
-------
builtins.bool
* `builtins.True` if alert is active.
* `builtins.False` if alert is inactive.
Raises
------
alertapi.errors.StateNotFound
* If specified state does not exists.
"""
if isinstance(state, str):
state = self._state_converter.convert(state)
return await self._http.is_alert(state=state)
[docs] async def static_map(self) -> images.Image:
"""Fetch static map of states.
Returns
-------
alertapi.images.Image
Deserialized Image object.
"""
return await self._http.fetch_static_map()
[docs]class GatewayClient:
"""Gateway Alert API client.
Parameters
----------
access_token : builtins.str
An access token to the Air Raid Alert API.
Can be obtained `here <https://alerts.com.ua>`_
Example
-------
.. code-block:: python
import alertapi
client = alertapi.GatewayClient(access_token='...')
@client.listen(alertapi.ClientConnectedEvent)
async def on_client_connected(event: alertapi.ClientConnectedEvent) -> None:
states = await event.api.fetch_states()
print(states)
@client.listen(alertapi.StateUpdateEvent)
async def on_state_update(event: alertapi.StateUpdateEvent) -> None:
print('State updated': event.state)
client.connect()
"""
__slots__: typing.Sequence[str] = (
'_access_token',
'_event_source',
'_client',
'_event_factory',
'_entity_factory',
'_event_manager',
'_loop'
)
def __init__(self, access_token: str) -> None:
self._access_token = access_token
self._event_source = sse_client.EventSource
self._client = APIClient(access_token=self._access_token)
self._event_factory = event_factory.EventFactoryImpl(self._client)
self._entity_factory = entity_factory.EntityFactoryImpl()
self._event_manager = event_manager.EventManagerImpl(self._event_factory, self._entity_factory)
self._loop = asyncio.get_event_loop()
@property
def access_token(self) -> str:
return self._access_token
@property
def loop(self) -> str:
return self._loop
@property
def entity_factory(self) -> entity_factory.EntityFactoryImpl:
return self._entity_factory
@property
def event_factory(self) -> event_factory.EventFactoryImpl:
return self._event_factory
@property
def client(self) -> APIClient:
return self._client
[docs] def connect(self) -> None:
"""Connect client to Air Raid Alert API endpoint."""
compiled_route = routes.SSE_LIVE.compile()
url = compiled_route.create_url(routes.BASE_URL)
headers = {'X-API-Key': self._access_token}
self._loop.run_until_complete(self._listen_event_source(url, headers))
async def _listen_event_source(self, url: str, headers: dict[str, typing.Any]) -> None:
"""Connect to SSE endpoint and listen events.
Parameters
----------
url : builtins.str
Url to endpoint.
headers : builtins.dict[builtins.str, typing.Any]
Headers for HTTP-request body.
"""
async with self._event_source(url, timeout=None, headers=headers) as event_source:
async for event in event_source:
self._event_manager.consume_raw_event(event)
[docs] def listen(self, event_type: typing.Type[base_events.Event]) -> typing.Callable:
"""Generate a decorator to subscribe a callback to an event type.
Parameters
----------
event_type : typing.Type[alertapi.events.base_events.Event]
The event type to subscribe to.
Returns
-------
typing.Callable
A decorator for a coroutine function that passes it to
`EventManager.subscribe` before returning the function
reference.
"""
return self._event_manager.listen(event_type)
[docs] def subscribe(self, event_type: typing.Type[base_events.Event], callback: typing.Callable) -> None:
"""Subscribe a given callback to a given event type.
Parameters
----------
event_type : typing.Type[alertapi.events.base_events.Event]
The event type to listen for. This will also listen for any
subclasses of the given type.
callback : typing.Callable
Must be a coroutine function to invoke. This should
consume an instance of the given event.
Example
-------
The following demonstrates subscribing a callback to state update event.
.. code-block :: python
from alertapi.events.base_events import StateUpdateEvent
async def on_state_update(event):
...
client.subscribe(StateUpdateEvent, on_state_update)
"""
self._event_manager.subscribe(event_type, callback)