Source code for xcc.job

"""
This module contains the :class:`~xcc.Job` class.
"""

from __future__ import annotations

import io
import time
from datetime import datetime, timedelta
from itertools import count, takewhile
from typing import Any, Collection, List, Mapping, Optional, Sequence, Union

import dateutil.parser
import numpy as np

from .connection import Connection
from .util import cached_property


[docs]class Job: """Represents a job on the Xanadu Cloud. Args: id_ (str): ID of the job connection (Connection): connection to the Xanadu Cloud .. note:: For performance reasons, the properties of a job are lazily fetched and stored in a cache. This cache can be cleared at any time by calling :meth:`Job.clear`. .. warning:: The :class:`xcc.Job` class transparently contacts the Xanadu Cloud when an uncached job property is accessed. This means that requesting a job property for the first time may take longer than expected. **Example:** The following example shows how to use the :class:`Job` class to submit and track a job for a simulator on the Xanadu Cloud. First, a connection is established to the Xanadu Cloud: >>> import xcc >>> connection = xcc.Connection.load() Next, the parameters of the desired job are prepared. At present, this includes the name, target, circuit, and language of the job: >>> import inspect >>> name = "example" >>> target = "simulon_gaussian" >>> circuit = inspect.cleandoc( f\"\"\" name {name} version 1.0 target {target} (shots=3) MeasureFock() | [0, 1, 2, 3] \"\"\" ) >>> language = "blackbird:1.0" The job is then submitted to the Xanadu Cloud using the :func:`Job.submit` function: >>> job = xcc.Job.submit( connection, name=name, target=target, circuit=circuit, language=language, ) At this point, the Xanadu Cloud has received the job; however, it may take some time before the result of the job is available. To wait until the job is finished, a blocking call is made to :meth:`Job.wait`: >>> job.wait() Finally, the status, result, running time, etc. of the job are retrieved by accessing the corresponding properties of the job: >>> job.status 'complete' >>> job.result {'output': [array([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]])]} >>> job.running_time datetime.timedelta(microseconds=123456) """
[docs] @staticmethod def list( connection: Connection, limit: int = 5, ids: Optional[Collection[str]] = None, status: Optional[str] = None, ) -> Sequence[Job]: """Returns jobs submitted to the Xanadu Cloud. Args: connection (Connection): connection to the Xanadu Cloud limit (int): maximum number of jobs to retrieve ids (Collection[str], optional): IDs of the jobs to retrieve; if at least one ID is specified, ``limit`` will be set to the length of the ID collection status (str, optional): filter jobs by the given status (if not ``None``) Returns: Sequence[Job]: jobs which were submitted on the Xanadu Cloud by the user associated with the Xanadu Cloud connection """ size = len(ids) if ids else limit params = {"size": size, "id": ids, "status": status} response = connection.request("GET", "/jobs", params=params) jobs = [] for details in response.json()["data"]: job = Job(details["id"], connection=connection) job._details = details # pylint: disable=protected-access jobs.append(job) return jobs
[docs] @staticmethod def submit( connection: Connection, name: Optional[str], target: str, circuit: str, language: str, ) -> Job: """Submits a job to the Xanadu Cloud. Args: connection (Connection): connection to the Xanadu Cloud name (str, optional): name of the job target (str): target of the job circuit (str): circuit of the job language (str): language of the job Returns: Job: job submitted to the Xanadu Cloud """ payload = { "name": name, "target": target, "circuit": circuit, "language": language, } response = connection.request("POST", "/jobs", json=payload) details = response.json() job = Job(details["id"], connection=connection) # pylint: disable=protected-access job._details = details job._circuit = {"circuit": circuit} return job
def __init__(self, id_: str, connection: Connection) -> None: self._id = id_ self._connection = connection @property def id(self) -> str: # pylint: disable=invalid-name """Returns the ID of a job.""" return self._id @property def overview(self) -> Mapping[str, Any]: """Returns an overview of a job. Returns: Mapping[str, Any]: mapping from field names to values for this job as determined by the needs of a Xanadu Cloud user. """ return { "id": self.id, "name": self.name, "status": self.status, "target": self.target, "language": self.language, "created_at": self.created_at, "finished_at": self.finished_at, "running_time": self.running_time, "metadata": self.metadata, } @cached_property def result(self) -> Mapping[str, Union[np.ndarray, List[np.ndarray]]]: """Returns the result of a job. Returns: Mapping[str, Union[np.ndarray, List[np.ndarray]]]: The result of this job. .. seealso:: Implemented in terms of :meth:`Job.get_result`. .. note:: NumPy integers will be converted to ``np.int64`` objects to facilitate safer post-processing. """ return self.get_result(integer_overflow_protection=True) @property def created_at(self) -> datetime: """Returns when a job was created. Returns: datetime: datetime when this job was created """ return dateutil.parser.isoparse(self._details["created_at"]) @property def started_at(self) -> Optional[datetime]: """Returns when a job started. Returns: datetime, optional: datetime when this job started """ datetime_ = self._details["started_at"] return None if datetime_ is None else dateutil.parser.isoparse(datetime_) @property def finished_at(self) -> Optional[datetime]: """Returns when a job finished. Returns: datetime, optional: datetime when this job finished """ datetime_ = self._details["finished_at"] return None if datetime_ is None else dateutil.parser.isoparse(datetime_) @property def running_time(self) -> Optional[timedelta]: """Returns the running time of a job. Returns: timedelta, optional: running time of this job """ seconds = self._details["running_time"] return None if seconds is None else timedelta(seconds=seconds) @property def circuit(self) -> str: """Returns the circuit of a job. Returns: str: circuit of this job """ return self._circuit["circuit"] @property def language(self) -> str: """Returns the language of a job. Returns: str: language of this job """ return self._details["language"] @property def name(self) -> Optional[str]: """Returns the name of a job. Returns: str, optional: name of this job """ return self._details["name"] @property def target(self) -> str: """Returns the target of a job. Returns: str: target of this job """ return self._details["target"] @property def status(self) -> str: """Returns the current status of a job. Returns: str: status of this job ("open", "queued", "cancelled", "failed", "cancel_pending", or "complete") """ return self._details["status"] @property def finished(self) -> bool: """Returns whether this job has finished. Returns: bool: ``True`` iff the job status is "cancelled", "complete", or "failed" """ return self.status in ("cancelled", "complete", "failed") @property def metadata(self) -> Mapping[str, Any]: """Returns the metadata of a job. Returns: Mapping[str, Any]: metadata of this job .. note:: Error details for failed jobs are reported here. """ return self._details["meta"] @cached_property def _details(self) -> Mapping[str, Any]: """Returns the details of a job. Returns: Mapping[str, Any]: mapping from field names to values for this job as determined by the Xanadu Cloud job endpoint. .. note:: These fields are not intended to be directly accessed by external callers. Instead, they should be individually retrieved through their associated public properties. """ return self._connection.request("GET", f"/jobs/{self.id}").json() @cached_property def _circuit(self) -> Mapping[str, str]: """Returns the circuit of a job. Returns: Mapping[str, str]: mapping with a "circuit" field .. note:: The circuit should be retrieved through the :attr:`Device.circuit` property. """ return self._connection.request("GET", f"/jobs/{self.id}/circuit").json() def __repr__(self) -> str: """Returns a printable representation of a job.""" return f"<{self.__class__.__name__}: id={self.id}>"
[docs] def get_result( self, integer_overflow_protection: bool = True, ) -> Mapping[str, Union[np.ndarray, List[np.ndarray]]]: """Returns the result of a job. Args: integer_overflow_protection (bool): convert all NumPy integers into 64-bit NumPy integers during post-processing; setting this option to ``False`` can significantly reduce memory consumption at the risk of introducing bugs during e.g. cubing operations Returns: Mapping[str, Union[np.ndarray, List[np.ndarray]]]: The result of this job. Each job result has an "output" key associated with a list of NumPy arrays representing the output of the job; all other keys represent metadata related to the interpretation of the job output. Raises: TypeError: if the job result is not stored in the .npz file format .. warning:: The value returned by this method is not cached. """ # Streaming the response in chunks is necessary to fetch large results. response = self._connection.request("GET", f"/jobs/{self.id}/result", stream=True) with io.BytesIO() as buffer: # The chunk size below (i.e., 2^15 bytes) is less than the maximum # size of a TCP packet (i.e., 2^16 bytes) but is otherwise arbitrary. for chunk in response.iter_content(chunk_size=32768): buffer.write(chunk) # Seeking to 0 prepares the buffer for reading. buffer.seek(0) result = np.load(buffer, allow_pickle=False) if not isinstance(result, np.lib.npyio.NpzFile): raise TypeError("Job result is not an .npz file") result = dict(result) if integer_overflow_protection: for key, array in result.items(): if np.issubdtype(array.dtype, np.integer): result[key] = array.astype(np.int64) # For convenience, move all positional arguments (i.e., those that have # an "arr_" prefix) to a separate key named "output". result["output"] = [] for i in takewhile(lambda i: f"arr_{i}" in result, count()): array = result.pop(f"arr_{i}") result["output"].append(array) return result
[docs] def cancel(self) -> None: """Cancels a job.""" if not self.finished: self._connection.request("PATCH", f"/jobs/{self.id}", json={"status": "cancelled"}) self.clear()
[docs] def clear(self) -> None: """Clears the details, circuit, and result caches of a job.""" del self._details del self._circuit del self.result
[docs] def wait(self, delay: float = 1) -> None: """Waits for a job to finish. Args: delay (float): number of seconds to wait between polling requests """ while not self.finished: time.sleep(delay) self.clear()