Select Git revision
communication.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
communication.py 3.94 KiB
"""
This module is responsible for the communication to and from ATDB
"""
from argparse import Namespace
from typing import List, Generator
import requests
class APIError(Exception):
    """
    Raised when the communication with the ATDB API fails or the requested
    operation fails.

    The exception message contains the URL, the HTTP status code, the reason
    and the raw body of the reply. The status code, reason and URL are also
    kept as attributes so callers can inspect them without parsing the
    message.
    """

    def __init__(self, reply: "requests.Response"):
        """
        Build the error from the reply of the failed request.

        :param reply: response object of the failed request; its
            ``status_code``, ``reason``, ``content`` and ``url`` are read
        """
        status_code = reply.status_code
        reason = reply.reason
        message = reply.content
        url = reply.url

        self.status_code = status_code
        self.reason = reason
        self.url = url

        # Only the message goes into ``args``: also passing the instance
        # would make ``args`` self-referential and ``str(error)`` would no
        # longer be the plain message.
        super().__init__(
            f"Failed contacting {url} [{status_code}]: {reason} - {message}"
        )
class DRFReply:
    """
    Wrapper around the decoded JSON body of a Django REST framework reply
    """

    def __init__(self, response):
        """
        Decode the JSON body of the response and keep it for later lookups
        """
        decoded_body = response.json()
        self._content = decoded_body

    @property
    def n_items(self) -> int:
        """
        Number of items reported by the reply (its "count" entry)
        """
        item_count = self._content["count"]
        return item_count

    @property
    def results(self) -> List[dict]:
        """
        The list of results carried by the reply (its "results" entry)
        """
        items = self._content["results"]
        return items

    @property
    def next_page_url(self):
        """
        Link to the following page of a paginated reply (its "next" entry)
        """
        following = self._content["next"]
        return following

    @property
    def previous_page_url(self):
        """
        Link to the preceding page of a paginated reply (its "previous" entry)
        """
        preceding = self._content["previous"]
        return preceding
class APIConnector:
    """
    A class to represent the connection to the API

    The connector is bound to a base URL and a token. The HTTP session is
    created on first use and reused for later requests. The connector can be
    used as a context manager so that the session is closed on exit.
    """

    def __init__(self, url, token):
        """
        :param url: base URL of the API; trailing slashes are stripped
        :param token: token sent in the ``Authorization`` header
        """
        self._url = url.rstrip("/")
        self._token = token
        # Created lazily by session()
        self._session = None

    def __enter__(self):
        """
        Returns the connector itself
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Closes the session; exceptions are not suppressed
        """
        self.close()

    @classmethod
    def from_args(cls, args: Namespace):
        """
        Creates API connector from command line arguments

        Reads ``args.atdb_site`` and ``args.token``. Being a classmethod, it
        returns an instance of the class it is called on, so subclasses get
        an instance of their own type.
        """
        return cls(args.atdb_site, args.token)

    def session(self) -> "requests.Session":
        """
        Returns a http session object and creates it if it is not initialized
        """
        if self._session is None:
            self._session = self.start_session()
        return self._session

    def start_session(self) -> "requests.Session":
        """
        Start a new session carrying the authorization token and the JSON
        content-type / no-cache headers
        """
        session_instance = requests.Session()
        session_instance.headers["Authorization"] = f"Token {self._token}"
        session_instance.headers["content-type"] = "application/json"
        session_instance.headers["cache-control"] = "no-cache"
        return session_instance

    def close(self) -> None:
        """
        Close the http session if one was created

        Calling it again, or without a session, does nothing. A later request
        starts a fresh session.
        """
        if self._session is not None:
            self._session.close()
            self._session = None

    def _request_url(self, method, url, query=None, content=None):
        """
        Send a request to a full URL and return the response

        :param method: HTTP method name
        :param url: full URL to contact
        :param query: optional query parameters
        :param content: optional body, sent as JSON
        :raises APIError: when the response is not ok
        """
        # Every "http://" in the URL is rewritten to "https://".
        # NOTE(review): presumably needed because the page links returned by
        # the API use the http scheme - confirm
        url = url.replace("http://", "https://")
        response = self.session().request(method, url, params=query, json=content)
        if not response.ok:
            raise APIError(response)
        return response

    def _request_path(self, method, item, query=None, content=None):
        """
        Send a request to a path relative to the base URL

        The final URL is ``<base url>/<item>/``: leading slashes of ``item``
        are dropped and a trailing slash is always appended.
        """
        url = "/".join((self._url, item.lstrip("/"), ""))
        return self._request_url(method, url, query=query, content=content)

    def list_iter(self, object_type, query=None) -> Generator[dict, None, None]:
        """
        Iterates over all the items of a specific object_type in the REST API

        The first page is requested with ``query``; following pages are
        fetched through the next-page link of each reply, as returned by the
        API, until a reply has no next page.
        """
        response = self._request_path("get", object_type, query=query)
        while True:
            drf_reply = DRFReply(response)
            yield from drf_reply.results
            next_url = drf_reply.next_page_url
            if next_url is None:
                return
            response = self._request_url("get", next_url)

    def update_task_processed_size(self, task_id, processed_size):
        """
        Send the processed size of a task to the API and return the response

        Issues a PUT on ``tasks/<task_id>/`` whose body only contains
        ``size_processed``.
        NOTE(review): a PUT with a single field relies on the API accepting
        partial bodies - confirm
        """
        return self._request_path(
            "PUT", f"tasks/{task_id}", content={"size_processed": processed_size}
        )

    def change_task_status(self, task_id, status) -> None:
        """
        Change the status of a task

        Issues a PUT on ``tasks/<task_id>/`` whose body only contains
        ``new_status``.
        """
        self._request_path("PUT", f"tasks/{task_id}", content={"new_status": status})