re2oapi/re2oapi/api.py
2018-05-24 12:37:05 +00:00

100 lines
3.4 KiB
Python

import requests
from requests.exceptions import HTTPError
import datetime
from . import endpoints
from . import exceptions
# Margin, in seconds, added to the current UTC time when
# `Re2oAPI._need_renew_token` compares it against the token expiration date.
TIME_FOR_RENEW = 60
class Re2oAPI:
    """Client for an HTTP API protected by token authentication.

    A token is requested at construction time and renewed transparently
    when it is about to expire. Besides the explicit `get` / `post`
    helpers, three families of methods are generated on attribute access
    (see `__getattr__`):

    * ``list_<name>(max_results=None, **kwargs)`` -- every result of the
      paginated ``<name>-list`` endpoint;
    * ``count_<name>(**kwargs)`` -- the ``count`` field of that endpoint;
    * ``view_<name>(**kwargs)`` -- the body of the ``<name>-details``
      endpoint.
    """

    def __init__(self, hostname, username, password, use_tls=True):
        """Store the connection settings and obtain a first token.

        Args:
            hostname: Host (and optional port) of the API server.
            username: Login used to request tokens.
            password: Password used to request tokens.
            use_tls: Build ``https`` URLs when true, ``http`` otherwise.

        Raises:
            exceptions.InvalidCredentials: The server answered
                400 Bad Request to the token request.
            requests.exceptions.HTTPError: Any other HTTP error status.
        """
        self.use_tls = use_tls
        self.hostname = hostname
        self._username = username
        self._password = password
        # Requesting the token here makes bad credentials fail immediately.
        self.token = self._obtain_token()

    def _need_renew_token(self):
        """Return True when the token expires within TIME_FOR_RENEW seconds.

        An already expired token also needs renewal.
        """
        # Naive UTC "now", comparable with the naive expiration date stored
        # by `_obtain_token` (same value as the deprecated `utcnow()`).
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        deadline = now + datetime.timedelta(seconds=TIME_FOR_RENEW)
        return self.token['expiration'] < deadline

    @staticmethod
    def _parse_expiration(value):
        """Parse the expiration timestamp into a naive datetime.

        Accepts ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` and, as a fallback, the same
        layout without fractional seconds.
        NOTE(review): the fallback assumes the server may omit a zero
        microsecond part -- confirm against the server's serializer.

        Raises:
            ValueError: `value` matches neither layout.
        """
        try:
            return datetime.datetime.strptime(value, '%Y-%m-%dT%H:%M:%S.%fZ')
        except ValueError:
            return datetime.datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ')

    def _obtain_token(self):
        """Request a new token from the ``token`` endpoint.

        Returns:
            A dict with the keys ``'token'`` (the token string) and
            ``'expiration'`` (a naive `datetime.datetime`).

        Raises:
            exceptions.InvalidCredentials: The server answered
                400 Bad Request.
            requests.exceptions.HTTPError: Any other HTTP error status.
        """
        response = requests.post(
            self.get_url_for('token', forma=None),
            data={'username': self._username, 'password': self._password}
        )
        if response.status_code == requests.codes.bad_request:
            raise exceptions.InvalidCredentials()
        response.raise_for_status()
        payload = response.json()
        return {
            'token': payload['token'],
            'expiration': self._parse_expiration(payload['expiration']),
        }

    def get_token(self):
        """Return a usable token string, renewing the token first if needed."""
        if self._need_renew_token():
            self.token = self._obtain_token()
        return self.token['token']

    def _request(self, method, url, *args, **kwargs):
        """Call `method` with an ``Authorization`` header and decode the JSON.

        `method` is a callable with the signature of `requests.get`.
        The caller's ``headers`` mapping is copied, never modified, so the
        token does not leak into caller-owned state.

        Raises:
            requests.exceptions.HTTPError: The response has an error status.
        """
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Authorization'] = 'Token {}'.format(self.get_token())
        response = method(url, *args, headers=headers, **kwargs)
        response.raise_for_status()
        return response.json()

    def get(self, url, *args, **kwargs):
        """Send an authenticated GET request and return the decoded JSON.

        Extra positional and keyword arguments are forwarded to
        `requests.get`.

        Raises:
            requests.exceptions.HTTPError: The response has an error status.
        """
        return self._request(requests.get, url, *args, **kwargs)

    def post(self, url, *args, **kwargs):
        """Send an authenticated POST request and return the decoded JSON.

        Extra positional and keyword arguments are forwarded to
        `requests.post`.

        Raises:
            requests.exceptions.HTTPError: The response has an error status.
        """
        return self._request(requests.post, url, *args, **kwargs)

    def get_url_for(self, name, forma='json', **kwargs):
        """Build the absolute URL of the endpoint called `name`.

        Args:
            name: Endpoint name passed to `endpoints.get_endpoint_for`.
            forma: Format suffix. When truthy, the last character of the
                URL is replaced by ``.<forma>/``; pass ``None`` to keep the
                URL as is.
            **kwargs: Forwarded to `endpoints.get_endpoint_for`.
        """
        url = '{proto}://{host}{endpoint}'.format(
            proto=('https' if self.use_tls else 'http'),
            host=self.hostname,
            endpoint=endpoints.get_endpoint_for(name, **kwargs)
        )
        if forma:
            # Assumes the endpoint path ends with '/' -- TODO confirm.
            url = '{url}.{forma}/'.format(url=url[:-1], forma=forma)
        return url

    def __getattr__(self, item):
        """Generate the ``list_*``, ``count_*`` and ``view_*`` methods.

        Only called for attributes that normal lookup did not find.

        Raises:
            AttributeError: `item` has none of the supported prefixes.
        """
        if item.startswith('list_'):
            resource = item[len('list_'):]

            def list_all(max_results=None, **kwargs):
                """Return the results of every page, capped at max_results."""
                response = self.get(
                    self.get_url_for('%s-list' % resource, **kwargs)
                )
                results = list(response['results'])
                # Follow the pagination links until the last page or until
                # enough results have been collected.
                while response['next'] is not None and (
                        max_results is None or len(results) < max_results):
                    response = self.get(response['next'])
                    results.extend(response['results'])
                if max_results is None:
                    return results
                # `is None` test above (not truthiness) so that 0 means
                # "no results" rather than "no limit".
                return results[:max_results]
            return list_all

        if item.startswith('count_'):
            resource = item[len('count_'):]

            def count(**kwargs):
                """Return the ``count`` field of the list endpoint."""
                return self.get(
                    self.get_url_for('%s-list' % resource, **kwargs)
                )['count']
            return count

        if item.startswith('view_'):
            resource = item[len('view_'):]

            def view(**kwargs):
                """Return the decoded body of the details endpoint."""
                return self.get(
                    self.get_url_for('%s-details' % resource, **kwargs)
                )
            return view

        raise AttributeError(item)