Source code for wxflow.scheduler.pbs

import os
from typing import List

from .scheduler import Scheduler

__all__ = ['PBS']


[docs] class PBS(Scheduler): """ Class to construct PBS job cards. There are several PBS implementations. This implementation supports the PBS Pro implementation from https://www.altair.com/pdfs/pbsworks/PBSUserGuide2021.1.pdf """ DIRECTIVE = '#PBS' _MAPPING = {'account': '-A', 'queue': '-q', 'jobname': '-N', 'stdout': '-o', 'stderr': '-e', 'join': '-j', 'walltime': '-l walltime', 'env': '-V', 'nodes': '--nodes', 'tasks': '--ntasks', 'tasks_per_core': '--ntasks_per-core', 'tasks_per_node': '--ntasks_per-node', 'memory': '--mem', 'shell': '-S', 'debug': '-l debug', 'mail': '-M'}
[docs] def __init__(self, config: dict, *args, **kwargs): """ Parameters ---------- config args kwargs Returns ------- object """ super().__init__(config, *args, **kwargs) batch_card = [] batch_card += self.get_accounting batch_card += self.get_resources batch_card += self.get_native batch_card += self.get_env # Add the directive to the batch card self.batch_card = [f"{self.DIRECTIVE} {item}" for item in batch_card]
@property def get_accounting(self): """ Generate the accounting specific items of the job card. E.g. jobname, queuing, partitions, accounts etc. """ strings = [] if 'shell' in self.specs: strings.append(f"{self._MAPPING['shell']} {self.specs.shell}") if 'jobname' in self.specs: strings.append(f"{self._MAPPING['jobname']} {self.specs.jobname}") if 'account' in self.specs: strings.append(f"{self._MAPPING['account']} {self.specs.account}") if 'queue' in self.specs: strings.append(f"{self._MAPPING['queue']} {self.specs.queue}") if 'join' in self.specs and self.specs.join: strings.append(f"{self._MAPPING['join']} oe") if 'stdout' in self.specs: strings.append(f"{self._MAPPING['stdout']} {self.specs.stdout}") elif 'stderr' in self.specs: strings.append(f"{self._MAPPING['stderr']} {self.specs.stderr}") return strings @property def get_resources(self): """ Generate the resource specific items of the job card. E.g. nodes, memory, walltime, exclusive, etc. """ strings = [] if 'walltime' in self.specs: strings.append(f"{self._MAPPING['walltime']}={self.specs.walltime}") if 'debug' in self.specs: strings.append(f"{self._MAPPING['debug']}={self.specs.debug}") select = self.get_select if select: strings.append(f"-l select={select}") place = self.get_place if place: strings.append(f"-l place={place}") return strings @property def get_env(self): """ Export environment variables """ strings = [] if 'env' in self.specs: if any(item.upper() == 'ALL' for item in self.specs.env): strings.append(f"-V") else: env_list = [] for item in self.specs.env: env_list.append(f"{item}={os.getenv(item)}") strings.append(f"-v {','.join(env_list)}") return strings @property def get_select(self): """ Construct the "select" resource request for the job Returns ------- #PBS -l select=<get_select> """ strings = [] if 'nodes' in self.specs: strings.append(f"{self.specs.nodes}") if 'tasks_per_node' in self.specs: strings.append(f"mpiprocs={self.specs.tasks_per_node}") if 'threads' in self.specs: strings.append(f"ompthreads={self.specs.threads}") if 'tasks' in self.specs: strings.append(f"ncpus={self.specs.tasks}") if 'memory' in self.specs: strings.append(f"mem={self.specs.memory}") return ':'.join(strings) @property def get_place(self): """ Construct the "place" placement request for the job Returns ------- #PBS -l place=<get_place> """ strings = [] if 'chunk' in self.specs: # Valid options are: free, pack, scatter, vscatter strings.append(f"{self.specs.chunk}") if 'exclusive' in self.specs and self.specs.exclusive: strings.append('excl') return ':'.join(strings) @property def get_native(self) -> List[str]: """ Generate the PBS specific native directives verbatim from the user input. """ strings = [] if 'native' in self.specs: for item in self.specs.native: strings.append(f"-l {item}") return strings
# Register PBS as a builder in the scheduler_factory Scheduler.scheduler_factory.register('PBS', PBS)