adding Benchmark scripts and outline for run-pipeline.py script #1

Open · wants to merge 2 commits into main
5 changes: 5 additions & 0 deletions .rosetta-ci/.gitignore
@@ -0,0 +1,5 @@
*.pyc
benchmark.mac.ini
benchmark.linux.ini
benchmark.ubuntu.ini
results
410 changes: 410 additions & 0 deletions .rosetta-ci/benchmark.py

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions .rosetta-ci/benchmark.template.ini
@@ -0,0 +1,40 @@
#
# Benchmark script configuration file. Some tests require system-specific options to run; see benchmark.template.ini for the list of available options.
#

[DEFAULT]

[main] # additional config options for various tests. All these fields will be passed as keys in the 'config' function argument

# how many jobs the daemon can run on the host machine (this is not related to HPC jobs)
cpu_count = 24

# how much memory, in GB, the daemon can use on the host machine (approximate, float)
memory = 64

# name and email of the user who submitted this test
user_name = Jane Roe
user_email = [email protected]

# HPC driver; must be one of the following values: MultiCore, Condor, Slurm, or none if no HPC driver should be configured
hpc_driver = MultiCore

# when run by daemons, branch:revision will be set to appropriate values representing the currently checked-out version of the main repository
branch = unknown
revision = 42

# path to directory where test results will be stored
results_root = ${_here_}/results

release_root = ./results/_release_

[slurm]
# head-node host name; if specified, it will be used to submit jobs
head_node =


[mount]
# list of key:path pairs that will be available as config.mounts during a test run

# path to releases; leave empty if release production should not be supported by this daemon
release_root = ${_user_home_}/release
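
For context, a minimal sketch (not part of this PR) of how a script might load a machine-specific copy of this template with Python's configparser. The ${...} placeholders suggest ExtendedInterpolation; the `_here_` and `_user_home_` defaults below are assumptions about how those placeholders get filled in.

# Illustrative sketch only: benchmark.py's actual loading code is not shown in this diff.
import configparser, os

config = configparser.ConfigParser(
    defaults={'_here_': os.path.abspath('.'),              # assumed placeholder value
              '_user_home_': os.path.expanduser('~')},     # assumed placeholder value
    interpolation=configparser.ExtendedInterpolation())
config.read('benchmark.linux.ini')  # machine-specific copy of benchmark.template.ini

cpu_count  = config.getint('main', 'cpu_count')    # e.g. 24
memory     = config.getfloat('main', 'memory')     # e.g. 64.0 (GB)
hpc_driver = config.get('main', 'hpc_driver')      # 'MultiCore', 'Condor', 'Slurm', or 'none'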
5 changes: 5 additions & 0 deletions .rosetta-ci/hpc_drivers/__init__.py
@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# :noTabs=true:

from .multicore import MultiCore_HPC_Driver
from .slurm import Slurm_HPC_Driver
210 changes: 210 additions & 0 deletions .rosetta-ci/hpc_drivers/base.py
@@ -0,0 +1,210 @@
# -*- coding: utf-8 -*-
# :noTabs=true:

import os, sys, subprocess, stat, types  # `types` is needed by NT.__repr__ below
import time as time_module
import signal as signal_module

class NT:  # named tuple
    def __init__(self, **entries): self.__dict__.update(entries)

    def __repr__(self):
        r = 'NT: |'
        for i in dir(self):
            if not i.startswith('__') and not isinstance(getattr(self, i), types.MethodType):
                r += '{} --> {}, '.format(i, getattr(self, i))
        return r[:-2] + '|'
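
# Usage sketch (illustrative, not part of this PR):
#   job = NT(id=42, name='docking')
#   job.name   # -> 'docking'
#   repr(job)  # -> "NT: |id --> 42, name --> docking|"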



class HPC_Exception(Exception):
    def __init__(self, value): self.value = value
    def __str__(self): return self.value



def execute(message, command_line, return_='status', until_successes=False, terminate_on_failure=True, silent=False, silence_output=False, tracer=print):
    if not silent: tracer(message); tracer(command_line); sys.stdout.flush()

    while True:
        p = subprocess.Popen(command_line, bufsize=0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, errors = p.communicate()

        output = output + errors
        output = output.decode(encoding="utf-8", errors="replace")

        exit_code = p.returncode

        if exit_code and not (silent or silence_output): tracer(output); sys.stdout.flush()

        if exit_code and until_successes: pass  # That's right - readability COUNTS!
        else: break

        tracer("Error while executing {}: {}\n".format(message, output))
        tracer("Sleeping 60s... then I will retry...")
        sys.stdout.flush()
        time_module.sleep(60)  # was `time.sleep(60)`: the module is imported as time_module

    if return_ == 'tuple': return (exit_code, output)

    if exit_code and terminate_on_failure:
        tracer("\nEncountered error while executing: " + command_line)
        if return_ == True: return True
        else: print("\nEncountered error while executing: " + command_line + '\n' + output); sys.exit(1)

    if return_ == 'output': return output
    else: return False
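
# Usage sketch (illustrative; the commands and paths are made up):
#   execute('Building...', 'make -C ./source')                            # exits the script on failure by default
#   out = execute('Getting SHA1...', 'git rev-parse HEAD', return_='output')
#   code, out = execute('Checking...', 'ls ./results', return_='tuple')   # returns both values, never exits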


def Sleep(time_, message, dict_={}):
    ''' Fancy sleep function '''
    len_ = 0
    for i in range(time_, 0, -1):
        msg = message.format(**dict(dict_, time_left=i))
        print(msg, end='')
        len_ = max(len_, len(msg))
        sys.stdout.flush()
        time_module.sleep(1)

    print(' '*len_ + '\r', end='')  # erasing the sleep message


# Abstract class for HPC job submission
class HPC_Driver:
    def __init__(self, working_dir, config, tracer=lambda x: None, set_daemon_message=lambda x: None):
        self.working_dir = working_dir
        self.config = config
        self.cpu_usage = 0.0  # cumulative CPU usage in hours
        self.tracer = tracer
        self.set_daemon_message = set_daemon_message

        self.cpu_count = self.config['cpu_count'] if isinstance(config, dict) else self.config.getint('DEFAULT', 'cpu_count')

        self.jobs = []  # list of all jobs currently run by this driver; the Job class is driver dependent, could be just an int or something more complex

        self.install_signal_handler()


    def __del__(self):
        self.remove_signal_handler()


    def execute(self, executable, arguments, working_dir, log_dir=None, name='_no_name_', memory=256, time=24, shell_wrapper=False, block=True):
        ''' Execute the given command line on the HPC cluster; must accumulate CPU hours in self.cpu_usage '''
        if log_dir is None: log_dir = self.working_dir

        if shell_wrapper:
            shell_wrapper_sh = os.path.abspath(self.working_dir + '/hpc.{}.shell_wrapper.sh'.format(name))
            with open(shell_wrapper_sh, 'w') as f:  # `open`, not the Python-2-only `file` builtin
                f.write('#!/bin/bash\n{} {}\n'.format(executable, arguments))
                os.fchmod(f.fileno(), stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
            executable, arguments = shell_wrapper_sh, ''

        return self.submit_serial_hpc_job(name=name, executable=executable, arguments=arguments, working_dir=working_dir, log_dir=log_dir, jobs_to_queue=1, memory=memory, time=time, block=block, shell_wrapper=shell_wrapper)
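
    # Usage sketch (illustrative; the command and paths are made up):
    #   driver.execute('/bin/echo', 'hello', working_dir='./work', shell_wrapper=True, block=True)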



    @property
    def number_of_cpu_per_node(self):
        must_be_implemented_in_inherited_classes

    @property
    def maximum_number_of_mpi_cpu(self):
        must_be_implemented_in_inherited_classes


    def submit_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        print('submit_hpc_job is DEPRECATED and will be removed in the near future, please use submit_serial_hpc_job instead!')
        must_be_implemented_in_inherited_classes


    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue, log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        must_be_implemented_in_inherited_classes


    def submit_mpi_hpc_job(self, name, executable, arguments, working_dir, log_dir, memory=512, time=12, block=True, process_coefficient="1", requested_nodes=1, requested_processes_per_node=1):
        ''' Submit a job as an MPI job.
            process_coefficient should be a string representing the fraction of processes to launch on each node; for example, '3 / 4' will start only 75% of the MPI processes on each node
        '''
        must_be_implemented_in_inherited_classes
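
    # Worked example (an assumption about the intended semantics, not confirmed by this diff):
    # with 24 CPUs per node and process_coefficient = '3 / 4', a concrete driver would
    # launch 24 * 3 / 4 = 18 MPI processes per node.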


    def cancel_all_jobs(self):
        ''' Cancel all HPC jobs known to this driver; use this as a signal handler for script termination '''
        for j in self.jobs: self.cancel_job(j)

    def block_until(self, silent, fn, *args, **kwargs):
        '''
        **fn must have the driver as the first argument**
        example:
            def fn(driver):
                jobs = list(driver.jobs)
                jobs = [job for job in jobs if not driver.complete(job)]
                if len(jobs) <= 8:
                    return False  # stops sleeping
                return True  # continues sleeping

            for x in range(100):
                hpc_driver.submit_hpc_job(...)
            hpc_driver.block_until(False, fn)
        '''
        while fn(self, *args, **kwargs):
            sys.stdout.flush()
            time_module.sleep(60)
            if not silent:
                Sleep(1, 'Waiting for HPC job(s) to finish, sleeping {time_left}s\r')  # stray quote removed from the message

    def wait_until_complete(self, jobs=None, callback=None, silent=False):
        ''' Helper function: wait until the given list of jobs has finished; if no argument is given, wait until all jobs known to the driver have finished '''
        jobs = jobs if jobs else self.jobs

        while jobs:
            for j in jobs[:]:
                if self.complete(j): jobs.remove(j)

            if jobs:
                self.set_daemon_message("Waiting for {} HPC job(s) to finish...".format(len(jobs)))
                sys.stdout.flush()

                if callback: callback()

                if silent: time_module.sleep(64)
                else: Sleep(64, 'Waiting for {n_jobs} HPC job(s) to finish, sleeping {time_left}s \r', dict(n_jobs=len(jobs)))



    _signals_ = [signal_module.SIGINT, signal_module.SIGTERM, signal_module.SIGABRT]

    def install_signal_handler(self):
        def signal_handler(signal_, frame):
            self.tracer('Received signal: {}... Canceling HPC jobs...'.format(signal_))
            self.cancel_all_jobs()
            self.set_daemon_message('Remote daemon got terminated with signal: {}'.format(signal_))
            sys.exit(1)

        for s in self._signals_: signal_module.signal(s, signal_handler)


    def remove_signal_handler(self):  # do we really need this???
        try:
            for s in self._signals_: signal_module.signal(s, signal_module.SIG_DFL)
        except TypeError:
            # the interpreter is terminating; skip removing the handlers
            pass


    def cancel_job(self, job_id):
        must_be_implemented_in_inherited_classes


    def complete(self, job_id):
        ''' Return job completion status: True if the job has completed and False otherwise '''
        must_be_implemented_in_inherited_classes
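
For orientation, a minimal sketch (not from this PR) of what a concrete driver must override, based on the abstract methods above. The LocalEcho_HPC_Driver name and its trivial bookkeeping are illustrative assumptions; real drivers like MultiCore_HPC_Driver and Slurm_HPC_Driver live in the sibling modules imported by __init__.py.

# Illustrative sketch: a toy driver that runs each job synchronously on the local machine.
class LocalEcho_HPC_Driver(HPC_Driver):
    def submit_serial_hpc_job(self, name, executable, arguments, working_dir, jobs_to_queue,
                              log_dir, memory=512, time=12, block=True, shell_wrapper=False):
        for _ in range(jobs_to_queue):
            execute('Running {}...'.format(name),
                    'cd {} && {} {}'.format(working_dir, executable, arguments),
                    return_='tuple')  # capture (exit_code, output) instead of terminating
        return None  # jobs run synchronously, so there is no job handle to track

    def cancel_job(self, job_id): pass        # nothing to cancel: jobs have already finished
    def complete(self, job_id): return True   # always complete for a synchronous driver

# Hypothetical usage:
#   driver = LocalEcho_HPC_Driver('./work', {'cpu_count': 4})
#   driver.execute('/bin/echo', 'hello', working_dir='./work')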