alab_management.task_view.task_view module#

Tasks view is a convienent wrapper over the tasks collection in the database. It provides some convenience methods to query and manipulate the tasks collection.

class TaskView[source]#

Bases: object

Task view manages the status, parameters of a task.

create_subtask(task_id, subtask_type, samples, parameters)[source]#

Create a subtask entry for a task.

create_task(task_type, samples, parameters, prev_tasks=None, next_tasks=None, task_id=None)[source]#

Insert a task into the task collection.

  • task_type (str) – the type of task, which should be a type name of class inherited from BaseTask

  • samples (list[dict[str, Any]]) – the samples that this task will handle, which will be passed to Task object the same as parameters.

  • parameters (dict[str, Any]) – the required tasks for this task

  • prev_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ObjectId that refer to prev tasks of this task (which must be completed before current task)

  • next_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ObjectId that refer to next tasks of this task (which cannot start until this task finishes)

Return type:



the assigned id for this task


Rename _id to task_id Translate task’s type into corresponding python class.

Return type:

dict[str, Any]


Check if a task id exists.

Return type:



Return a list of ready tasks.

Return type:

list[dict[str, Any]]


List of task entry: {“task_id”: ObjectId, “type”: BaseTask}


Get the status of a task.

Return type:


get_task(task_id, encode=False)[source]#

Get a task by its task id, which will return all the info stored in the database.

  • task_id (ObjectId) – the task_id of interest. If not found, will return None

  • encode (bool) – whether to encode the task using self.encode_task method

Return type:

dict[str, Any]


the task entry


Get a task that contains the sample with the provided id.

Return type:

list[dict[str, Any]] | None


Return a list of tasks with given status.

Return type:

list[dict[str, Any]]


List of task entry: {“task_id”: ObjectId, “type”: BaseTask}


Get a list of tasks that are in the process of being canceled.


canceling_progress (CancelingProgress | None) – the progress of the task being canceled. If None, return all tasks that are in the process of being canceled.

Return type:

list[dict[str, Any]]


Try to cancel a task by setting the field “stopping” to True.

Return type:


set_message(task_id, message)[source]#

Set message for one task. This is displayed on the dashboard.

set_task_actor_id(task_id, message_id)[source]#

Set task actor id for the task when it is submitted.

  • task_id (ObjectId) – the task id of the task

  • message_id (str) – a uid generated by dramatiq (message_id)


Check if one task’s parent tasks are all completed, if so, mark it as READY.

update_canceling_progress(task_id, canceling_progress, original_progress)[source]#

Update the canceling progress of a task.

Return type:


update_result(task_id, name=None, value=None)[source]#

Update result to completed job.

  • task_id (ObjectId) – the id of task to be updated name: the name of the result to be updated. If None, will update the entire result field. Otherwise, will update the field value: the value to be stored. This must be bson-encodable (ie can be written into MongoDB!)

  • name (Optional[str]) – the name of the result to be updated. If None, will update the entire result field. Otherwise, will update the field

  • value (Optional[Any]) – the value to be stored. This must be bson-encodable (i.e. can be written into MongoDB!)

update_status(task_id, status)[source]#

Update the status of one task.

If the status is COMPLETED, we will also try to mark its next tasks to READY, if all of its previous tasks are completed.

  • task_id (ObjectId) – the id of task to be updated

  • status (TaskStatus) – the new status of the task

update_subtask_result(task_id, subtask_id, result)[source]#

Update result of completed subtask within task job.

  • task_id (ObjectId) – the id of task to be updated

  • subtask_id (ObjectId) – the id of subtask within task to be updated

  • result (Any) – the result returned by the task (which can be dumped into MongoDB)

update_subtask_status(task_id, subtask_id, status)[source]#

Update the status of a subtask.

update_task_dependency(task_id, prev_tasks=None, next_tasks=None)[source]#

Add prev tasks and next tasks to one task entry, which will not overwrite old pre_task and next_tasks.

  • task_id (ObjectId) – the id of task to be updated

  • prev_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ids of prev_tasks

  • next_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ids of next_tasks