events_protocol.core.views¶
Submodules¶
Package Contents¶
-
class
events_protocol.core.views.BaseHealth¶ Bases:
events_protocol.core.views.base.BaseView-
_checkers:typing.List[typing.Dict[str, typing.Union[typing.Awaitable[typing.Callable], typing.Callable]]] = []¶
-
classmethod
add_checker(cls, checker_name: str, checker_func: typing.Union[typing.Awaitable[typing.Callable], typing.Callable])¶
-
async
_get(self)¶
-
-
class
events_protocol.core.views.BaseView¶ -
request¶
-
_base_header¶
-
async
send_response(self, message: str = None, data: dict = None, description: str = None, http_status: int = None, code: int = 1000, log_level=None)¶
-
abstract
get_query_args(self)¶
-
abstract async
write_response(self, http_status: int, description: str, response_body: dict, log_level: str = None, headers: dict = {})¶
-
async
_treat_general_exception(self, exception: Exception)¶
-
-
async
events_protocol.core.views.init_app(routes: typing.List[typing.Tuple[View, str]], middlewares: typing.Iterable[AIOHTTPMiddleware] = []) → Application¶
-
events_protocol.core.views._NOT_ALLOWED_AIOHTTP¶
-
class
events_protocol.core.views.AIOHTTPView(*args, **kwargs)¶ Bases:
events_protocol.core.views.base.BaseView,aiohttp.web.View-
request:Request¶
-
body:str¶
-
__iter__(self)¶
-
async
get_body(self)¶
-
async
get(self, *args, **kwargs)¶
-
async
_get(self, *args, **kwargs)¶
-
async
put(self, *args, **kwargs)¶
-
async
_put(self, *args, **kwargs)¶
-
async
post(self, *args, **kwargs)¶
-
async
_post(self, *args, **kwargs)¶
-
async
delete(self, *args, **kwargs)¶
-
async
_delete(self, *args, **kwargs)¶
-
get_query_args(self)¶
-
async
write_response(self, http_status: HTTPStatus, response_body: dict, headers: dict = {})¶
-
-
class
events_protocol.core.views.AIOHTTPHealthCheckView¶ Bases:
events_protocol.core.views.base.BaseHealth,events_protocol.core.views.aiohttp.AIOHTTPView
-
class
events_protocol.core.views.AIOHTTPView(*args, **kwargs) Bases:
events_protocol.core.views.base.BaseView,aiohttp.web.View-
request:Request
-
body:str
-
__iter__(self)
-
async
get_body(self)
-
async
get(self, *args, **kwargs)
-
async
_get(self, *args, **kwargs)
-
async
put(self, *args, **kwargs)
-
async
_put(self, *args, **kwargs)
-
async
post(self, *args, **kwargs)
-
async
_post(self, *args, **kwargs)
-
async
delete(self, *args, **kwargs)
-
async
_delete(self, *args, **kwargs)
-
get_query_args(self)
-
async
write_response(self, http_status: HTTPStatus, response_body: dict, headers: dict = {})
-
-
class
events_protocol.core.views.DataDogAsyncEventProcessor¶ Bases:
events_protocol.server.parser.event_processor.AsyncEventProcessor-
async classmethod
process_event(cls, raw_event: str, request)¶
-
async classmethod
-
class
events_protocol.core.views.EventView¶ Bases:
events_protocol.core.views.aiohttp.AIOHTTPView-
async
_post(self)¶
-
async
-
class
events_protocol.core.views.EventContextHolder¶ -
static
get()¶
-
static
set(event_context: EventContext)¶
-
static
clean()¶
-
classmethod
with_context(cls, context_id: IdType, context_flow_id: IdType, event_name: str, event_version: int, user_id: str = None, user_type: str = None)¶
-
async classmethod
with_async_context(cls, context_id: IdType, context_flow_id: IdType, event_name: str, event_version: int, user_id: str = None, user_type: str = None)¶
-
static
-
exception
events_protocol.core.views.EventException(parameters: Dict[str, Optional[Any]], expected: bool = False)¶ Bases:
RuntimeError-
_CODE:str =¶
-
_TYPE:EventErrorType¶
-
property
code(self)¶
-
property
event_error_type(self)¶
-
-
exception
events_protocol.core.views.EventParsingException¶ Bases:
events_protocol.core.exception.EventException-
_CODE= INVALID_COMMUNICATION_PROTOCOL¶
-
_TYPE¶
-
-
exception
events_protocol.core.views.ValidationError¶ Bases:
Exception-
fields:typing.List[Field]¶
-
to_dict(self)¶
-
to_json(self)¶
-
-
class
events_protocol.core.views.Event¶ Bases:
events_protocol.core.model.base.CamelPydanticMixin-
name:str¶
-
version:int¶
-
payload:PayloadType¶
-
id:Optional[str]¶
-
flow_id:Optional[str]¶
-
identity:Optional[Dict[str, Any]]¶
-
auth:Optional[Dict[str, Any]]¶
-
metadata:Optional[Dict[str, Any]]¶
-
payload_as(self, clazz: CamelPydanticMixin)¶
-
identity_as(self, clazz: Generic)¶
-
auth_as(self, clazz: Generic)¶
-
property
user_id(self)¶
-
property
user_type(self)¶
-
property
user(self)¶
-
property
origin(self)¶
-
-
class
events_protocol.core.views.ResponseEvent¶ Bases:
events_protocol.core.model.event.Event-
static
from_event(event: Event, event_type: EventType = EventSuccessType.SUCCESS)¶
-
property
is_success(self)¶
-
property
is_redirect(self)¶
-
property
is_error(self)¶
-
property
_event_name(self)¶
-
property
event_type(self)¶
-
property
error_type(self)¶
-
static
-
class
events_protocol.core.views.EventHandler¶ Bases:
abc.ABC-
event_name:str¶
-
event_version:typing.Union[None, int]¶
-
_SCHEMA:CamelPydanticMixin¶
-
__post_init__(self)¶
-
abstract
handle(cls, event: RequestEvent)¶
-
classmethod
parse_event(cls, event: Event)¶
-
-
class
events_protocol.core.views.AsyncEventHandler¶ Bases:
events_protocol.server.handler.event_handler.EventHandler,abc.ABC-
_SCHEMA:CamelPydanticMixin¶
-
abstract async
handle(cls, event: RequestEvent)¶
-
-
class
events_protocol.core.views.EventDiscovery¶ Bases:
events_protocol.core.logging.mixins.loggable.LoggableMixin-
_events:typing.Dict[typing.Tuple[str, int], typing.Union[EventHandler, AsyncEventHandler]]¶
-
_EVENT_NAME_STD:str = [a-z_]+[a-z]:[a-z_]+[a-z](:[a-z]+[a-z])*¶
-
classmethod
add(cls, event_name: str, event_handler: EventHandler, version: int = 1)¶
-
classmethod
get(cls, event_name: str, event_version: int = 1)¶
-
-
class
events_protocol.core.views.EventBuilder¶ Bases:
events_protocol.core.logging.mixins.loggable.LoggableMixin-
classmethod
error_for(cls, exception: EventException, event: typing.Optional[Event] = Event(name='', version=1, id=str(uuid4())), id_flow=str(uuid4()), loggable=True)¶
-
classmethod
response_for(cls, event: Event, payload: PayloadType, event_type: EventSuccessType = EventSuccessType.SUCCESS, loggable=True)¶
-
classmethod
-
class
events_protocol.core.views.EventProcessor¶ Bases:
events_protocol.core.logging.mixins.loggable.LoggableMixin-
event_discovery¶
-
event_validator¶
-
classmethod
process_event(cls, raw_event: str)¶
-
classmethod
parse_event(cls, str_event: str)¶
-
-
class
events_protocol.core.views.AsyncEventProcessor¶ Bases:
events_protocol.server.parser.event_processor.EventProcessor-
async classmethod
process_event(cls, raw_event: str)¶
-
async classmethod