"""HTTP Services
This module implements services using HTTP(s) for communication
"""
# import datetime
import gzip
import io
import warnings
import zipfile
# from collections import OrderedDict
import magic
import pytz
import relatime
import requests
from tzlocal import get_localzone
try:
from requests.packages.urllib3 import exceptions # pylint: disable=ungrouped-imports
except ImportError: # pragma nocover
# Apparently, some linux distros strip the packages out of requests
# I'm not going to tell you what I think of that, just going to deal with it
from urllib3 import exceptions
from . import __version__, WebService
class HttpService(WebService):  # pylint: disable=abstract-method
    """A simple service based on HTTP requests.

    This class should not be used directly; subclass it and override the
    ``process_*``, ``get_auth``, ``build_request`` or ``unzip_content`` hooks
    as needed.

    NOTE(review): the methods below read ``self.conf``, ``self.opts``,
    ``self.creds`` and ``self.logger``. None of them is defined in this
    class; they are presumably supplied by ``WebService`` -- confirm there.
    """

    # Lazily created requests.Session, cached per instance by ``session``.
    _session = None
    # Value sent in the User-Agent header of every request.
    user_agent = "python-libweb/{0}".format(__version__)

    @property
    def session(self):
        """Return a requests Session object which sets a User-Agent header.

        The session is created on first access and reused afterwards.
        """
        if self._session is None:
            self._session = requests.Session()
            self._session.headers.update({"User-Agent": self.user_agent})
            # if self.proxies:
            #     self._session.proxies = self.proxies
        return self._session

    def unzip_content(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """Automatically detect and decompress zip or gzip content.

        Registered by ``_req`` as a requests ``response`` hook, so despite
        its name ``request`` is the object handed to that hook (it must
        expose ``.content``). The payload type is sniffed with libmagic and
        the result is stored on ``request.orig_content``:

        * zip archive  -> contents of the first member (``b""`` if the
          archive has no members)
        * gzip stream  -> the decompressed bytes
        * anything else -> the content unchanged

        Override this to provide support for additional compressed content
        types.

        Returns:
            The same ``request`` object, with ``orig_content`` set.
        """
        content = request.content
        with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as magick:
            mime = magick.id_buffer(content)

        if mime == "application/zip":
            self.logger.debug("Unzipping 'application/zip'")
            with zipfile.ZipFile(io.BytesIO(content)) as zfo:
                members = zfo.namelist()
                if members:
                    # Only the first member is extracted; others are ignored.
                    with zfo.open(members[0]) as fobj:
                        request.orig_content = fobj.read()
                else:
                    # An empty archive has nothing to extract; avoid the
                    # IndexError that ``namelist()[0]`` would raise.
                    request.orig_content = b""
        elif mime in ("application/x-gzip", "application/gzip"):
            # Both the legacy "x-gzip" name and the registered
            # "application/gzip" media type are accepted.
            self.logger.debug("Unzipping '%s'", mime)
            with gzip.GzipFile(fileobj=io.BytesIO(content)) as gzo:
                request.orig_content = gzo.read()
        else:
            request.orig_content = content
        return request

    def process_params(self, orig_params):  # pylint: disable=no-self-use
        """Process parameters into usable pieces.

        Works on a copy of ``orig_params``. Every mapping-valued entry is
        removed from the result; if that mapping contains a ``relatime``
        key, the entry is re-added as a rendered timestamp string:

        * ``relatime``: relative time expression handed to
          ``relatime.timeParser`` (evaluated in the local timezone)
        * ``timezone``: target timezone name (default ``"UTC"``)
        * ``format``: ``strftime`` format (default
          ``"%Y-%m-%dT%H:%M:%S.%fZ"``), or ``"as_epoch"`` (case-insensitive)
          for integer seconds since the Unix epoch

        Override this if you provide any config parameters that may require
        interpretation, such as the relatime parameter.

        Returns:
            A new dict; ``orig_params`` itself is not modified.
        """
        params = orig_params.copy()
        # Iterate over a snapshot: the loop pops and re-inserts keys, which
        # must not happen while iterating the live dict view.
        for (key, value) in list(params.items()):
            if not hasattr(value, "items"):
                continue
            conf = params.pop(key)
            if "relatime" not in conf:
                # Mapping-valued entries without "relatime" are dropped.
                continue

            local_tz = get_localzone()
            aware = relatime.timeParser(conf["relatime"], timezone=str(local_tz))
            target_tz = pytz.timezone(conf.get("timezone", "UTC"))
            aware = aware.astimezone(target_tz)

            time_format = conf.get("format", "%Y-%m-%dT%H:%M:%S.%fZ")
            if time_format.lower() == "as_epoch":
                # The epoch is timezone independent, so derive it from the
                # aware datetime. Calling ``timestamp()`` on the tz-stripped
                # value would reinterpret the wall-clock time as local time
                # and shift the result by the local UTC offset.
                import calendar
                params[key] = str(calendar.timegm(aware.utctimetuple()))
            else:
                # Formatting uses the naive wall-clock time in the target
                # zone, so %z / %Z render as empty strings.
                params[key] = aware.replace(tzinfo=None).strftime(time_format)
        return params

    def get_auth(self, auth):
        """Find and apply authentication.

        ``auth`` is either the name of an entry in ``self.creds`` (the entry
        is passed to requests as the ``auth`` tuple) or a mapping with a
        ``name`` key plus one of ``headers``, ``params`` or ``postdata``.
        In the mapping form the listed field names are paired positionally
        with the credential value(s) and returned under ``headers``,
        ``params`` or ``data`` respectively. Only the first of those three
        keys that is present is used.

        Override this if you need to support additional styles of
        authentication.

        Returns:
            A dict of keyword arguments to merge into the request; empty if
            ``auth`` or ``self.creds`` is falsy.

        NOTE(review): assumes ``self.creds`` is dict-like. If the named
        entry is missing, the plain-name form fails in ``tuple(None)``.
        """
        kwargs = dict()
        if not (auth and self.creds):
            return kwargs

        if not hasattr(auth, "items"):
            creds = self.creds.get(auth)
            kwargs["auth"] = tuple(creds)
            return kwargs

        creds = self.creds.get(auth.get("name"))
        if not isinstance(creds, list):
            creds = [creds]
        # (key in the auth config, keyword argument for requests)
        targets = (("headers", "headers"), ("params", "params"), ("postdata", "data"))
        for (source, target) in targets:
            if source in auth:
                kwargs[target] = dict(zip(auth.get(source), creds))
                break
        return kwargs

    def build_request(self, url, method="GET", **kwargs):  # pylint: disable=no-self-use
        """Apply request hooks to automatically transform request content.

        ``hooks`` (an iterable of ``(event, hook)`` pairs) is taken out of
        ``kwargs`` and registered on the request; everything else is passed
        to ``requests.Request``.

        Override this if you need to customize the Request object generated.
        """
        hooks = kwargs.pop("hooks", [])
        request = requests.Request(method, url, **kwargs)
        for (event, hook) in hooks:
            request.register_hook(event, hook)
        return request

    def prepare_request(self, request):
        """Applies session state to the request"""
        return self.session.prepare_request(request)

    def send_request(self, request, verify_ssl=True):
        """Suppress SSL if necessary and send the request.

        When ``verify_ssl`` is false, ``InsecureRequestWarning`` is silenced
        for the duration of the call only.
        """
        with warnings.catch_warnings():
            if not verify_ssl:  # pragma nocover
                warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
            return self.session.send(request, verify=verify_ssl)

    def _req(self, url, **conf):
        """Helper function for assembling and submitting requests.

        Recognised ``conf`` keys: ``method``, ``headers``, ``data``,
        ``params``, ``auth``, ``decompress`` and ``verify_ssl``. For each of
        ``headers``/``data``/``params``/``hooks``/``method`` an optional
        ``process_<key>`` method on the instance is applied to the configured
        value. Hooks given in ``conf`` are not forwarded: the hook list is
        rebuilt here and only ever contains the decompression hook.

        Returns:
            Whatever ``send_request`` returns.
        """
        method = conf.get("method", "get").upper()
        kwargs = dict()
        for key in ("headers", "data", "params", "hooks", "method"):
            value = conf.get(key, {})
            func = getattr(self, "process_{}".format(key), None)
            if callable(func):
                value = func(value)  # pylint: disable=not-callable
            kwargs[key] = value
        # The loop above stores the raw configured method; normalise it so
        # the upper-casing is not lost. An unset method stays falsy-filtered
        # only if ``method`` itself is empty.
        kwargs["method"] = kwargs["method"].upper() if kwargs["method"] else method

        for (key, value) in self.get_auth(conf.get("auth", {})).items():
            if hasattr(value, "items"):
                # Merge into a copy so credentials never end up inside the
                # caller's configuration dicts.
                merged = dict(kwargs.get(key) or {})
                merged.update(value)
                kwargs[key] = merged
            else:
                kwargs[key] = value

        # Drop empty settings so requests falls back to its own defaults.
        kwargs = {key: value for (key, value) in kwargs.items() if value}
        kwargs["hooks"] = list()
        if conf.get("decompress", False):
            kwargs["hooks"].append(("response", self.unzip_content))

        raw_request = self.build_request(url, **kwargs)
        request = self.prepare_request(raw_request)
        return self.send_request(request, verify_ssl=conf.get("verify_ssl", True))

    def make_requests(self):
        """Iterate over configuration for multiple requests.

        ``self.conf["url"]`` may be a single URL or a list. Each URL, and
        each string value under ``params``, ``data`` and ``headers``, is
        expanded with ``str.format(**self.opts)``. Non-string values (such
        as the mappings consumed by ``process_params``) are passed through
        untouched. ``self.conf`` is not modified, so the generator can be
        created more than once.

        Yields:
            The result of each request, in URL order.

        Raises:
            KeyError: if ``self.conf`` has no ``url`` entry.
            Whatever ``raise_for_status()`` raises, unless the status code
            is listed in ``ignored_status_codes``.
        """
        # Shallow copy: popping "url" must not alter the stored config.
        query = dict(self.conf)
        url_list = query.pop("url")
        if not isinstance(url_list, list):
            url_list = [url_list]
        ignored_status_codes = [int(sc) for sc in query.get("ignored_status_codes", [])]

        for url in url_list:
            url = url.format(**self.opts)
            # Build a fresh per-request config from the pristine templates so
            # values are formatted exactly once per URL.
            request_conf = dict(query)
            for key in ("params", "data", "headers"):  # pragma nocover
                if key in request_conf:
                    request_conf[key] = {
                        setting: (value.format(**self.opts) if hasattr(value, "format") else value)
                        for (setting, value) in request_conf[key].items()
                    }
            response = self._req(url, **request_conf)
            if response.status_code not in ignored_status_codes:
                response.raise_for_status()
            yield response