Skip to content

Instantly share code, notes, and snippets.

@sergiors
Created December 22, 2024 19:19
Show Gist options
  • Save sergiors/59d07324f2c99c3ab97dc97e68cb19f9 to your computer and use it in GitHub Desktop.
Save sergiors/59d07324f2c99c3ab97dc97e68cb19f9 to your computer and use it in GitHub Desktop.
Konviva API python
import os
from http import HTTPMethod
from typing import Any
from urllib.parse import ParseResult, urlparse
import requests
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.exceptions import BadRequestError
from glom import glom
from requests import request
logger = Logger(__name__)
def _url(**kwargs) -> ParseResult:
    """Return the parsed ``KONVIVA_API_URL`` with selected components replaced.

    The base URL is read from the ``KONVIVA_API_URL`` environment variable on
    every call. Keyword arguments are forwarded unchanged to
    ``ParseResult._replace`` (the callers in this file pass ``path`` and
    ``query``).

    Raises:
        KeyError: if ``KONVIVA_API_URL`` is not set in the environment.
    """
    base = urlparse(os.environ['KONVIVA_API_URL'])
    return base._replace(**kwargs)
class Struct:
    """Lightweight attribute bag populated from keyword arguments.

    Every keyword passed to the constructor becomes an instance attribute.
    Subclasses may declare the expected fields as class-level annotations;
    nothing here validates the keywords against those annotations.
    """

    def __init__(self, **kwargs) -> None:
        self.__dict__.update(kwargs)

    def __repr__(self) -> str:
        # Show the stored attributes so printing an instance is informative
        # instead of the default ``<Struct object at 0x...>``.
        fields = ', '.join(f'{key}={value!r}' for key, value in vars(self).items())
        return f'{type(self).__name__}({fields})'

    def asdict(self) -> dict:
        """Return the instance attributes as a new ``dict``.

        A shallow copy is returned so that mutating the result does not
        silently add, remove or rebind attributes on the instance. Only
        attributes stored on the instance are included; class-level defaults
        that were never overridden are not.
        """
        return dict(vars(self))
class User(Struct):
    """User record; attributes are filled from keyword arguments by ``Struct``.

    The annotations below only declare the expected fields, they are not
    enforced at runtime.
    """

    # NOTE(review): field names presumably mirror the keys of the Konviva
    # "getUsuario" JSON payload -- confirm against the API documentation.
    IDUsuario: int
    Identificador: str
    NomeUsuario: str
    Email: str
    # Class-level default: reads as None when the keyword is not supplied.
    CPF: str | None = None
class Enrollment(Struct):
    """Enrollment record; attributes are filled from keyword arguments by ``Struct``.

    Only ``IDMatricula`` is declared, but any other keyword given to the
    constructor is stored as an attribute as well.
    """

    IDMatricula: int
class KonvivaAPIClient:
def __init__(self, secret_key: str) -> None:
self.secret_key = secret_key
@property
def headers(self) -> dict[str, str]:
return {
'Authorization': f'KONVIVA {self.secret_key}',
'Content-Type': 'application/json',
}
def request(
self,
method: HTTPMethod,
url: ParseResult,
**kwargs,
) -> dict[str, Any]:
try:
r = request(
method=method,
url=url.geturl(),
headers=self.headers,
timeout=12,
**kwargs,
)
r.raise_for_status()
if err := glom(r.json(), 'errors.0', default=None):
raise BadRequestError(err)
except requests.HTTPError as exc:
logger.exception(exc)
raise
else:
return r.json()
class EnrollmentResource:
    """Enrollment operations, issued through a client's ``request`` method."""

    def __init__(self, client: KonvivaAPIClient) -> None:
        # Only the bound ``request`` method is kept, not the client itself.
        self.request = client.request

    def enroll(self, user_id: str, class_id: str) -> Enrollment:
        """POST an enrollment with status ``MATRICULADO`` for a user and class.

        Returns an ``Enrollment`` built from the response body.
        """
        endpoint = _url(path='/action/api/integrarMatricula')
        payload = {
            'IDUsuario': user_id,
            'IDTurma': class_id,
            'StatusMatricula': 'MATRICULADO',
        }
        response = self.request(method=HTTPMethod.POST, url=endpoint, json=payload)
        return Enrollment(**response)

    def cancel(self, id: str) -> Enrollment:
        """POST status ``CANCELADO`` for the enrollment identified by ``id``.

        Returns an ``Enrollment`` built from the response body.
        """
        endpoint = _url(path='/action/api/integrarMatricula')
        payload = {
            'IDMatricula': id,
            'StatusMatricula': 'CANCELADO',
        }
        response = self.request(method=HTTPMethod.POST, url=endpoint, json=payload)
        return Enrollment(**response)

    def list(self, user_id: str) -> tuple[Enrollment, ...]:
        """GET the enrollments of ``user_id`` and return them as a tuple.

        Each element of the response is expanded into an ``Enrollment``.
        """
        endpoint = _url(path=f'/action/api/getMatriculaByUsuario/{user_id}')
        rows = self.request(method=HTTPMethod.GET, url=endpoint)
        # ``request`` is annotated as returning a dict, while this code
        # iterates the response as a sequence of mappings.
        return tuple([Enrollment(**row) for row in rows])  # type: ignore
class UserResource:
    """User operations, issued through a client's ``request`` method."""

    def __init__(self, client: KonvivaAPIClient) -> None:
        # Only the bound ``request`` method is kept, not the client itself.
        self.request = client.request

    def get_item(self, id: str) -> User:
        """GET the user identified by ``id`` and return it as a ``User``."""
        r = self.request(
            method=HTTPMethod.GET,
            url=_url(path=f'/action/api/getUsuario/{id}'),
        )
        return User(**r)

    def update(self, id: str, attrs: dict[str, str]) -> bool:
        """PUT the given attributes for the user identified by ``id``.

        Args:
            id: Identifier sent as ``IDUsuario``.
            attrs: Attributes to change. Previously this argument was
                accepted but never sent, so no attribute was ever updated.

        Returns:
            ``True`` when the request completes without raising.
        """
        # ``IDUsuario`` is written last so ``attrs`` cannot redirect the
        # update to a different user.
        # NOTE(review): assumes the endpoint takes the attributes as flat
        # top-level keys next to ``IDUsuario`` (as the enrollment payloads in
        # this file do) -- confirm against the Konviva API documentation.
        payload = {**(attrs or {}), 'IDUsuario': id}
        self.request(
            method=HTTPMethod.PUT,
            url=_url(
                path='/action/api/integrarUsuario',
                query='sendMail=false',
            ),
            json=payload,
        )
        return True
class Konviva:
    """Entry point exposing the ``user`` and ``enrollment`` resources."""

    def __init__(self, secret_key: str) -> None:
        # Both resources are built from the same client instance.
        shared_client = KonvivaAPIClient(secret_key)
        self.user = UserResource(client=shared_client)
        self.enrollment = EnrollmentResource(client=shared_client)
@sergiors
Copy link
Author

import os

# Usage example: build the facade from the secret key held in the environment.
secret_key = os.environ['KONVIVA_SECRET_KEY']
konviva = Konviva(secret_key)

# Enrollments of user '123', returned as a tuple of Enrollment objects.
enrollments = konviva.enrollment.list('123')
print(enrollments)

# Single user lookup by id.
user = konviva.user.get_item('213')
print(user)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment