alab_management.task_view.task module#

Define the base class of task, which will be used for defining more tasks.

class BaseTask(samples=None, task_id=None, lab_view=None, priority=TaskPriority.NORMAL, _offline_mode=True, *args, **kwargs)[source]#

Bases: ABC

The abstract class of task.

All the tasks should inherit from this class.

add_to(samples)[source]#

Used to add this BaseTask to a SampleBuilder’s task list during Experiment construction.

Parameters:
  • samples (Union[SampleBuilder, List[SampleBuilder]]) – one or more SampleBuilders which will have this task appended to their task lists.
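
A minimal usage sketch; the ExperimentBuilder import path, the Heating task, and its setpoints parameter are illustrative assumptions rather than part of this module:

from alab_management.builders import ExperimentBuilder

# build an experiment and a sample, then attach a hypothetical Heating task
exp = ExperimentBuilder(name="my_experiment")
sample = exp.add_sample("sample_1")

# append the task to the sample's task list
Heating(setpoints=[(300, 60)]).add_to(sample)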

classmethod from_kwargs(samples, task_id, **subclass_kwargs)[source]#

Used to create a new task object from the provided arguments.

This is used in the add_to and ExperimentBuilder.add_task methods to create a new task object and validate it before adding it to an experiment or sample builder.

Return type:

BaseTask

get_message()[source]#

Gets the task message to be displayed on the dashboard.

property is_offline: bool#

Returns True if this task is running in offline mode, False if it is a live task.

property priority: int#

Returns the priority of this task.

property result_specification: type[BaseModel] | None#

Returns a Pydantic model describing the results to be generated by this task. If specified, this model will be used by task_actor to validate the results after the task is completed. If any error occurs, a warning will be printed. If the result contains a LargeResult, it will be ensured to be stored in the database.

Raises:

NotImplementedError – The subclass must implement this method.

Returns:

A Pydantic model type describing the results to be generated by this task.

Return type:

type[BaseModel] | None
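
A minimal sketch of how a subclass might provide this specification; the HeatingResult model and its fields are illustrative assumptions:

from pydantic import BaseModel

from alab_management.task_view.task import BaseTask

class HeatingResult(BaseModel):
    # illustrative fields for a hypothetical Heating task
    final_temperature: float
    duration_minutes: float

class Heating(BaseTask):
    @property
    def result_specification(self) -> type[BaseModel]:
        return HeatingResult

    def run(self):
        ...  # heating logic omitted in this sketch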

abstract run()[source]#

Run the task. In this function, you can request lab resources from the lab manager and log data to the database with the logger.

request_resources will not return until all the requested resources are available, so the task will wait until it is granted the requested resources, which prevents conflicts in resource allocation.

When a task gets the requested devices and sample positions, it also takes ownership of these resources, i.e., other tasks cannot use the devices or request the sample positions this task has requested.

We use a context manager to manage the ownership of the resources. When a task is completed, all the devices and sample positions are released automatically.

Here is an example of how to define a task:

# Request devices and sample positions from the lab manager. The `$` is a
# placeholder for the name of the device assigned to this request; 4 is the
# number of sample positions to request.
with self.lab_view.request_resources({Furnace: [("$.inside", 4)]}) as devices_and_positions:
    devices, sample_positions = devices_and_positions
    furnace = devices[Furnace]
    inside_furnace = sample_positions[Furnace]["$.inside"]

    for sample in self.samples:
        # in a task, we can call other tasks, which will share the same
        # task id, requested devices and sample positions.
        moving_task = Moving(sample=sample,
                             task_id=self.task_id,
                             dest=inside_furnace[0],
                             lab_view=self.lab_view,
                             logger=self.logger)
        moving_task.run()

    # send command to device
    furnace.run_program(self.setpoints)

    while furnace.is_running():
        # log the device data, which is the current temperature of the furnace
        self.logger.log_device_signal({
            "device": furnace.name,
            "temperature": furnace.get_temperature(),
        })

run_subtask(task, samples=None, **kwargs)[source]#

Run a subtask of the current task. Returns the result, if any, of the subtask.
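
For example, the Moving step in the run() example above could also be invoked via run_subtask; the Moving task and its dest keyword argument are illustrative assumptions:

# inside run(), after the resources have been requested
result = self.run_subtask(
    Moving,                  # the subtask class to run
    samples=self.samples,    # samples the subtask operates on
    dest=inside_furnace[0],  # extra keyword arguments are forwarded to the subtask
)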

property samples: list[str]#

Returns the list of samples associated with this task.

set_message(message)[source]#

Sets the task message to be displayed on the dashboard.
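
For example, inside run() a task can keep the dashboard up to date; the message text is illustrative:

self.set_message("Furnace program running")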

validate()[source]#

Validate the task.

This function will be called before the task is executed. It should return False if the task has values that make it impossible to execute. For example, a Heating subclass of BaseTask might return False if the set temperature is too high for the furnace.

By default, this function returns True unless it is overridden by a subclass.

Return type:

bool
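
A sketch of the Heating example mentioned above, assuming a hypothetical furnace temperature limit and a setpoints attribute of (temperature, duration) pairs:

from alab_management.task_view.task import BaseTask

class Heating(BaseTask):
    MAX_TEMPERATURE = 1100  # assumed furnace limit, illustrative only

    def validate(self) -> bool:
        # reject programs whose set points exceed what the furnace can reach
        return all(temperature <= self.MAX_TEMPERATURE
                   for temperature, _ in self.setpoints)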

class LargeResult(**data)[source]#

Bases: BaseModel

A Pydantic model for a large result (a file larger than 16 MB). Stored in either GridFS or another filesystem (e.g., cloud storage such as AWS S3).

classmethod check_file_like_data(values)[source]#

Check if file_like_data has a .read() method.

check_if_stored()[source]#

Check if the large result is stored in the storage system.

file_like_data: Optional[Any]#
classmethod from_file_like_data(file_like_data, storage_type=<object object>)[source]#

Create a LargeResult object from a file-like object. The file-like object must have a .read() method. If the file fails to be stored, a ValueError will be raised.

Parameters:
  • file_like_data (Any) – the file-like data

  • storage_type (str) – the storage type; defaults to the default storage type in the config

Returns:

LargeResult: the LargeResult object

classmethod from_local_file(local_path, storage_type=<object object>)[source]#

Create a LargeResult object from a local file and store it. If the file fails to be stored, a ValueError will be raised.

Parameters:
  • local_path (str | Path) – the path to the local file

  • storage_type (str) – the storage type; defaults to the default storage type in the config

Returns:

LargeResult: the LargeResult object
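
A hedged usage sketch; the file path is illustrative and the default storage backend is assumed to be configured:

from alab_management.task_view.task import LargeResult

result = LargeResult.from_local_file("/path/to/diffraction_pattern.xy")
assert result.check_if_stored()  # the returned object should already be stored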

identifier: str | ObjectId | None#
local_path: str | Path | None#
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'file_like_data': FieldInfo(annotation=Union[Any, NoneType], required=False, default=None), 'identifier': FieldInfo(annotation=Union[str, ObjectId, NoneType], required=False, default=None), 'local_path': FieldInfo(annotation=Union[str, Path, NoneType], required=False, default=None), 'storage_type': FieldInfo(annotation=str, required=False, default_factory=<lambda>)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

retrieve()[source]#

Retrieve the large result from the storage system.

storage_type: str#
store()[source]#

Store the large result in the storage system. This method should block until the result is confirmed to be stored. This method should have a timeout, regardless of the storage system, so that it does not block indefinitely.

add_reroute_task(supported_sample_positions, task, **kwargs)[source]#

Register a reroute task.

add_task(task)[source]#

Register a task.
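
For example, a custom task defined in a lab project is typically registered once at import time; the Heating class here is an illustrative assumption:

from alab_management.task_view.task import add_task

add_task(Heating)  # makes the Heating task available in the task registry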

get_all_tasks()[source]#

Get all the tasks in the registry.

Return type:

dict[str, type[BaseTask]]

get_task_by_name(name)[source]#

Get a task by name.

Return type:

type[BaseTask]
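
A short lookup sketch covering both registry getters; the "Heating" task name is illustrative:

from alab_management.task_view.task import get_all_tasks, get_task_by_name

tasks = get_all_tasks()                    # mapping of task name -> task class
HeatingTask = get_task_by_name("Heating")  # "Heating" is an assumed registered name
assert HeatingTask is tasks["Heating"]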