Source code for wxflow.scheduler.scheduler

from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from ..attrdict import AttrDict
from ..factory import Factory
from ..timetools import timedelta_to_HMS, to_timedelta


class Scheduler:
    """
    Base representation of a batch-scheduler job card.

    An instance keeps three pieces of state:

    - ``_config``: the user-supplied configuration wrapped in an ``AttrDict``.
    - ``specs``: a deep copy of ``_config`` with selected fields normalized
      (see ``_config_to_specs``).
    - ``batch_card``: the list of lines that make up the job card; it starts
      out empty.

    The ``get_accounting``, ``get_resources`` and ``get_env`` properties are
    no-op placeholders in this class.
    NOTE(review): presumably scheduler-specific subclasses override them and
    populate ``batch_card`` -- confirm against the concrete implementations.
    """

    # Factory instance named 'Scheduler'.
    # NOTE(review): presumably concrete schedulers are registered with and built through it -- confirm.
    scheduler_factory = Factory('Scheduler')

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any) -> None:
        """
        Initializes the scheduler with the provided configuration.

        Parameters
        ----------
        config : Dict
            Configuration dictionary for the scheduler.
            The expected keys and their usage are:
                - 'scheduler_type' (str, required): Specifies the type of scheduler to use.
                  Accepted values are 'slurm' or 'pbs'.
                - 'job_name' (str, required): Name of the job. Used in both Slurm and PBS.
                - 'partition' (str, optional): Partition or queue to submit the job to.
                  Used in Slurm as 'partition', in PBS as 'queue'.
                - 'nodes' (int, optional): Number of nodes to request. Used in both Slurm and PBS.
                - 'ntasks' (int, optional): Number of tasks. Used in Slurm.
                - 'ppn' (int, optional): Processors per node. Used in PBS.
                - 'time' (str, optional): Walltime limit for the job (e.g., '01:00:00'). Used in both.
                - 'output' (str, optional): Path for standard output file. Used in both.
                - 'error' (str, optional): Path for standard error file. Used in both.
                - 'account' (str, optional): Account to charge for resources. Used in both.
                - 'mail_user' (str, optional): Email address for notifications. Used in both.
                - 'mail_type' (str, optional): Type of email notifications. Used in both.
                - 'native' (str, optional): Any additional scheduler-specific options
                  can be included as needed.

            Notes
            -----
            - Required keys: 'scheduler_type', 'job_name'
            - Optional keys: All others listed above.
            - The dictionary may contain other scheduler-specific options as needed.
        *args : Any
            Additional positional arguments.
        **kwargs : Any
            Additional keyword arguments.
        """

        # Cache incoming config
        self._config = AttrDict(config)

        # Build the normalized `self.specs` view of the configuration
        self._config_to_specs()

        # The job card starts empty
        self.batch_card = []

        # Set attributes from the user provided information.
        # Each positional argument is stored under its own string form as the attribute name.
        for arg in args:
            setattr(self, str(arg), arg)
        for key, value in kwargs.items():
            setattr(self, key, value)

    def _config_to_specs(self) -> None:
        """
        Deep copies and converts the internal configuration dictionary to a standardized specification format.

        This method processes the internal `_config` attribute and assigns the self.specs attribute
        with the following transformations:
            - Converts the 'memory' field to a string in megabytes (e.g., '1024M').
            - Converts the 'walltime' field to a string in HH:MM:SS format.
            - Ensures the 'env' field (environment variables) is a list.
            - Ensures the 'native' field (native scheduler directives) is a list.

        Fields that are absent from `_config` are left untouched; 'env' and 'native' values
        that are already a list or tuple are kept as they are.
        """

        self.specs = self._config.deepcopy()

        # Convert 'memory' to MB
        if 'memory' in self._config:
            self.specs.memory = str(self.memory_in_megabytes(self._config.memory)) + 'M'

        # Convert 'walltime' to HH:MM:SS format
        if 'walltime' in self._config:
            self.specs.walltime = self.walltime_in_string(self._config.walltime)

        # Ensure environment variables are a list
        if 'env' in self._config:
            if not isinstance(self._config.env, (list, tuple)):
                self.specs.env = [self._config.env]

        # Ensure native scheduler directives are a list
        if 'native' in self._config:
            if not isinstance(self._config.native, (list, tuple)):
                self.specs.native = [self._config.native]

    def dump(self, filename: Optional[str] = None) -> None:
        """
        Dumps the batch card to a file or prints it to stdout.

        If a filename is provided, writes the contents of `self.batch_card` to the specified file,
        one item per line. Otherwise, prints the batch card to standard output.

        Parameters
        ----------
        filename : Optional[str], default=None
            The path to the file where the batch card should be written.
            If None, the batch card is printed to stdout.

        Returns
        -------
        None

        Raises
        ------
        Exception
            If an error occurs while writing to the specified file.
            The original error is attached as the cause.
        """

        if filename is not None:
            try:
                with open(filename, 'w') as fh:
                    for item in self.batch_card:
                        fh.write(f"{item}\n")
            except Exception as e:
                # Chain the original error so the root cause stays visible in the traceback
                raise Exception(f"Unknown exception in writing scheduler directives to {filename} from {e}") from e
        else:
            print(self.get_batch_card)

    @property
    def get_batch_card(self) -> str:
        """
        Return the batch card as a single newline-joined string.
        """
        return '\n'.join(self.batch_card)

    @property
    def get_accounting(self) -> None:
        """
        Generate the accounting specific items of the job card.
        E.g. jobname, queuing, partitions, accounts etc.

        No-op in this class.
        """
        pass

    @property
    def get_resources(self) -> None:
        """
        Generate the resource specific items of the job card.
        E.g. nodes, memory, walltime, exclusive, etc.

        No-op in this class.
        """
        pass

    @property
    def get_env(self) -> None:
        """
        Export environment variables

        No-op in this class.
        """
        pass

    @property
    def get_native(self) -> List[str]:
        """
        Generate the scheduler specific native directives verbatim from the user input.

        Returns
        -------
        List[str]
            Each entry of `self.specs.native` formatted as a string, or an empty list
            when 'native' is not present in `self.specs`.
        """
        if 'native' not in self.specs:
            return []
        return [f"{item}" for item in self.specs.native]

    @staticmethod
    def memory_in_bytes(memory: str) -> int:
        """
        Converts bytes, k, M, G, T (case-insensitive) to number of bytes
        Default units of input memory string is bytes
        Uses powers of 1024 for scaling (kilobytes, megabytes, etc.)
        1024 Bytes = 1 KB

        Parameters
        ----------
        memory: str
            Memory string to convert, e.g., '1024', '1K', '512M', '2G', '1T'

        Returns
        -------
        int: memory in bytes

        Raises
        ------
        ValueError
            If the numeric part of `memory` cannot be parsed as an integer.
        """

        scale = {'B': 0, 'K': 1, 'M': 2, 'G': 3, 'T': 4}
        multiplier = 1

        # Normalize the suffix once so the membership test and the lookup agree;
        # looking up the raw character would fail for lowercase units such as '1k'.
        unit = memory[-1].upper()
        if unit in scale:
            multiplier = 1024 ** scale[unit]
            memory = memory[:-1]

        return int(memory) * multiplier

    @staticmethod
    def memory_in_megabytes(memory: str) -> int:
        """
        Converts input memory in bytes into Megabytes
        1 MB = 1048576 Bytes

        The result is truncated to a whole number of megabytes.
        """
        return int(Scheduler.memory_in_bytes(memory) / 1048576.0)

    @staticmethod
    def walltime_in_string(walltime: Union[str, timedelta]) -> str:
        """
        Converts a walltime into its string form using `timedelta_to_HMS`.

        Parameters
        ----------
        walltime : Union[str, timedelta]
            A `timedelta` is formatted directly; a string is first parsed
            with `to_timedelta` and then formatted.

        Returns
        -------
        str
            The value returned by `timedelta_to_HMS`.

        Raises
        ------
        ValueError
            If `walltime` is neither a string nor a timedelta.
        """
        if isinstance(walltime, timedelta):
            return timedelta_to_HMS(walltime)
        elif isinstance(walltime, str):
            return timedelta_to_HMS(to_timedelta(walltime))
        else:
            raise ValueError(f"Invalid walltime format: {walltime}. Expected a string or timedelta.")