Source code for xcc.connection
"""
This module contains the :class:`~xcc.Connection` class.
"""
from __future__ import annotations
from itertools import chain
from typing import Dict, List, Optional, Union
import requests
from ._version import __version__
from .settings import Settings
[docs]class Connection:
"""Represents a connection to the Xanadu Cloud.
Args:
refresh_token (str, optional): JWT refresh token, such as a Xanadu Cloud
API key, that is used to fetch access tokens from the Xanadu Cloud
access_token (str, optional): JWT access token that is used to
authenticate requests to the Xanadu Cloud; missing, invalid, or
expired access tokens are replaced using the refresh token
host (str): hostname of the Xanadu Cloud server
port (int): port of the Xanadu Cloud server
tls (bool): whether to use HTTPS for the connection
headers (Dict[str, str], optional): HTTP request headers to override
Raises:
ValueError: if both the refresh token and access token are ``None``
**Example:**
The following example shows how to use the :class:`~xcc.Connection` class
to access the Xanadu Cloud. First, a connection is instantiated with a
Xanadu Cloud API key:
>>> import xcc
>>> connection = xcc.Connection(refresh_token="Xanadu Cloud API key goes here")
This connection can be tested using :meth:`~xcc.Connection.ping`. If there
is an issue with the connection, a :exc:`requests.models.HTTPError` will be
raised.
>>> connection.ping()
<Response [200]>
The Xanadu Cloud can now be directly accessed to, for example, retrieve
information about the X8_01 device:
>>> response = connection.request(method="GET", path="/devices/X8_01")
>>> response
<Response [200]>
>>> import json
>>> print(json.dumps(response.json(), indent=4))
{
"expected_uptime": {
"monday": [
"15:00:00+00:00",
"22:59:59+00:00"
],
"tuesday": [
"15:00:00+00:00",
"22:59:59+00:00"
],
"thursday": [
"15:00:00+00:00",
"22:59:59+00:00"
],
"wednesday": [
"15:00:00+00:00",
"22:59:59+00:00"
]
},
"created_at": "2021-01-27T15:15:25.801308Z",
"target": "X8_01",
"status": "online"
}
"""
[docs] @staticmethod
def load(settings: Optional[Settings] = None, **kwargs) -> Connection:
"""Loads a connection using the given :class:`xcc.Settings` instance,
or a new :class:`xcc.Settings` instance if one is not provided.
Args:
settings (Settings, optional): Xanadu Cloud connection settings
kwargs: keyword arguments to override in the call to :func:`xcc.Connection.__init__()`
Returns:
Connection: connection initialized from the configuration of the
provided (or created) :class:`xcc.Settings` instance
"""
settings = settings or Settings()
return Connection(
**{
"refresh_token": settings.REFRESH_TOKEN,
"access_token": settings.ACCESS_TOKEN,
"host": settings.HOST,
"port": settings.PORT,
"tls": settings.TLS,
**kwargs,
}
)
def __init__(
self,
refresh_token: Optional[str] = None,
access_token: Optional[str] = None,
host: str = "platform.xanadu.ai",
port: int = 443,
tls: bool = True,
headers: Optional[Dict[str, str]] = None,
) -> None:
if refresh_token is None and access_token is None:
raise ValueError(
"A refresh token (e.g., Xanadu Cloud API key) or an access "
"token must be provided to connect to the Xanadu Cloud."
)
self._access_token = access_token
self._refresh_token = refresh_token
self._tls = tls
self._host = host
self._port = port
self._headers = headers or {}
@property
def access_token(self) -> Optional[str]:
"""Returns the access token used to authenticate requests to the Xanadu Cloud."""
return self._access_token
@property
def refresh_token(self) -> Optional[str]:
"""Returns the refresh token used to fetch access tokens."""
return self._refresh_token
@property
def tls(self) -> bool:
"""Returns whether HTTPS is used for the connection to the Xanadu Cloud."""
return self._tls
@property
def scheme(self) -> str:
"""Returns the scheme of the URL used to send requests to the Xanadu Cloud."""
return "https" if self._tls else "http"
@property
def host(self) -> str:
"""Returns the host of the URL used to send requests to the Xanadu Cloud."""
return self._host
@property
def port(self) -> int:
"""Returns the port of the URL used to send requests to the Xanadu Cloud"""
return self._port
@property
def api_version(self) -> str:
"""Returns the "Accept-Version" header included in requests to the Xanadu Cloud."""
return self._headers.get("Accept-Version", "0.4.0")
@property
def user_agent(self) -> str:
"""Returns the "User-Agent" header included in requests to the Xanadu Cloud."""
return self._headers.get("User-Agent", f"XCC/{__version__} (API)")
@property
def headers(self) -> Dict[str, str]:
"""Returns the headers included in requests to the Xanadu Cloud."""
return {
"Accept-Version": self.api_version,
"Authorization": f"Bearer {self.access_token}",
"User-Agent": self.user_agent,
**self._headers,
}
def __repr__(self) -> str:
"""Returns a printable representation of a connection."""
return (
f"<{self.__class__.__name__}: "
f"refresh_token={self.refresh_token}, "
f"access_token={self.access_token}, "
f"url={self.url()}>"
)
[docs] def url(self, path: str = "") -> str:
"""Returns the URL to a Xanadu Cloud endpoint.
Args:
path (str): path component of the URL
Returns:
str: URL containing a scheme, host, port, the provided path
"""
return f"{self.scheme}://{self.host}:{self.port}/" + path.lstrip("/")
[docs] def ping(self) -> requests.Response:
"""Pings the Xanadu Cloud.
Returns:
requests.Response: HTTP response of the ping HTTP request
Raises:
requests.exceptions.RequestException: if there was an issue sending
the ping request to the Xanadu Cloud or the status code of the
HTTP response indicates that an error occurred (i.e., 4XX or 5XX)
"""
return self.request(method="GET", path="/healthz")
[docs] def request(
self, method: str, path: str, *, headers: Optional[Dict[str, str]] = None, **kwargs
) -> requests.Response:
"""Sends an HTTP request to the Xanadu Cloud.
Args:
method (str): HTTP request method
path (str): HTTP request path
headers (Mapping[str, str]): extra headers to pass to the request
**kwargs: optional arguments to pass to :func:`requests.request()`
Returns:
requests.Response: HTTP response to the HTTP request
Raises:
requests.exceptions.RequestException: if there was an issue sending
the HTTP request or the status code of the HTTP response
indicates that an error occurred (i.e., 4XX or 5XX)
.. note::
A second HTTP request will be made to the Xanadu Cloud if the HTTP
response to the first request has a 401 status code. The second
request will be identical to the first one except that a fresh
access token will be used.
"""
url = self.url(path)
if headers:
headers = {**self.headers, **headers}
else:
headers = self.headers
response = self._request(method=method, url=url, headers=headers, **kwargs)
if response.status_code == 401:
self.update_access_token()
response = self._request(method=method, url=url, headers=self.headers, **kwargs)
if kwargs.get("stream", False) is True:
# Avoid eagerly fetching the content for streaming requests.
response.raise_for_status()
return response
try:
body = response.json()
except Exception: # pylint: disable=broad-except
# Until https://github.com/psf/requests/pull/5856 is deployed, the
# requests package (2.26.0) can raise one of several different types
# of exceptions when parsing JSON (including a third-party type).
response.raise_for_status()
else:
# The details of a validation error are encoded in the "meta" field.
if response.status_code == 400 and body.get("code", "") == "validation-error":
meta: Dict[str, Union[List[str], Dict[str, List[str]]]] = body.get("meta", {})
if meta:
errors = []
for entry in meta.values():
if isinstance(entry, list):
errors.extend(entry)
else:
errors.extend(chain.from_iterable(entry.values()))
message = "; ".join(errors)
raise requests.exceptions.HTTPError(message, response=response)
# Otherwise, the details of the error may be encoded in the "detail" field.
if not response.ok and "detail" in body:
message = body["detail"]
raise requests.exceptions.HTTPError(message, response=response)
response.raise_for_status()
return response
[docs] def update_access_token(self) -> None:
"""Updates the access token of a connection using its refresh token.
Raises:
requests.exceptions.RequestException: if there was an issue sending
the HTTP request for the access token or the status code of the
HTTP response indicates that an error occurred (i.e., 4XX or 5XX)
"""
url = self.url("/auth/realms/platform/protocol/openid-connect/token")
data = {
"grant_type": "refresh_token",
"refresh_token": self._refresh_token,
"client_id": "public",
}
response = self._request(method="POST", url=url, data=data)
try:
body = response.json()
except Exception as exc: # pylint: disable=broad-except
# See Connection.request() for why exceptions are broadly caught.
response.raise_for_status()
# The following ValueError is only raised if the Xanadu Cloud
# authentication service is acting unexpectedly.
raise ValueError("Xanadu Cloud returned an invalid access token response.") from exc
else:
# It is worth investing in a helpful error message for invalid API
# keys since most users will likely encounter it at some point.
if response.status_code == 400 and body.get("error", "") == "invalid_grant":
raise requests.exceptions.HTTPError(
"Refresh token (e.g., Xanadu Cloud API key) is invalid", response=response
)
response.raise_for_status()
self._access_token = body.get("access_token")
def _request(self, method: str, url: str, **kwargs) -> requests.Response:
"""Sends an HTTP request.
Args:
method (str): HTTP request method
path (str): HTTP request path
**kwargs: optional arguments to pass to :func:`requests.request()`
Returns:
requests.Response: HTTP response to the HTTP request
Raises:
requests.exceptions.RequestException: if there was an issue sending
the HTTP request
.. note::
No validation is performed on the status code of the HTTP response.
.. warning::
The delay between when this function is called and when a timeout
exception is raised will be (slightly greater than) a multiple of
the ``timeout`` parameter passed to :func:`requests.request()`.
Specifically, if the timeout value is :math:`t` and there are
:math:`r` resource records listed for the hostname and port of the
Xanadu Cloud, then :math:`\\approx tr` seconds will elapse before a
timeout is detected.
"""
try:
timeout = kwargs.pop("timeout", 10)
return requests.request(method=method, url=url, timeout=timeout, **kwargs)
except requests.exceptions.Timeout as exc:
message = f"{method} request to '{url}' timed out"
raise requests.exceptions.RequestException(message) from exc
except requests.exceptions.ConnectionError as exc:
if "Name or service not known" in str(exc):
message = f"Failed to resolve hostname '{self.host}'"
raise requests.exceptions.RequestException(message) from exc
raise exc
_modules/xcc/connection
Download Python script
Download Notebook
View on GitHub