Skip to content

Commit

Permalink
Extend the run command (#11)
Browse files Browse the repository at this point in the history
* proper multi dependency handling
  • Loading branch information
fdroessler authored Apr 5, 2021
1 parent 79bf08c commit 71592e3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 10 deletions.
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,27 @@ The default is `sbatch` which submits jobs to slurm.
Dependencies
============

Each time `slurmpy.Slurm().run()` is called, it returns the job-id of the submitted job. This
can then be sent to a subsequent job:
Each time `slurmpy.Slurm().run()` is called, it returns the job-id of the submitted job. One or more
of these can be provided as arguments to a subsequent job:
```
s = Slurm()
s.run(..., depends_on=[job_id])
# or for multiple dependencies
s.run(..., depends_on=[job_id_1, job_id_2, job_id_3])
```
to indicate that this job should not run until the job with `job_id` has finished successfully.
to indicate that this job should not run until the job with `job_id` (or the jobs with ids `job_id_1` through `job_id_3`) has finished successfully.

Additionally, SLURM allows you to define how a job depends on previous jobs e.g. should it run
only on success or always etc. For this `slurmpy.Slurm().run()` provides an additional argument `depends_how`:

```
s = Slurm()
s.run(..., depends_on=[job_id], depends_how='afterany')
```
The following options are currently supported, with "afterok" as the default:
"after", "afterany", "afterburstbuffer", "aftercorr", "afternotok", "afterok", "expand"
More details can be found here: https://slurm.schedmd.com/sbatch.html in the `--dependency` section.

Install
=======
Expand Down
40 changes: 33 additions & 7 deletions slurmpy/slurmpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import tempfile
import atexit
import hashlib
import datetime
from datetime import datetime

TMPL = """\
#!/bin/bash
Expand All @@ -61,6 +61,16 @@
__script__"""

# Dependency types accepted for the `depends_how` argument of `run()`;
# `run()` raises ValueError for any value that is not in this set.
VALID_DEPENDENCY_TYPES = {
"after",
"afterany",
"afterburstbuffer",
"aftercorr",
"afternotok",
"afterok",
"expand",
}


def tmp(suffix=".sh"):
t = tempfile.mktemp(suffix=suffix)
Expand Down Expand Up @@ -121,7 +131,7 @@ def _tmpfile(self):
return "%s/%s.sh" % (self.scripts_dir, self.name)

def run(self, command, name_addition=None, cmd_kwargs=None,
_cmd="sbatch", tries=1, depends_on=None):
_cmd="sbatch", tries=1, depends_on=None, depends_how='afterok'):
"""
command: a bash command that you want to run
name_addition: if not specified, the sha1 of the command to run
Expand All @@ -133,12 +143,16 @@ def run(self, command, name_addition=None, cmd_kwargs=None,
tries: try to run a job either this many times or until the first
success.
depends_on: job ids that this depends on before it is run (uses 'afterok' by default)
depends_how: ability to change how a job depends on others
"""
if depends_how not in VALID_DEPENDENCY_TYPES:
raise ValueError(f"depends_how must be in {VALID_DEPENDENCY_TYPES}")
if name_addition is None:
name_addition = hashlib.sha1(command.encode("utf-8")).hexdigest()

if self.date_in_name:
name_addition += "-" + str(datetime.date.today())
name_addition += "-" + datetime.strftime(datetime.now(),
format='%y-%m-%d-%H-%M-%S')
name_addition = name_addition.strip(" -")

if cmd_kwargs is None:
Expand All @@ -162,11 +176,23 @@ def run(self, command, name_addition=None, cmd_kwargs=None,
job_id = None
for itry in range(1, tries + 1):
args = [_cmd]
args.extend([("--dependency=afterok:%d" % int(d))
for d in depends_on])
# sbatch (https://slurm.schedmd.com/sbatch.html) job dependency has the following format:
# -d, --dependency=<dependency_list>
# <dependency_list> is of the form <type:job_id[:job_id][,type:job_id[:job_id]]>
# Create job dependency string
dependency_string = "".join([f":{d}" for d in depends_on])
if depends_on:
dependency_string= f"{depends_how}{dependency_string}"
if itry > 1:
mid = "--dependency=afternotok:%d" % job_id
args.append(mid)
mid = f"afternotok:{job_id}"
# Merge retry dependency to job dependencies
if dependency_string:
dependency_string = f"{dependency_string},{mid}"
else:
dependency_string= mid
# Add dependency option to sbatch
if dependency_string:
args.extend([f"--dependency={dependency_string}" ])
args.append(sh.name)
res = subprocess.check_output(args).strip()
print(res, file=sys.stderr)
Expand Down

0 comments on commit 71592e3

Please sign in to comment.