# Source code for wxflow.scheduler.slurm
from typing import List
from .scheduler import Scheduler
# Public names exported by this module (what ``from ... import *`` picks up).
__all__ = ['Slurm']
[docs]
class Slurm(Scheduler):
    """
    Class to construct Slurm job cards.

    The supported entries of ``self.specs`` are translated into ``sbatch``
    options and stored in ``self.batch_card`` as a list with one
    ``#SBATCH ...`` directive per item.

    NOTE(review): ``self.specs`` is not assigned in this class; it is
    presumably set up by the ``Scheduler`` base constructor -- confirm. The
    code below only relies on it supporting ``key in specs`` and attribute
    access (``specs.key``).

    For SBATCH reference see: https://slurm.schedmd.com/sbatch.html
    """

    DIRECTIVE = '#SBATCH'

    # Generic job-spec key -> sbatch long option.
    # NOTE(review): 'tasks' is mapped here but is not emitted by any of the
    # get_* properties below -- confirm whether that is intentional.
    _MAPPING = {'account': '--account',
                'partition': '--partition',
                'queue': '--qos',
                'jobname': '--job-name',
                'stdout': '--output',
                'stderr': '--error',
                'walltime': '--time',
                'env': '--export',
                'nodes': '--nodes',
                'tasks': '--ntasks',
                # sbatch spells these with hyphens only; the previous
                # '--ntasks_per-*' spelling is not a valid sbatch option.
                'tasks_per_core': '--ntasks-per-core',
                'tasks_per_node': '--ntasks-per-node',
                'memory': '--mem',
                'exclusive': '--exclusive',
                'debug': '--verbose'}

    def __init__(self, config: dict, *args, **kwargs):
        """
        Constructor for Slurm scheduler.

        Parameters
        ----------
        config : dict
            Scheduler configuration; forwarded unchanged (together with
            ``*args`` and ``**kwargs``) to the ``Scheduler`` constructor.
        """
        super().__init__(config, *args, **kwargs)

        # Order of the sections in the job card: accounting, resources,
        # native (verbatim) options, environment exports.
        batch_card = []
        batch_card += self.get_accounting
        batch_card += self.get_resources
        batch_card += self.get_native
        batch_card += self.get_env

        # Add the directives to the batch card. Each directive is an item in a list.
        self.batch_card = [f"{self.DIRECTIVE} {item}" for item in batch_card]

    def _option(self, key: str) -> str:
        """Return ``<sbatch option>=<value>`` for the spec entry ``key``."""
        return f"{self._MAPPING[key]}={getattr(self.specs, key)}"

    @staticmethod
    def _as_items(value) -> list:
        """
        Normalise a multi-valued spec entry to something safe to iterate.

        ``None`` becomes an empty list and a plain string becomes a
        one-element list, so that a string is not split into characters.
        Any other value is returned unchanged.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return value

    @property
    def get_accounting(self) -> List[str]:
        """
        Generate the accounting specific items of the job card.
        E.g. jobname, queuing, partitions, accounts etc.

        Returns
        -------
        List[str]
            sbatch options (without the ``#SBATCH`` prefix), in the order
            jobname, account, queue, partition, output/error files.
        """
        specs = self.specs

        strings = [self._option(key)
                   for key in ('jobname', 'account', 'queue')
                   if key in specs]

        # The partition is only emitted when it is set to a non-empty value.
        if 'partition' in specs and specs.partition:
            strings.append(self._option('partition'))

        if 'join' in specs and specs.join:
            # Joined streams: prefer the stdout file, fall back to stderr.
            if 'stdout' in specs:
                joined_output = specs.stdout
            else:
                joined_output = specs.stderr if 'stderr' in specs else None
            if joined_output is not None:
                # sbatch sends both stdout and stderr to the --output file
                # when --error is not given. Passing only --error would
                # leave stdout in the default slurm-%j.out file, i.e. the
                # streams would not be joined.
                strings.append(f"{self._MAPPING['stdout']}={joined_output}")
        else:
            strings += [self._option(key)
                        for key in ('stdout', 'stderr')
                        if key in specs]

        return strings

    @property
    def get_resources(self) -> List[str]:
        """
        Generate the resource specific items of the job card.
        E.g. nodes, memory, walltime, exclusive, etc.

        Returns
        -------
        List[str]
            sbatch options (without the ``#SBATCH`` prefix): first the
            ``option=value`` entries, then the value-less flags.
        """
        specs = self.specs

        valued_keys = ('memory', 'walltime', 'nodes',
                       'tasks_per_core', 'tasks_per_node')
        strings = [self._option(key) for key in valued_keys if key in specs]

        # Flags carry no value and are emitted only when the spec is truthy.
        strings += [self._MAPPING[key]
                    for key in ('exclusive', 'debug')
                    if key in specs and getattr(specs, key)]

        return strings

    @property
    def get_env(self) -> List[str]:
        """
        Export environment variables

        Returns
        -------
        List[str]
            One ``--export=<item>`` entry per item of ``specs.env``; empty
            when no ``env`` entry is present.
        """
        if 'env' not in self.specs:
            return []
        return [f"{self._MAPPING['env']}={item}"
                for item in self._as_items(self.specs.env)]

    @property
    def get_native(self) -> List[str]:
        """
        Generate the Slurm specific native directives verbatim from the user input.

        Returns
        -------
        List[str]
            Each item of ``specs.native`` prefixed with ``--``; empty when
            no ``native`` entry is present.
        """
        if 'native' not in self.specs:
            return []
        return [f"--{item}" for item in self._as_items(self.specs.native)]
# Register Slurm as a builder in the scheduler_factory under the key 'Slurm'.
# This call runs at import time, i.e. as a side effect of importing this module.
Scheduler.scheduler_factory.register('Slurm', Slurm)