165 lines
5.4 KiB
Python
165 lines
5.4 KiB
Python
import requests
|
|
from requests.exceptions import HTTPError
|
|
import datetime
|
|
import iso8601
|
|
from pathlib import Path
|
|
import json
|
|
import stat
|
|
|
|
from . import endpoints
|
|
from . import exceptions
|
|
|
|
TIME_FOR_RENEW = 60
|
|
|
|
|
|
class Re2oAPIClient:
|
|
|
|
def __init__(self, hostname, username, password, token_file=None, use_tls=True):
|
|
self.use_tls = use_tls
|
|
self.token_file = token_file or Path.home() / '.re2o.token'
|
|
self.hostname = hostname
|
|
self._username = username
|
|
self._password = password
|
|
try:
|
|
self.token = self._get_token_from_file()
|
|
except Exception:
|
|
self.token = self._get_token_from_server()
|
|
|
|
def __del__(self):
|
|
try:
|
|
self._save_token_to_file()
|
|
except Exception:
|
|
pass
|
|
|
|
@property
|
|
def need_renew_token(self):
|
|
return self.token['expiration'] > \
|
|
datetime.datetime.now(datetime.timezone.utc) + \
|
|
datetime.timedelta(seconds=TIME_FOR_RENEW)
|
|
|
|
def _get_token_from_file(self):
|
|
if not self.token_file.is_file():
|
|
raise exceptions.TokenFileNotFound(str(self.token_file))
|
|
|
|
with self.token_file.open() as f:
|
|
data = json.load(f)
|
|
|
|
try:
|
|
token_data = data[self.hostname][self._username]
|
|
return {
|
|
'token': token_data['token'],
|
|
'expiration': iso8601.parse_date(token_data['expiration'])
|
|
}
|
|
except KeyError:
|
|
raise exceptions.TokenNotInTokenFile(
|
|
"{user}@{host} in {token_file}".format(user=self._username,
|
|
hostname=self.hostname,
|
|
token_file=str(self.token_file))
|
|
)
|
|
|
|
def _save_token_to_file(self):
|
|
try:
|
|
with self.token_file.open() as f:
|
|
data = json.load(f)
|
|
except Exception:
|
|
data = {}
|
|
|
|
if self.hostname not in data.keys():
|
|
data[self.hostname] = {}
|
|
data[self.hostname][self._username] = {
|
|
'token': self.token['token'],
|
|
'expiration': self.token['expiration'].isoformat()
|
|
}
|
|
|
|
with self.token_file.open('w') as f:
|
|
json.dump(data, f)
|
|
self.token_file.chmod(stat.S_IWRITE | stat.S_IREAD)
|
|
|
|
def _get_token_from_server(self):
|
|
response = requests.post(
|
|
self.get_url_for('token', forma=None),
|
|
data={'username': self._username, 'password': self._password}
|
|
)
|
|
if response.status_code == requests.codes.bad_request:
|
|
raise exceptions.InvalidCredentials()
|
|
response.raise_for_status()
|
|
response = response.json()
|
|
return {
|
|
'token': response['token'],
|
|
'expiration': iso8601.parse_date(response['expiration'])
|
|
}
|
|
|
|
def get_token(self):
|
|
if self.need_renew_token:
|
|
self.token = self._get_token_from_server()
|
|
return self.token['token']
|
|
|
|
def get(self, url, *args, **kwargs):
|
|
headers = kwargs.pop('headers', {})
|
|
headers.update({
|
|
'Authorization': 'Token {}'.format(self.get_token())
|
|
})
|
|
response = requests.get(url, headers=headers, *args, **kwargs)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def post(self, url, *args, **kwargs):
|
|
headers = kwargs.pop('headers', {})
|
|
headers.update({
|
|
'Authorization': 'Token {}'.format(self.get_token())
|
|
})
|
|
response = requests.post(url, headers=headers, *args, **kwargs)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_url_for(self, name, forma='json', **kwargs):
|
|
url = '{proto}://{host}{endpoint}'.format(
|
|
proto=('https' if self.use_tls else 'http'),
|
|
host=self.hostname,
|
|
endpoint=endpoints.get_endpoint_for(name, **kwargs)
|
|
)
|
|
if forma:
|
|
url = '{url}.{forma}/'.format(url=url[:-1], forma=forma)
|
|
return url
|
|
|
|
def __getattr__(self, item):
|
|
|
|
if item.startswith('list_'):
|
|
|
|
def f(max_results=None, params={}, **kwargs):
|
|
if not 'page_size' in params.keys():
|
|
params['page_size'] = 'all'
|
|
response = self.get(
|
|
self.get_url_for('%s-list' % item[len('list_'):], **kwargs),
|
|
params=params
|
|
)
|
|
results = response['results']
|
|
while response['next'] is not None and \
|
|
(max_results is None or len(results) < max_results):
|
|
response = self.get(response['next'])
|
|
results += response['results']
|
|
return results[:max_results] if max_results else results
|
|
return f
|
|
|
|
elif item.startswith('count_'):
|
|
|
|
def f(params={}, **kwargs):
|
|
if not 'page_size' in params.keys():
|
|
params['page_size'] = 1
|
|
return self.get(
|
|
self.get_url_for('%s-list' % item[len('count_'):], **kwargs),
|
|
params=params
|
|
)['count']
|
|
return f
|
|
|
|
elif item.startswith('view_'):
|
|
|
|
def f(params={}, **kwargs):
|
|
return self.get(
|
|
self.get_url_for('%s-details' % item[len('view_'):], **kwargs),
|
|
params=params
|
|
)
|
|
return f
|
|
|
|
else:
|
|
raise AttributeError(item)
|