Source code for alab_management.resource_manager.resource_manager

"""
TaskLauncher is the core module of the system,
which actually executes the tasks.
"""

import logging
import time
from contextlib import contextmanager
from datetime import datetime
from traceback import format_exc
from typing import Any, cast

import dill
from bson import ObjectId

from alab_management.device_view.device_view import DeviceView
from alab_management.logger import DBLogger
from alab_management.resource_manager.enums import _EXTRA_REQUEST
from alab_management.resource_manager.resource_requester import (
    RequestMixin,
    RequestStatus,
)
from alab_management.sample_view.sample import SamplePosition
from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView
from alab_management.task_view import TaskView
from alab_management.task_view.task_enums import CancelingProgress, TaskStatus
from alab_management.utils.data_objects import DocumentNotUpdatedError, get_collection
from alab_management.utils.logger import set_up_rich_handler
from alab_management.utils.module_ops import load_definition

cli_logger = logging.getLogger(__name__)
set_up_rich_handler(cli_logger)


[docs] class ResourceManager(RequestMixin): """ TaskManager will. (1) find all the ready tasks and submit them, (2) handle all the resource requests """ def __init__(self): load_definition() self.task_view = TaskView() self.sample_view = SampleView() self.device_view = DeviceView() self._request_collection = get_collection("requests") self._pause_resource_assigning = False self.logger = DBLogger(task_id=None) super().__init__() time.sleep(1) # allow some time for other modules to launch
[docs] @contextmanager def pause_resource_assigning(self): """Pause the resource assigning.""" try: self._pause_resource_assigning = True cli_logger.info("Pausing resource assigning.") yield finally: self._pause_resource_assigning = False cli_logger.info("Resuming resource assigning.")
[docs] def run(self): """Start the loop.""" while True: self._loop() time.sleep(0.5)
def _loop(self): self.handle_released_resources() if not self._pause_resource_assigning: self.handle_requested_resources()
[docs] def handle_released_resources(self): """Release the resources.""" for request in self.get_requests_by_status(RequestStatus.NEED_RELEASE): devices = request["assigned_devices"] sample_positions = request["assigned_sample_positions"] self._release_devices(devices) self._release_sample_positions(sample_positions) self.update_request_status( request_id=request["_id"], status=RequestStatus.RELEASED, original_status=RequestStatus.NEED_RELEASE, )
[docs] def handle_requested_resources(self): """ Check if there are any requests that are in PENDING status. If so, try to assign the resources to it. """ requests = list(self.get_requests_by_status(RequestStatus.PENDING)) # prioritize the oldest requests at the highest priority value requests.sort(key=lambda x: x["submitted_at"]) requests.sort(key=lambda x: x["priority"], reverse=True) for request in requests: self._handle_requested_resources(request)
def _handle_requested_resources(self, request_entry: dict[str, Any]): try: resource_request = request_entry["request"] task_id = request_entry["task_id"] task_status = self.task_view.get_status(task_id=task_id) if task_status != TaskStatus.REQUESTING_RESOURCES or task_id in { task["task_id"] for task in self.task_view.get_tasks_to_be_canceled( canceling_progress=CancelingProgress.WORKER_NOTIFIED ) }: # this implies the Task has been cancelled or errored somewhere else in the chain -- we should # not allocate any resources to the broken Task. self.update_request_status( request_id=request_entry["_id"], status=RequestStatus.CANCELED, original_status=RequestStatus.PENDING, ) return devices = self.device_view.request_devices( task_id=task_id, device_names_str=[ entry["device"]["content"] for entry in resource_request if entry["device"]["identifier"] == "name" ], device_types_str=[ entry["device"]["content"] for entry in resource_request if entry["device"]["identifier"] == "type" ], ) # some devices are not available now # the request cannot be fulfilled if devices is None: return # replace device placeholder in sample position request # and make it into a single list parsed_sample_positions_request = [] for request in resource_request: if request["device"]["identifier"] == _EXTRA_REQUEST: device_prefix = "" else: device_name = devices[request["device"]["content"]]["name"] device_prefix = f"{device_name}{SamplePosition.SEPARATOR}" for pos in request["sample_positions"]: prefix = pos["prefix"] # if this is a nested resource request, lets not prepend the device name twice. if not prefix.startswith(device_prefix): prefix = device_prefix + prefix parsed_sample_positions_request.append( SamplePositionRequest(prefix=prefix, number=pos["number"]) ) self._request_collection.update_one( {"_id": request_entry["_id"]}, { "$set": { "parsed_sample_positions_request": [ dict(spr) for spr in parsed_sample_positions_request ] } }, ) sample_positions = self.sample_view.request_sample_positions( task_id=task_id, sample_positions=parsed_sample_positions_request ) if sample_positions is None: return # in case some errors happen, we will raise the error in the task process instead of the main process except Exception as error: # pylint: disable=broad-except # we will store the error in the database for easier debugging error.args = (format_exc(),) returned_value = self._request_collection.update_one( {"_id": request_entry["_id"], "status": RequestStatus.PENDING.name}, { "$set": { "status": RequestStatus.ERROR.name, "error": dill.dumps(error), "assigned_devices": None, "assigned_sample_positions": None, } }, ) if returned_value.modified_count != 1: raise DocumentNotUpdatedError( f"Error updating request {request_entry['_id']}: cannot update the request status from PENDING " f"to ERROR." ) from error return # if both devices and sample positions can be satisfied request_entry = self._request_collection.find_one( {"_id": request_entry["_id"], "status": RequestStatus.PENDING.name} ) if request_entry is not None: # label the resources as occupied self._occupy_devices(devices=devices, task_id=task_id) self._occupy_sample_positions( sample_positions=sample_positions, task_id=task_id ) returned_value = self._request_collection.update_one( {"_id": request_entry["_id"], "status": RequestStatus.PENDING.name}, { "$set": { "assigned_devices": devices, "assigned_sample_positions": sample_positions, "status": RequestStatus.FULFILLED.name, "fulfilled_at": datetime.now(), } }, ) # if the request status cannot be updated (due to status change), release the resources if returned_value.modified_count != 1: self._release_devices(devices) self._release_sample_positions(sample_positions) def _occupy_devices(self, devices: dict[str, dict[str, Any]], task_id: ObjectId): for device in devices.values(): self.device_view.occupy_device( device=cast(str, device["name"]), task_id=task_id ) def _occupy_sample_positions( self, sample_positions: dict[str, list[dict[str, Any]]], task_id: ObjectId ): for sample_positions_ in sample_positions.values(): for sample_position_ in sample_positions_: self.sample_view.lock_sample_position( task_id, cast(str, sample_position_["name"]) ) def _release_devices(self, devices: dict[str, dict[str, Any]]): for device in devices.values(): if device["need_release"]: self.device_view.release_device(device["name"]) def _release_sample_positions( self, sample_positions: dict[str, list[dict[str, Any]]] ): for sample_positions_ in sample_positions.values(): for sample_position in sample_positions_: if sample_position["need_release"]: self.sample_view.release_sample_position(sample_position["name"])