Add comments

This commit is contained in:
Maël Kervella 2018-06-23 14:06:10 +00:00
parent d37d47430e
commit baa6de4f27

View file

@ -1,10 +1,10 @@
import requests
from requests.exceptions import HTTPError
import datetime import datetime
import iso8601 import iso8601
from pathlib import Path from pathlib import Path
import json
import stat import stat
import json
import requests
from requests.exceptions import HTTPError
from . import endpoints from . import endpoints
from . import exceptions from . import exceptions
@ -16,31 +16,44 @@ DEFAULT_TOKEN_FILENAME = '.re2o.token'
class Re2oAPIClient: class Re2oAPIClient:
""" """Wrapper to handle the requests to Re2o API in a seemless way.
Object to handle the requests to Re2o API in a seemless way.
You must first initialize the client and then you can access You must first initialize the client and then you can access
the API with dedicated functions that handle the authentication the API with dedicated functions that handle the authentication
process process for you.
""" """
def __init__(self, hostname, username, password, token_file=None, use_tls=True): def __init__(self, hostname, username, password, token_file=None, use_tls=True):
""" """Creates an API client.
Create an API client.
Args:
hostname: The hostname of the Re2o server to use.
username: The username to use.
password: The password to use.
token_file: An optional path to the file where re2o tokens are
stored. Used both for retrieving the token and saving it, so
the file must be accessible for reading and writing. The
default value is `None`, which indicated to use
`$HOME/{DEFAULT_TOKEN_FILENAME}`.
use_tls: A boolean to indicate whether the client should us TLS
(recommended for production). The Default is `True`.
Raises:
requests.exceptions.ConnectionError: Unable to resolve the
provided hostname.
requests.exceptions.HTTPError: The server used does not have a
valid Re2o API.
re2oapi.exceptions.InvalidCredentials: The credentials provided
are not valid according to the Re2o server.
""".format(DEFAULT_TOKEN_FILENAME=DEFAULT_TOKEN_FILENAME)
:param hostname: (str) The hostname of the Re2o server to use
:param username: (str) The username to use
:param password: (str) The password to use
:param token_file: (str) A path to a file where re2o tokens are stored
If `None`, then `$HOME/DEFAULT_TOKEN_FILENAME` is used.
(Default: None)
:param use_tls: (bool) Should the client used TLS (recommended for
production). (Default: True)
"""
self.use_tls = use_tls self.use_tls = use_tls
self.token_file = token_file or Path.home() / DEFAULT_TOKEN_FILENAME self.token_file = token_file or Path.home() / DEFAULT_TOKEN_FILENAME
self.hostname = hostname self.hostname = hostname
self._username = username self._username = username
self._password = password self._password = password
# Try to fetch token from token file else get a new one from the
# server
try: try:
self.token = self._get_token_from_file() self.token = self._get_token_from_file()
except exceptions.APIClientGenericError: except exceptions.APIClientGenericError:
@ -49,22 +62,28 @@ class Re2oAPIClient:
@property @property
def need_renew_token(self): def need_renew_token(self):
""" """The token needs to be renewed.
True is the token expiration time is within less than `TIME_FOR_RENEW`
seconds Returns:
""" True is the token expiration time is within less than
{TIME_FOR_RENEW} seconds.
""".format(TIME_FOR_RENEW=TIME_FOR_RENEW)
return self.token['expiration'] < \ return self.token['expiration'] < \
datetime.datetime.now(datetime.timezone.utc) + \ datetime.datetime.now(datetime.timezone.utc) + \
datetime.timedelta(seconds=TIME_FOR_RENEW) datetime.timedelta(seconds=TIME_FOR_RENEW)
def _get_token_from_file(self): def _get_token_from_file(self):
# Check the token file exists
if not self.token_file.is_file(): if not self.token_file.is_file():
raise exceptions.TokenFileNotFound(self.token_file) raise exceptions.TokenFileNotFound(self.token_file)
# Read the data in the file
with self.token_file.open() as f: with self.token_file.open() as f:
data = json.load(f) data = json.load(f)
try: try:
# Retrieve data for this hostname and this username in the file
token_data = data[self.hostname][self._username] token_data = data[self.hostname][self._username]
return { return {
'token': token_data['token'], 'token': token_data['token'],
@ -78,12 +97,16 @@ class Re2oAPIClient:
) )
def _save_token_to_file(self): def _save_token_to_file(self):
try: try:
# Read previous data to not erase other tokens
with self.token_file.open() as f: with self.token_file.open() as f:
data = json.load(f) data = json.load(f)
except Exception: except Exception:
# Default value if file not readbale or data not valid JSON
data = {} data = {}
# Insert the new token in the data (replace old token if it exists)
if self.hostname not in data.keys(): if self.hostname not in data.keys():
data[self.hostname] = {} data[self.hostname] = {}
data[self.hostname][self._username] = { data[self.hostname][self._username] = {
@ -91,6 +114,8 @@ class Re2oAPIClient:
'expiration': self.token['expiration'].isoformat() 'expiration': self.token['expiration'].isoformat()
} }
# Rewrite the token file and ensure only the user can read it
# Fails silently if the file cannot be written.
try: try:
with self.token_file.open('w') as f: with self.token_file.open('w') as f:
json.dump(data, f) json.dump(data, f)
@ -99,6 +124,8 @@ class Re2oAPIClient:
pass pass
def _get_token_from_server(self): def _get_token_from_server(self):
# Perform the authentication request
response = requests.post( response = requests.post(
self.get_url_for('token'), self.get_url_for('token'),
data={'username': self._username, 'password': self._password} data={'username': self._username, 'password': self._password}
@ -106,6 +133,8 @@ class Re2oAPIClient:
if response.status_code == requests.codes.bad_request: if response.status_code == requests.codes.bad_request:
raise exceptions.InvalidCredentials(self._username, self.hostname) raise exceptions.InvalidCredentials(self._username, self.hostname)
response.raise_for_status() response.raise_for_status()
# Return the token and expiration time
response = response.json() response = response.json()
return { return {
'token': response['token'], 'token': response['token'],
@ -113,8 +142,15 @@ class Re2oAPIClient:
} }
def get_token(self): def get_token(self):
""" """Retrieves the token to use for the current connection.
Returns the token for the current connection
Returns:
The token to use in the request as an authentication. It is
automatically renewed if needed.
Raises:
re2oapi.exceptions.InvalidCredentials: The token needs to be
renewed but the given credentials are not valid.
""" """
if self.need_renew_token: if self.need_renew_token:
self.token = self._get_token_from_server() self.token = self._get_token_from_server()
@ -122,85 +158,199 @@ class Re2oAPIClient:
return self.token['token'] return self.token['token']
def _request(self, method, url, headers={}, params={}, *args, **kwargs): def _request(self, method, url, headers={}, params={}, *args, **kwargs):
# Update headers to force the 'Authorization' field with the right token
headers.update({ headers.update({
'Authorization': 'Token {}'.format(self.get_token()) 'Authorization': 'Token {}'.format(self.get_token())
}) })
# Use a json format unless the user already specified something
if not 'format' in params.keys(): if not 'format' in params.keys():
params.update({'format': 'json'}) params.update({'format': 'json'})
# Perform the request
response = getattr(requests, method)( response = getattr(requests, method)(
url, headers=headers, params=params, *args, **kwargs url, headers=headers, params=params, *args, **kwargs
) )
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def delete(self, *args, **kwargs): def delete(self, *args, **kwargs):
""" """Performs a DELETE request.
DELETE requests on a given URL that acts like `requests.delete` except
that authentication to the API is automatically done and JSON response
is decoded
:param url: (str) URL of the requests DELETE request on a given URL that acts like `requests.delete` except
:param: See `requests.post` params that authentication to the API is automatically done and JSON response
:returns: (dict) The JSON-decoded result of the request is decoded.
Args:
url: The URL of the requests.
*args: See `requests.delete` parameters.
**kwargs: See `requests.delete` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
""" """
return self._request('delete', *args, **kwargs) return self._request('delete', *args, **kwargs)
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
""" """Performs a GET request.
GET requests on a given URL that acts like `requests.get` except that
authentication to the API is automatically done and JSON response is
decoded
:param url: (str) URL of the requests GET request on a given URL that acts like `requests.get` except that
:param: See `requests.get` params authentication to the API is automatically done and JSON response is
:returns: (dict) The JSON-decoded result of the request decoded.
Args:
url: The URL of the requests.
*args: See `requests.get` parameters.
**kwargs: See `requests.get` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
"""
return self._request('get', *args, **kwargs)
def head(self, *args, **kwargs):
"""Performs a HEAD request.
HEAD request on a given URL that acts like `requests.head` except that
authentication to the API is automatically done and JSON response is
decoded.
Args:
url: The URL of the requests.
*args: See `requests.head` parameters.
**kwargs: See `requests.head` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
"""
return self._request('get', *args, **kwargs)
def option(self, *args, **kwargs):
"""Performs a OPTION request.
OPTION request on a given URL that acts like `requests.option` except
that authentication to the API is automatically done and JSON response
is decoded.
Args:
url: The URL of the requests.
*args: See `requests.option` parameters.
**kwargs: See `requests.option` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
""" """
return self._request('get', *args, **kwargs) return self._request('get', *args, **kwargs)
def patch(self, *args, **kwargs): def patch(self, *args, **kwargs):
""" """Performs a PATCH request.
PATCH requests on a given URL that acts like `requests.patch` except
that authentication to the API is automatically done and JSON response
is decoded
:param url: (str) URL of the requests PATCH request on a given URL that acts like `requests.patch` except
:param: See `requests.post` params that authentication to the API is automatically done and JSON response
:returns: (dict) The JSON-decoded result of the request is decoded.
Args:
url: The URL of the requests.
*args: See `requests.patch` parameters.
**kwargs: See `requests.patch` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
""" """
return self._request('patch', *args, **kwargs) return self._request('patch', *args, **kwargs)
def post(self, *args, **kwargs): def post(self, *args, **kwargs):
""" """Performs a POST request.
POST requests on a given URL that acts like `requests.post` except
that authentication to the API is automatically done and JSON response
is decoded
:param url: (str) URL of the requests POST request on a given URL that acts like `requests.post` except that
:param: See `requests.post` params authentication to the API is automatically done and JSON response is
:returns: (dict) The JSON-decoded result of the request decoded.
Args:
url: The URL of the requests.
*args: See `requests.post` parameters.
**kwargs: See `requests.post` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
""" """
return self._request('post', *args, **kwargs) return self._request('post', *args, **kwargs)
def put(self, *args, **kwargs): def put(self, *args, **kwargs):
""" """Performs a PUT request.
PUT requests on a given URL that acts like `requests.put` except
that authentication to the API is automatically done and JSON response
is decoded
:param url: (str) URL of the requests PUT request on a given URL that acts like `requests.put` except that
:param: See `requests.post` params authentication to the API is automatically done and JSON response is
:returns: (dict) The JSON-decoded result of the request decoded.
Args:
url: The URL of the requests.
*args: See `requests.put` parameters.
**kwargs: See `requests.put` parameters.
Returns:
The JSON-decoded result of the request.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
""" """
return self._request('put', *args, **kwargs) return self._request('put', *args, **kwargs)
def get_url_for(self, name, **kwargs): def get_url_for(self, name, **kwargs):
""" """Retrieve the complete URL to use for a given endpoint's name.
Retrieve the complete URL to use for a given endpoint's name
:param name: The name of the endpoint to look for. Args:
`re2oapi.exception.NameNotExists` is raised of the name does not name: The name of the endpoint to look for.
correspond to any endpoint **kwargs: A dictionnary with the parameter to use to build the
:param kwargs: A dict with the parameters to use to format the URL URL (using .format() syntax)
Returns:
The full URL to use.
Raises:
re2oapi.exception.NameNotExists: The provided name does not
correspond to any endpoint.
""" """
return '{proto}://{host}{endpoint}'.format( return '{proto}://{host}{endpoint}'.format(
proto=('https' if self.use_tls else 'http'), proto=('https' if self.use_tls else 'http'),
@ -209,28 +359,92 @@ class Re2oAPIClient:
) )
def _list_for(self, obj_name, max_results=None, params={}, **kwargs): def _list_for(self, obj_name, max_results=None, params={}, **kwargs):
"""List all '{obj_name}' objects on the server.
Args:
params: See `requests.get` params.
**kwargs: A dictionnary used to defines the required parameters
in order to build the URL (using `.format()`).
Returns:
The list of all the {obj_name} objects serialized
as returned by the API.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
"""
# For optimization, list all results in one page unless the user
# is forcing the use of a different `page_size`.
if not 'page_size' in params.keys(): if not 'page_size' in params.keys():
params['page_size'] = 'all' params['page_size'] = 'all'
# Performs the request for the first page
response = self.get( response = self.get(
self.get_url_for('%s-list' % obj_name, **kwargs), self.get_url_for('%s-list' % obj_name, **kwargs),
params=params params=params
) )
results = response['results'] results = response['results']
# Get all next pages and append the results
while response['next'] is not None and \ while response['next'] is not None and \
(max_results is None or len(results) < max_results): (max_results is None or len(results) < max_results):
response = self.get(response['next']) response = self.get(response['next'])
results += response['results'] results += response['results']
# Returns the exact number of results if applicable
return results[:max_results] if max_results else results return results[:max_results] if max_results else results
def _count_for(self, obj_name, params={}, **kwargs): def _count_for(self, obj_name, params={}, **kwargs):
"""Count all '{obj_name}' objects on the server:
Args:
params: See `requests.get` params.
**kwargs: A dictionnary used to defines the required parameters
in order to build the URL (using `.format()`).
Returns:
The number of {obj_name} objects on the server as returned by the
API.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
"""
# For optimization, ask fo only 1 result (so the server will take
# less time to process the request) unless the user is forcing the
# use of a different `page_size`.
if not 'page_size' in params.keys(): if not 'page_size' in params.keys():
params['page_size'] = 1 params['page_size'] = 1
# Performs the request and return the `count` value in the response.
return self.get( return self.get(
self.get_url_for('%s-list' % obj_name, **kwargs), self.get_url_for('%s-list' % obj_name, **kwargs),
params=params params=params
)['count'] )['count']
def _view_for(self, obj_name, params={}, **kwargs): def _view_for(self, obj_name, params={}, **kwargs):
"""Retrieved the details of a '{obj_name}' object from the server.
Args:
params: See `requests.get` params.
**kwargs: A dictionnary used to defines the required parameters
in order to build the URL (using `.format()`).
Returns:
The serialized data of the queried {obj_name} object as returned
by the API.
Raises:
requests.exceptions.RequestException: An error occured while
performing the request.
exceptions.PermissionDenied: The user does not have the right
to perform this request.
"""
return self.get( return self.get(
self.get_url_for('%s-detail' % obj_name, **kwargs), self.get_url_for('%s-detail' % obj_name, **kwargs),
params=params params=params
@ -239,25 +453,28 @@ class Re2oAPIClient:
def __getattr__(self, item): def __getattr__(self, item):
if item.startswith('list_'): if item.startswith('list_'):
return lambda *args, **kwargs: self._list_for( obj_name = item[len('list_'):]
item[len('list_'):], def f(*args, **kwargs):
*args, return self._list_for(obj_name, *args, **kwargs)
**kwargs f.__doc__ = self._list_for.__doc__.format(obj_name=obj_name)
) f.__name__ = "list_"+obj_name
return f
elif item.startswith('count_'): elif item.startswith('count_'):
return lambda *args, **kwargs: self._count_for( obj_name = item[len('count_'):]
item[len('count_'):], def f(*args, **kwargs):
*args, return self._count_for(obj_name, *args, **kwargs)
**kwargs f.__doc__ = self._count_for.__doc__.format(obj_name=obj_name)
) f.__name__ = "count_"+obj_name
return f
elif item.startswith('view_'): elif item.startswith('view_'):
return lambda *args, **kwargs: self._view_for( obj_name = item[len('view_'):]
item[len('view_'):], def f(*args, **kwargs):
*args, return self._view_for(obj_name, *args, **kwargs)
**kwargs f.__doc__ = self._view_for.__doc__.format(obj_name=obj_name)
) f.__name__ = "view_"+obj_name
return f
else: else:
raise AttributeError(item) raise AttributeError(item)