Source code for qci_client.auth.client
- """Client for QCi's auth API."""
 - from copy import deepcopy
 - from datetime import datetime, timezone
 - import os
 - from typing import Optional
 - import requests
 - from requests.compat import urljoin
 - from qci_client.auth import types
 - from qci_client.utilities import raise_for_status
 - TOKEN_EXPIRATION_MARGIN: float = 10 * 60.0  # seconds (10 minutes): a cached access token with less than this much time left is discarded and re-requested
 
- [docs]
 - class AuthClient:
 -     """Used to authenticate to QCi applications."""
 -     def __init__(
 -         self,
 -         *,
 -         url: Optional[str] = None,
 -         api_token: Optional[str] = None,
 -         timeout: Optional[float] = None,
 -     ):
 -         """
 -         Handles authentication against QCi cloud APIs.
 -         :param url: url basepath to API endpoint, including scheme, if None, then falls
 -             back to QCI_API_URL environment variable
 -         :param api_token: refresh token for authenticating to API, if None, then falls
 -             back to QCI_TOKEN environment variable
 -         :param timeout: number of seconds before timing out requests, None waits
 -             indefinitely
 -         """
 -         if not url:
 -             self._url = os.getenv("QCI_API_URL", "")
 -         else:
 -             self._url = url
 -         if not self._url:
 -             raise ValueError(
 -                 "must specify url argument or QCI_API_URL environment variable"
 -             )
 -         if self._url[-1] != "/":
 -             self._url = self._url + "/"
 -         if not api_token:
 -             self._refresh_token = os.getenv("QCI_TOKEN", "")
 -         else:
 -             self._refresh_token = api_token
 -         if not self._refresh_token:
 -             raise AssertionError(
 -                 "must specify api_token argument or QCI_TOKEN environment variable"
 -             )
 -         self._timeout = timeout
 -         self._access_token_info: Optional[types.AccessTokensPostResponseBody] = None
 -     @property
 -     def url(self) -> str:
 -         """Return API URL."""
 -         return self._url
 -     @property
 -     def api_token(self) -> str:
 -         """Return API token."""
 -         return self._refresh_token
 -     @property
 -     def timeout(self) -> Optional[float]:
 -         """Return timeout setting."""
 -         return self._timeout
 -     @property
 -     def access_tokens_url(self) -> str:
 -         """URL used for obtaining access tokens."""
 -         return self.url + "auth/v1/access-tokens/"
 -     @property
 -     def access_token_info(self) -> types.AccessTokensPostResponseBody:
 -         """Return user's access token info, retrieving anew when absent or expired."""
 -         if self._access_token_info:
 -             
 -             
 -             
 -             expiration = datetime.strptime(
 -                 self._access_token_info["expires_at_rfc3339"], "%Y-%m-%dT%H:%M:%SZ"
 -             ).replace(tzinfo=timezone.utc)
 -             seconds_to_expiration = (
 -                 expiration - datetime.now(timezone.utc)
 -             ).total_seconds()
 -             
 -             if seconds_to_expiration < TOKEN_EXPIRATION_MARGIN:
 -                 self._access_token_info = None
 -         if not self._access_token_info:
 -             
 -             self._access_token_info = self.post_access_tokens()
 -         return deepcopy(self._access_token_info)
 -     @property
 -     def access_token(self) -> str:
 -         """Return user's access token, refreshing if expired or near expiration."""
 -         return self.access_token_info["access_token"]
 -     @property
 -     def expires_at_rfc3339(self) -> str:
 -         """Return expiration of user's access token."""
 -         return self.access_token_info["expires_at_rfc3339"]
 -     @property
 -     def token_type(self) -> str:
 -         """Return type of user's access token."""
 -         return self.access_token_info["token_type"]
 -     @property
 -     def organization_id(self) -> str:
 -         """Return user's organization ID."""
 -         return self.access_token_info["organization_id"]
 -     @property
 -     def user_id(self) -> str:
 -         """Return user's user ID."""
 -         return self.access_token_info["user_id"]
 -     @property
 -     def headers_without_authorization(self) -> dict:
 -         """
 -         HTTP headers without bearer token in Authorization header, but with
 -         Content-Type, Connection, and optional X-Request-Timeout-Nano headers.
 -         """
 -         headers = {
 -             "Content-Type": "application/json",
 -             
 -             "Connection": "close",
 -         }
 -         if self.timeout is not None:
 -             
 -             headers["X-Request-Timeout-Nano"] = str(int(10**9 * self.timeout))
 -         return headers
 -     @property
 -     def headers(self) -> dict:
 -         """HTTP headers with bearer token in Authorization header."""
 -         headers = self.headers_without_authorization
 -         headers["Authorization"] = f"Bearer {self.access_token}"
 -         return headers
 -     @property
 -     def headers_without_connection_close(self):
 -         """Headers with cached bearer token, but without connection closing."""
 -         headers = self.headers
 -         headers.pop("Connection", None)
 -         return headers
 
- [docs]
 -     def get_access_tokens_health(self) -> types.AccessTokensHealthGetResponseBody:
 -         """GET health."""
 -         response = requests.get(
 -             urljoin(self.access_tokens_url, "health"),
 -             headers=self.headers_without_authorization,
 -             timeout=self.timeout,
 -         )
 -         raise_for_status(response=response)
 -         return response.json()
 
- [docs]
 -     def get_access_tokens_version(self) -> types.AccessTokensVersionGetResponseBody:
 -         """GET version."""
 -         response = requests.get(
 -             urljoin(self.access_tokens_url, "version"),
 -             headers=self.headers_without_authorization,
 -             timeout=self.timeout,
 -         )
 -         raise_for_status(response=response)
 -         return response.json()
 
- [docs]
 -     def post_access_tokens(self) -> types.AccessTokensPostResponseBody:
 -         """
 -         Authorize user via refresh token used to retrieve finite-lived access_token.
 -         """
 -         json: types.AccessTokensPostRequestBody = {"refresh_token": self._refresh_token}
 -         response = requests.post(
 -             self.access_tokens_url,
 -             headers=self.headers_without_authorization,
 -             json=json,
 -             timeout=self.timeout,
 -         )
 -         raise_for_status(response=response)
 -         return response.json()