alab_management.task_view.task_view module#

Task view is a convenient wrapper over the tasks collection in the database. It provides methods to query and manipulate that collection.

class TaskView[source]#

Bases: object

Task view manages the status and parameters of a task.

create_subtask(task_id, subtask_type, samples, parameters)[source]#

Create a subtask entry for a task.

create_task(task_type, samples, parameters, prev_tasks=None, next_tasks=None, task_id=None)[source]#

Insert a task into the task collection.

Parameters:
  • task_type (str) – the type of the task, which should be the name of a class that inherits from BaseTask

  • samples (list[dict[str, Any]]) – the samples that this task will handle, which are passed to the Task object in the same way as parameters

  • parameters (dict[str, Any]) – the required parameters for this task

  • prev_tasks (Union[ObjectId, list[ObjectId], None]) – one ObjectId or a list of ObjectIds that refer to the previous tasks of this task (which must be completed before the current task starts)

  • next_tasks (Union[ObjectId, list[ObjectId], None]) – one ObjectId or a list of ObjectIds that refer to the next tasks of this task (which cannot start until this task finishes)

Return type:

ObjectId

Returns:

the assigned id for this task
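
Because prev_tasks and next_tasks each accept a single ObjectId, a list, or None, a caller-side view of the resulting entry can be sketched as below. This is only an illustration under stated assumptions: normalize_ids and build_task_entry are hypothetical helpers, the document field names and the "Heating" task type are assumed, and plain strings stand in for ObjectId so the snippet runs without MongoDB.

```python
from typing import Any, Union

# Plain strings stand in for bson.ObjectId so the sketch runs without pymongo.
TaskId = str


def normalize_ids(ids: Union[TaskId, list[TaskId], None]) -> list[TaskId]:
    """Turn None, a single id, or a list of ids into a list (hypothetical helper)."""
    if ids is None:
        return []
    if isinstance(ids, list):
        return list(ids)
    return [ids]


def build_task_entry(
    task_type: str,
    samples: list[dict[str, Any]],
    parameters: dict[str, Any],
    prev_tasks: Union[TaskId, list[TaskId], None] = None,
    next_tasks: Union[TaskId, list[TaskId], None] = None,
) -> dict[str, Any]:
    """Assemble a task document; the field names used here are assumptions."""
    return {
        "type": task_type,
        "samples": samples,
        "parameters": parameters,
        "prev_tasks": normalize_ids(prev_tasks),
        "next_tasks": normalize_ids(next_tasks),
    }


# "Heating" is a made-up task type used only for illustration.
entry = build_task_entry(
    "Heating",
    samples=[{"name": "sample_1"}],
    parameters={"temperature": 600},
    prev_tasks="task_a",
)
```

Normalizing to a list up front means later dependency checks can treat both fields uniformly, whichever form the caller passed.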

encode_task(task_entry)[source]#

Rename _id to task_id and translate the task’s type into the corresponding Python class.

Return type:

dict[str, Any]
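
The two steps described for encode_task can be illustrated with a small stand-alone sketch. It is not the library's implementation: BaseTask here is a local stand-in, Heating and TASK_REGISTRY are hypothetical names, and how the real method looks up task classes is an assumption.

```python
from typing import Any


class BaseTask:
    """Local stand-in for the real BaseTask class."""


class Heating(BaseTask):
    """Hypothetical task type used only for this example."""


# Assumed lookup table from type name to task class.
TASK_REGISTRY: dict[str, type] = {"Heating": Heating}


def encode_task_sketch(task_entry: dict[str, Any]) -> dict[str, Any]:
    """Return a copy with _id renamed to task_id and type resolved to a class."""
    encoded = dict(task_entry)  # shallow copy, so the input entry is left untouched
    encoded["task_id"] = encoded.pop("_id")
    encoded["type"] = TASK_REGISTRY[encoded["type"]]
    return encoded


raw_entry = {"_id": "abc123", "type": "Heating", "status": "WAITING"}
encoded_entry = encode_task_sketch(raw_entry)
```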

exists(task_id)[source]#

Check if a task id exists.

Return type:

bool

get_ready_tasks()[source]#

Return a list of ready tasks.

Return type:

list[dict[str, Any]]

Returns:

List of task entries: {“task_id”: ObjectId, “type”: BaseTask}

get_status(task_id)[source]#

Get the status of a task.

Return type:

TaskStatus

get_task(task_id, encode=False)[source]#

Get a task by its task id, which will return all the info stored in the database.

Parameters:
  • task_id (ObjectId) – the task_id of interest. If the task is not found, None is returned.

  • encode (bool) – whether to encode the task using self.encode_task method

Return type:

dict[str, Any]

Returns:

the task entry

get_task_with_sample(sample_id)[source]#

Get the tasks that contain the sample with the provided id.

Return type:

list[dict[str, Any]] | None

get_tasks_by_status(status)[source]#

Return a list of tasks with given status.

Return type:

list[dict[str, Any]]

Returns:

List of task entries: {“task_id”: ObjectId, “type”: BaseTask}

get_tasks_to_be_canceled(canceling_progress)[source]#

Get a list of tasks that are in the process of being canceled.

Parameters:

canceling_progress (CancelingProgress | None) – the progress of the task being canceled. If None, return all tasks that are in the process of being canceled.

Return type:

list[dict[str, Any]]

mark_task_as_canceling(task_id)[source]#

Try to cancel a task by setting the field “stopping” to True.

Return type:

bool

set_message(task_id, message)[source]#

Set message for one task. This is displayed on the dashboard.

set_task_actor_id(task_id, message_id)[source]#

Set task actor id for the task when it is submitted.

Parameters:
  • task_id (ObjectId) – the task id of the task

  • message_id (str) – a uid generated by dramatiq (message_id)

try_to_mark_task_ready(task_id)[source]#

Check whether all of a task’s parent tasks are completed and, if so, mark it as READY.

update_canceling_progress(task_id, canceling_progress, original_progress)[source]#

Update the canceling progress of a task.

Return type:

bool

update_result(task_id, name=None, value=None)[source]#

Update the result of a completed job.

Parameters:
  • task_id (ObjectId) – the id of the task to be updated

  • name (Optional[str]) – the name of the result to be updated. If None, will update the entire result field. Otherwise, will update the field result.name.

  • value (Optional[Any]) – the value to be stored. This must be bson-encodable (i.e. can be written into MongoDB!)
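
The difference between updating the whole result field and a single result.name entry maps naturally onto MongoDB's `$set` operator with dot notation. The sketch below only builds the update document; build_result_update is a hypothetical helper, and whether the library constructs its update this way is an assumption.

```python
from typing import Any, Optional


def build_result_update(name: Optional[str] = None, value: Any = None) -> dict[str, Any]:
    """Build a MongoDB update document for the result field (hypothetical helper)."""
    if name is None:
        # No name given: replace the entire result field.
        return {"$set": {"result": value}}
    # Name given: dot notation targets only result.<name>.
    return {"$set": {f"result.{name}": value}}


whole_update = build_result_update(value={"mass_mg": 12.5})
single_update = build_result_update(name="mass_mg", value=12.5)
```

A document like this could be passed as the update argument of a PyMongo `update_one` call together with a filter on the task id.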

update_status(task_id, status)[source]#

Update the status of one task.

If the status is COMPLETED, each of the task’s next tasks is also marked as READY, provided that all of that next task’s previous tasks are completed.

Parameters:
  • task_id (ObjectId) – the id of the task to be updated

  • status (TaskStatus) – the new status of the task
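
The readiness propagation described above can be sketched with an in-memory table in place of the database. This is a simplified model, not the library's code: the function names are hypothetical, statuses are plain strings, and the WAITING status name is an assumption (only READY and COMPLETED appear in this documentation).

```python
from typing import Any

TaskTable = dict[str, dict[str, Any]]


def try_to_mark_ready(tasks: TaskTable, task_id: str) -> None:
    """Mark a task READY once every one of its previous tasks is COMPLETED."""
    task = tasks[task_id]
    if all(tasks[prev]["status"] == "COMPLETED" for prev in task["prev_tasks"]):
        task["status"] = "READY"


def update_status_sketch(tasks: TaskTable, task_id: str, status: str) -> None:
    """Set a task's status and, on completion, re-check each of its next tasks."""
    tasks[task_id]["status"] = status
    if status == "COMPLETED":
        for next_id in tasks[task_id]["next_tasks"]:
            try_to_mark_ready(tasks, next_id)


# Task "c" depends on both "a" and "b".
tasks: TaskTable = {
    "a": {"status": "READY", "prev_tasks": [], "next_tasks": ["c"]},
    "b": {"status": "READY", "prev_tasks": [], "next_tasks": ["c"]},
    "c": {"status": "WAITING", "prev_tasks": ["a", "b"], "next_tasks": []},
}

update_status_sketch(tasks, "a", "COMPLETED")
status_after_a = tasks["c"]["status"]  # "b" is not completed yet

update_status_sketch(tasks, "b", "COMPLETED")
status_after_b = tasks["c"]["status"]  # both parents are completed now
```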

update_subtask_result(task_id, subtask_id, result)[source]#

Update the result of a completed subtask within a task.

Parameters:
  • task_id (ObjectId) – the id of the task to be updated

  • subtask_id (ObjectId) – the id of the subtask within the task to be updated

  • result (Any) – the result returned by the task (which can be dumped into MongoDB)

update_subtask_status(task_id, subtask_id, status)[source]#

Update the status of a subtask.

update_task_dependency(task_id, prev_tasks=None, next_tasks=None)[source]#

Add prev tasks and next tasks to one task entry. The existing prev_tasks and next_tasks are not overwritten.

Parameters:
  • task_id (ObjectId) – the id of the task to be updated

  • prev_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ids of prev_tasks

  • next_tasks (Union[ObjectId, list[ObjectId], None]) – one or a list of ids of next_tasks
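
The non-overwriting behaviour can be pictured as appending to the existing lists instead of replacing them. The sketch below models that on a plain dictionary; add_dependencies is a hypothetical helper, strings stand in for ObjectId, and skipping ids that are already present is an assumption, not documented behaviour.

```python
from typing import Any, Union

TaskId = str  # stand-in for bson.ObjectId


def add_dependencies(
    entry: dict[str, Any],
    prev_tasks: Union[TaskId, list[TaskId], None] = None,
    next_tasks: Union[TaskId, list[TaskId], None] = None,
) -> None:
    """Append new dependency ids to an entry while keeping the existing ones."""
    for field, new_ids in (("prev_tasks", prev_tasks), ("next_tasks", next_tasks)):
        if new_ids is None:
            continue
        if not isinstance(new_ids, list):
            new_ids = [new_ids]
        existing = entry.setdefault(field, [])
        for task_id in new_ids:
            if task_id not in existing:  # assumed: duplicates are skipped
                existing.append(task_id)


task_entry = {"prev_tasks": ["a"], "next_tasks": []}
add_dependencies(task_entry, prev_tasks="b", next_tasks=["c", "d"])
```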