subway package

subway.bootstrap module

Actions performed by sub init in the CLI, including creating directories, touching files, and rendering templates from the templates folder.

subway.bootstrap._mkdir(path, conf, key)[source]
subway.bootstrap.default_conf(path)[source]

generate default config dict for subway project

Parameters

path – str. Used as conf["work_dir"].

Returns

Dict. The default config dict.

subway.bootstrap.env_init(path, conf=None, fromfile=None, include_main=True)[source]

Initialize a subway project. This includes:
  • mkdir .subway, plus the inputs/outputs and check inputs/outputs dirs if possible

  • create config.json and an empty history.json within .subway

  • render an example main.py

Parameters
  • path – str. absolute path for subway project dir

  • conf – Optional[Dict], Default None, indicates a simple default config is applied.

  • fromfile – Optional[str]. Default None, indicates a default example main.py will be rendered. If specified as a full path, the corresponding file will be copied into subway project as entry point.

  • include_main – Optional[bool], default True. If False, main.py is not rendered.

Returns

None.
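The steps listed for env_init can be pictured with a small stand-in. This is only a sketch of the documented layout, not the library's code: the name init_layout, the minimal default config, and the choice to treat the directory entries in conf as paths relative to the project are all assumptions, and rendering main.py is left out.

```python
import json
import os
import tempfile


def init_layout(path, conf=None):
    """Hypothetical stand-in: create .subway with config.json and history.json."""
    conf = dict(conf) if conf else {"work_dir": path}  # assumed minimal default
    meta_dir = os.path.join(path, ".subway")
    os.makedirs(meta_dir, exist_ok=True)
    # Create the optional directories only if they are mentioned in conf.
    for key in ("inputs_dir", "outputs_dir", "check_inputs_dir", "check_outputs_dir"):
        if key in conf:
            os.makedirs(os.path.join(path, conf[key]), exist_ok=True)
    with open(os.path.join(meta_dir, "config.json"), "w") as f:
        json.dump(conf, f, indent=2)
    with open(os.path.join(meta_dir, "history.json"), "w") as f:
        json.dump({}, f)  # empty history containing only {}
    return meta_dir


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        init_layout(tmp, {"work_dir": tmp, "inputs_dir": "inputs"})
        print(sorted(os.listdir(os.path.join(tmp, ".subway"))))
```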

subway.bootstrap.mkdirs(path, conf)[source]

mkdirs for “inputs_dir”, “outputs_dir”, “check_inputs_dir”, “check_outputs_dir”, if they are mentioned in conf dict

Parameters
  • path – str. full path for subway project.

  • conf – Dict. config dict for subway project.

Returns

None

subway.bootstrap.render_config(path, conf)[source]

generate config.json based on conf dict within .subway dir

Parameters
  • path – str. full path for subway project.

  • conf – Dict. config dict for subway project.

Returns

None

subway.bootstrap.render_history(path)[source]

generate empty history.json with only {} within .subway dir

Parameters

path – str. full path for subway project.

Returns

None

subway.bootstrap.render_main(path, conf, fromfile=None)[source]

render main.py as detailed as possible and make its permission 700 (executable).

Parameters
  • path – str. full path for subway project.

  • conf – Dict. config dict for subway project.

  • fromfile – Optional[str]. Default None, indicates a default example main.py will be rendered. If specified as a full path, the corresponding file will be copied into subway project as entry point.

Returns

None.

subway.cli module

cli interface for sub init and more

class subway.cli.SubwayCLI(_argv=None, _test=False)[source]

Bases: object

__call__()[source]

Call self as a function.

__init__(_argv=None, _test=False)[source]

Initialize self. See help(type(self)) for accurate signature.

_copy_setup(fname)[source]
static _get(s, k, _default=None)[source]

_get({"a": {"b":1}}, "a.b") = 1

Parameters
  • s – Dict.

  • k – str. Key string with nested levels separated by “.”

  • _default – Any, default return when nothing match. Default value is None.

Returns
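Going by the one-line example above, the lookup walks the nested dict one key segment at a time. A minimal sketch of that idea follows; dotted_get is a hypothetical name, and returning the default as soon as any segment is missing is an assumption about the behavior.

```python
def dotted_get(s, k, default=None):
    """Look up a "."-separated key path in a nested dict (illustrative only)."""
    current = s
    for part in k.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return default  # nothing matched at this level
    return current


print(dotted_get({"a": {"b": 1}}, "a.b"))  # 1
```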

_history_clean()[source]
_meta_query_attr(key)[source]
_meta_query_key(key)[source]

Meta function that generates member functions for this class. The generated function looks as follows (shown for key "checked"):

def query_checked(self):
    if self.jid:
        print("1" if self.history[self.jid]['state'] == "checked" else "0")
    else:
        r = []
        for j, s in self.history.items():
            if s['state'] == "checked":
                r.append(j)
        print(*r, sep="\n")
Parameters

key – str. The added function is named query_<key>, e.g. query_checked for key "checked".

Returns

None
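The generation step itself can be sketched with setattr. The class MiniCLI and the classmethod add_state_query below are hypothetical stand-ins (the documented method is an instance method on SubwayCLI), and the generated functions return their result instead of printing it so that they are easy to check.

```python
class MiniCLI:
    """Hypothetical stand-in holding a history dict and an optional job id."""

    def __init__(self, history, jid=None):
        self.history = history
        self.jid = jid

    @classmethod
    def add_state_query(cls, key):
        """Attach a method named query_<key> that filters jobs by state."""

        def query(self):
            if self.jid:
                # Single job: report "1" if its state matches, else "0".
                return "1" if self.history[self.jid]["state"] == key else "0"
            # No job id: collect every job whose state matches.
            return [j for j, s in self.history.items() if s["state"] == key]

        query.__name__ = "query_" + key
        setattr(cls, query.__name__, query)


for state in ("checked", "pending"):
    MiniCLI.add_state_query(state)

history = {"j1": {"state": "checked"}, "j2": {"state": "pending"}}
print(MiniCLI(history).query_checked())
print(MiniCLI(history, jid="j2").query_checked())
```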

_meta_query_ts(tstype)[source]
_noid()[source]
static _set(s, k, v)[source]

_set({"a":{"b":{"c":2}, "zz":"pp"}}, "a.b.m", "q")={'a': {'b': {'c': 2, 'm': 'q'}, 'zz': 'pp'}}

Parameters
  • s – Dict.

  • k – Any.

  • v – Any.

Returns

Dict. after the set
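The example above suggests an in-place update along a dotted key path. A minimal sketch under that reading: dotted_set is a hypothetical name, k is taken to be a "."-separated string as in the example, and creating missing intermediate dicts is an assumption.

```python
def dotted_set(s, k, v):
    """Set a value at a "."-separated key path in a nested dict (illustrative only)."""
    parts = k.split(".")
    current = s
    for part in parts[:-1]:
        # Descend, creating an empty dict when a level is missing (assumed behavior).
        current = current.setdefault(part, {})
    current[parts[-1]] = v
    return s


print(dotted_set({"a": {"b": {"c": 2}, "zz": "pp"}}, "a.b.m", "q"))
```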

c()
config()[source]
d()
debug()[source]
h()
help()[source]
i()
init()[source]
q()
query()[source]
query_condition()[source]
query_i()
query_info()[source]
query_input()[source]
query_l()
query_leaves()[source]
query_o()
query_output()[source]
query_r()
query_root()[source]
query_s()
query_state()[source]
query_t()
query_tree()[source]
r()
run()[source]

subway.config module

import config from config.json

subway.config._conf_abs_dir(prefix)[source]
subway.config.update_conf()[source]

write conf in memory back to config.json.

Returns

None.

subway.config.update_history()[source]

write history in memory back to history.json.

Returns

None.

subway.cons module

Workaround to configure work_dir in runtime.

subway.exceptions module

Exception classes for subway.

exception subway.exceptions.CLIException(message, code=10)[source]

Bases: subway.exceptions.SubwayException

__init__(message, code=10)[source]
Parameters
  • message – str.

  • code – int. 10: general; 11: jobid mismatch; 12: only valid for general without id; 13: no such attribute in history of conf.

exception subway.exceptions.EndingBubble(message, code=10)[source]

Bases: subway.exceptions.SubwayException

exception subway.exceptions.MatchError(message, code=11)[source]

Bases: subway.exceptions.CLIException

__init__(message, code=11)[source]
Parameters
  • message – str.

  • code – int. 10: general; 11: jobid mismatch; 12: only valid for general without id; 13: no such attribute in history of conf.

exception subway.exceptions.NoAttribute(message, code=13)[source]

Bases: subway.exceptions.CLIException

__init__(message, code=13)[source]
Parameters
  • message – str.

  • code – int. 10: general; 11: jobid mismatch; 12: only valid for general without id; 13: no such attribute in history of conf.

exception subway.exceptions.SubwayException(message, code=10)[source]

Bases: Exception

__init__(message, code=10)[source]
Parameters
  • message – str.

  • code – int. 10: general; 11: jobid mismatch; 12: only valid for general without id; 13: no such attribute in history of conf.

exception subway.exceptions.TestBubble(message, code=10)[source]

Bases: subway.exceptions.SubwayException
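One way such a hierarchy with numeric codes might be used is to turn a caught exception into a process exit status. The classes below are local stand-ins that only mirror the documented signatures and bases; storing the values as .message and .code, and the helper exit_code_for, are assumptions.

```python
class SubwayException(Exception):
    def __init__(self, message, code=10):
        super().__init__(message)
        self.message = message  # attribute names are assumptions
        self.code = code


class CLIException(SubwayException):
    pass


class MatchError(CLIException):
    def __init__(self, message, code=11):
        super().__init__(message, code)


class NoAttribute(CLIException):
    def __init__(self, message, code=13):
        super().__init__(message, code)


def exit_code_for(action):
    """Run action and map any CLIException to its numeric code (hypothetical helper)."""
    try:
        action()
    except CLIException as e:
        return e.code
    return 0


def failing_lookup():
    raise MatchError("no job id matches the given prefix")


print(exit_code_for(failing_lookup))  # 11
```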

subway.framework module

Basic classes: submitter, checker and processor for subway main loop

class subway.framework.Checker[source]

Bases: abc.ABC

Empty base abstract class for checker.

abstract __call__()[source]

Call self as a function.

_abc_impl = <_abc_data object>
class subway.framework.PlainChk(params=None, **kwargs)[source]

Bases: subway.framework.Checker

The very general checker class.

__call__()[source]

DIY: not recommended. main process of checker.

Returns

None.

__init__(params=None, **kwargs)[source]
Parameters

params – list of dict

_abc_impl = <_abc_data object>
check_aborted(jobid)[source]

DIY: depends. Check a job whose state is aborted. Such jobs failed the main task, and this method should generate the resolve task and resolve input if necessary. If the resolve job is not under the control of the submitter, the default implementation is fine.

Parameters

jobid – str.

Returns

List[Tuple[str, Dict]]. The length of the list must be 0 or 1.

abstract check_checking(jobid)[source]

DIY: must. Check job whose state is checking. Jobs are sent to this method only when is_checked() returns True. Generate possible new jobs and inputs based on results from main task and possibly check task. The very core of checker class.

Parameters

jobid – str.

Returns

List[Tuple[str, Dict]]. [(jobid, resource dict), …]

check_finished(jobid)[source]

DIY: depends. Check a job whose state is finished, i.e. the main task has been computed successfully and its output must be checked in order to generate the check job and its associated check input. If the check job does not need to be controlled by the submitter (i.e. one simply checks the main output in the checker and generates new jobs), this method can be omitted, since the default implementation covers that case (the no check job scenario).

Parameters

jobid – str.

Returns

List[Tuple[str, Dict]]. The length of the list must be 0 (no check job) or 1 (namely check job must be one to one correspondence with main job).

check_kickstart()[source]

DIY: strongly recommended. Generate jobs with inputs at the beginning. The default impl does nothing. If the user doesn’t define this method in a subclass, the user must add the input files and any needed items in the empty history.json by hand.

Returns

List[Tuple[str, Dict]]. [(jobid, resource dict)]

check_resolving(jobid)[source]

DIY: depends. Check a job whose state is resolving. Jobs are sent to this method only when is_resolved() returns True. If the use case does not involve fault tolerance or aborted states, the method can be left as it is, doing nothing.

Parameters

jobid – str.

Returns

List[Tuple[str, Dict]]. [(jobid, resource dict), …]

check_task(jobid=None)[source]

DIY: not recommended. Determine whether output is “converged” and if not, generate new input files for more calculation, returning the new jobid and resource. The inner process dispatches to check_finished(), check_aborted(), check_checking() and check_resolving().

Parameters

jobid – Optional[str]. Default None means kickstart generation of input tasks.

Returns

List[Tuple[str, Dict]]. [(next jobid, resource dict)]
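The dispatch described for check_task can be pictured as a lookup from job state to handler method. The sketch below is a self-contained stand-in, not subway's implementation: MiniChecker, the shape of the history dict, and the resource dict contents are assumptions; only the method names and the return shape follow the text.

```python
class MiniChecker:
    """Hypothetical stand-in; history maps jobid -> {"state": ...}."""

    def __init__(self, history):
        self.history = history

    def check_kickstart(self):
        return []

    def check_finished(self, jobid):
        return []

    def check_aborted(self, jobid):
        return []

    def check_checking(self, jobid):
        return []

    def check_resolving(self, jobid):
        return []

    def check_task(self, jobid=None):
        if jobid is None:
            return self.check_kickstart()  # no job id: generate the first inputs
        handlers = {
            "finished": self.check_finished,
            "aborted": self.check_aborted,
            "checking": self.check_checking,
            "resolving": self.check_resolving,
        }
        handler = handlers.get(self.history[jobid]["state"])
        return handler(jobid) if handler else []


class FollowUpChecker(MiniChecker):
    def check_checking(self, jobid):
        # Return [(next jobid, resource dict)] as in the documented return type.
        return [(jobid + "-next", {"cpu": 1})]


checker = FollowUpChecker({"j1": {"state": "checking"}, "j2": {"state": "running"}})
print(checker.check_task("j1"))
```

Only check_checking is overridden here, mirroring the "DIY: must" note on that method; the other handlers keep their do-nothing defaults.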

ending_time(jobid)[source]

DIY: depends. This method is responsible for 4 states: checked, resolved, frustrated, failed. The default impl returns the current time.

Parameters

jobid – str.

Returns

float. timestamp.

finishing_time(jobid)[source]

DIY: depends. This method is responsible for 2 states: finished and aborted. The default impl for the finished state is the last modified time of the output file.

Parameters

jobid – str.

Returns

float. timestamp.

is_aborted(jobid)[source]

DIY: depends. Whether a running task has failed.

Parameters

jobid – str.

Returns

bool.

is_checked(jobid)[source]

DIY: depends. Whether a checking task is finished.

Parameters

jobid – str.

Returns

bool.

is_failed(jobid)[source]

DIY: depends. Whether a resolving task has failed.

Parameters

jobid – str.

Returns

bool.

is_finished(jobid)[source]

DIY: strongly recommended. Whether a running task is finished. The default impl checks whether the output file exists, which is not a good criterion for a finished job in most cases.

Parameters

jobid – str.

Returns

is_frustrated(jobid)[source]

DIY: depends. Whether a checking task has failed.

Parameters

jobid – str.

Returns

bool.

is_resolved(jobid)[source]

DIY: depends. Whether a resolving task is finished.

Parameters

jobid – str.

Returns

bool.

post_new_input(jobid, resource=None, prev=None)[source]

DIY: not recommended. Make some in-place changes to the history dict after a new task is generated; this is vital for the history record system. Customizing this method in a checker subclass is not recommended; at the least, one should call it via super().post_new_input.

Parameters
  • jobid – str. new job id.

  • resource – Optional[Dict]. resource dict for the new job.

  • prev – Optional[str]. Parent job id. Default None indicates no prev job.

Returns

None.

class subway.framework.PlainSub(**kwargs)[source]

Bases: subway.framework.Submitter

The very general submitter class.

__call__()[source]

DIY: not recommended.

Returns

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

_abc_impl = <_abc_data object>
is_restricted()[source]

DIY: depends. The default impl is ok if you only want to restrict some resource in a simple and soft way. Checks whether the restriction currently holds, taking all running tasks into consideration.

Returns

bool. whether the total resource limit is reached.

priority_task()[source]

DIY: depends. Choose the first task to be run from the pending, finished and aborted queues. The default impl prefers jobs in the aborted state, then the finished state and finally the pending state. In addition, the default impl prefers older jobs.

Returns

str, jobid.
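The ordering described for priority_task amounts to sorting by a state rank first and by age second. A minimal sketch under assumed data: the history layout and the "created" timestamp key are hypothetical, as is the name pick_next.

```python
# Lower rank is served first: aborted, then finished, then pending.
STATE_RANK = {"aborted": 0, "finished": 1, "pending": 2}


def pick_next(history):
    """Return the jobid to run next, or None if no job is eligible (illustrative only)."""
    candidates = [
        (STATE_RANK[info["state"]], info["created"], jobid)
        for jobid, info in history.items()
        if info["state"] in STATE_RANK
    ]
    if not candidates:
        return None
    # Tuples compare by rank first, then by the older (smaller) timestamp.
    return min(candidates)[2]


history = {
    "a": {"state": "pending", "created": 1.0},
    "b": {"state": "finished", "created": 5.0},
    "c": {"state": "finished", "created": 3.0},
    "d": {"state": "running", "created": 0.5},
}
print(pick_next(history))  # c
```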

submit_aborted(jobid)[source]

DIY: depends. If you have to deal with DS scheme as in Double vs. Single Submitter together with fault tolerance, then you must implement this method.

Parameters

jobid – str.

Returns

submit_finished(jobid)[source]

DIY: depends. If you have to deal with DS scheme as in Double vs. Single Submitter, then you must implement this method.

Parameters

jobid – str.

Returns

submit_job(jobid)[source]

DIY: not recommended. Submit a job, for example, generate an sbatch file and submit it to slurm. This method automatically dispatches to submit_pending(), submit_finished() and submit_aborted().

Parameters

jobid – str.

Returns

None.

abstract submit_pending(jobid)[source]

DIY: must. The very core of the submitter class. It defines how a job is submitted.

Parameters

jobid – str.

Returns

class subway.framework.PreProcessor(pipeline=None, **kwargs)[source]

Bases: subway.framework.Processor

_abc_impl = <_abc_data object>
static conf_update()[source]

Write conf in memory back to config.json.

Returns

None.

version_check()[source]

Must also include update_conf() in the pipeline. Lock the binary version by hashing, ensuring that a different binary cannot be run unless the version is changed in config.json.

Returns

None

class subway.framework.Processor(pipeline=None, **kwargs)[source]

Bases: abc.ABC

Base class for processor

__call__()[source]

Call self as a function.

__init__(pipeline=None, **kwargs)[source]
Parameters
  • pipeline – List[str]. List of method names that the processor runs one by one.

  • kwargs
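A pipeline given as a list of method names can be run with getattr. The sketch below is a stand-in for illustration, not the library's Processor: MiniProcessor, LoggingProcessor and the step names are hypothetical.

```python
class MiniProcessor:
    """Hypothetical stand-in: run the named methods one by one when called."""

    def __init__(self, pipeline=None, **kwargs):
        self.pipeline = pipeline or []
        self.kwargs = kwargs

    def __call__(self):
        for name in self.pipeline:
            getattr(self, name)()  # look up each step by its method name


class LoggingProcessor(MiniProcessor):
    def __init__(self, pipeline=None, **kwargs):
        super().__init__(pipeline, **kwargs)
        self.log = []

    def step_a(self):
        self.log.append("a")

    def step_b(self):
        self.log.append("b")


proc = LoggingProcessor(pipeline=["step_b", "step_a"])
proc()
print(proc.log)  # ['b', 'a']
```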

_abc_impl = <_abc_data object>
class subway.framework.Submitter[source]

Bases: abc.ABC

Empty base abstract class for submitter.

abstract __call__()[source]

Call self as a function.

_abc_impl = <_abc_data object>

subway.htree module

Forest of multiple trees of tasks

class subway.htree.HTree(history)[source]

Bases: object

Tree structure for tasks recorded in history.json.

BFSvisit(jobid)[source]
DFSvisit(jobid)[source]

Generator that yields jobs by DFS across the tree rooted at jobid.

Parameters

jobid – str.

Yield

Next job id by DFS.
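A depth-first generator over such a forest could look like the sketch below. The history layout is an assumption: each entry is given a hypothetical "prev" key holding its parent job id (None for a root). Yielding the root itself first is also an assumption.

```python
def dfs_visit(history, jobid):
    """Yield job ids depth-first, starting from jobid (illustrative only)."""
    # Build a parent -> children map from the assumed "prev" links.
    children = {}
    for j, info in history.items():
        children.setdefault(info.get("prev"), []).append(j)
    stack = [jobid]
    while stack:
        current = stack.pop()
        yield current
        # Push children in reverse so the first child is visited first.
        stack.extend(reversed(children.get(current, [])))


history = {
    "a": {"prev": None},
    "b": {"prev": "a"},
    "c": {"prev": "a"},
    "d": {"prev": "b"},
}
print(list(dfs_visit(history, "a")))  # ['a', 'b', 'd', 'c']
```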

__init__(history)[source]

Initialize self. See help(type(self)) for accurate signature.

children(jobid, n=1)[source]
Parameters
  • jobid – jobid or jobids list

  • n – n=0 for the lowest level

Returns

list of children jobid

end(jobid)[source]

Leaves downstream of jobid.

Parameters

jobid

Returns

list

leaves()[source]
match(idprefix)[source]
parent(jobid, n=1)[source]

n-level parent task id

Parameters
  • jobid

  • n – n<=0 for the ultimate root

Returns

print_tree(jobid, file=None, _prefix='', _last=True, _show=0)[source]

print a tree to stdout with root as jobid

Parameters
  • jobid – str.

  • file – args for file= in print. Default None to stdout.

  • _prefix

  • _last

  • _show

Returns

print_trees(jobids, file=None, _prefix='', _last=True, _show=0)[source]
root(jobid)[source]
roots()[source]

Find all jobs with no parent.

Returns

List[str]. jobid list

subway.utils module

Utilities and helper functions with no internal dependencies.

subway.utils._flatten_tolist(d, parent_key, sep)[source]
subway.utils._recover_type(v)[source]

recover “17” to 17. recover “[]” to []. recover “datetime(2020,1,1)” to datetime(2020,1,1). keep “abc” as str.

Parameters

v – str.

Returns

Any.
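The four cases listed above can be reproduced with ast.literal_eval plus a narrow pattern for the datetime form. This is a sketch of one possible approach, not the library's implementation; recover_type and the regular expression are hypothetical.

```python
import ast
import re
from datetime import datetime

# Matches strings such as "datetime(2020,1,1)" with integer arguments only.
_DATETIME_CALL = re.compile(r"^datetime\(\s*(\d+(?:\s*,\s*\d+)*)\s*\)$")


def recover_type(v):
    """Turn a string back into a Python value where possible (illustrative only)."""
    try:
        return ast.literal_eval(v)  # numbers, lists, dicts, quoted strings, ...
    except (ValueError, SyntaxError):
        pass
    match = _DATETIME_CALL.match(v.strip())
    if match:
        return datetime(*(int(x) for x in match.group(1).split(",")))
    return v  # anything else stays a plain string


print(recover_type("17"), recover_type("[]"), recover_type("abc"))
```

ast.literal_eval only accepts Python literals, so arbitrary expressions in the input are never executed.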

subway.utils._replace(replace_func, s)[source]

inner function for replace_wildcard()

Parameters
  • replace_func – Callable[[char], char]. Char to char function. eg f(a) = A, indicates %a should be replaced by A.

  • s – str, that match r”[%]+[^%]*”

Returns

str, transformed after wildcard substitution

subway.utils._transform_json(obj)[source]

default function for json.dump

Parameters

obj – Object, to be serialized

Returns

Any.

subway.utils.editor(path, **kwargs)[source]

Invoke the user’s editor. This will try to execute the following, in order:
  1. $VISUAL <args> # the “visual” editor (per POSIX)

  2. $EDITOR <args> # the regular editor (per POSIX)

  3. some default editor (see _default_editors) with <args>

If an environment variable isn’t defined, it is skipped. If it points to something that can’t be executed, we’ll print a warning. And if we can’t find anything that can be executed after searching the full list above, we’ll raise an error.

Parameters

path – str. Absolute path for file to be opened by editor.

Returns

bool. True if an editor is found and opened.
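The search order described above can be sketched without launching anything. The helper names and the default editor tuple below are hypothetical (the contents of _default_editors are not shown in this reference); shutil.which and shlex.split are standard library calls.

```python
import os
import shlex
import shutil


def editor_candidates(environ, defaults=("vi", "nano")):
    """List editor commands in lookup order: $VISUAL, $EDITOR, then defaults."""
    candidates = []
    for var in ("VISUAL", "EDITOR"):
        value = environ.get(var)
        if value:  # undefined or empty variables are skipped
            candidates.append(value)
    candidates.extend(defaults)
    return candidates


def find_editor(environ, defaults=("vi", "nano")):
    """Return the first candidate whose program is executable, else raise."""
    for command in editor_candidates(environ, defaults):
        program = shlex.split(command)[0]  # the command may carry arguments
        if shutil.which(program):
            return command
        print("warning: cannot execute %r" % command)
    raise RuntimeError("no usable editor found")


print(editor_candidates(os.environ))
```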

subway.utils.flatten_dict(d, parent_key='', sep='~')[source]

Flatten a nested dict that may contain lists, e.g. transform {"a":{"b":"c", "d": ["e", "f"]}} to {"a.b": "c", "a.d.list_0": "e" ...} (shown here with "." as the separator).

Parameters
  • d – Dict[str, Any]. Nested dict with possibly nested list and tuple.

  • parent_key – Optional[str]. Default "". The common key prefix.

  • sep – Optional[str], default "~". The separator between different level of keys.

Returns

Dict[str, Any]. Flattened dict.
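A recursive sketch of this flattening follows. The name flatten is hypothetical, and the list_<index> key naming is inferred from the example in the description; treating tuples the same way as lists is an assumption.

```python
def flatten(d, parent_key="", sep="~"):
    """Flatten nested dicts and lists into a single-level dict (illustrative only)."""
    items = {}
    for k, v in d.items():
        key = parent_key + sep + str(k) if parent_key else str(k)
        if isinstance(v, dict):
            items.update(flatten(v, key, sep))
        elif isinstance(v, (list, tuple)):
            # Re-express the sequence as a dict keyed by list_<index>, then recurse.
            indexed = {"list_%d" % i: item for i, item in enumerate(v)}
            items.update(flatten(indexed, key, sep))
        else:
            items[key] = v
    return items


print(flatten({"a": {"b": "c", "d": ["e", "f"]}}, sep="."))
```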

subway.utils.load_json(conf_file)[source]

load json file and return dict

Parameters

conf_file – str, absolute json file path

Returns

Dict.

subway.utils.md5file(path)[source]

return md5 for given file

Parameters

path – str, abspath for the file.

Returns

str. md5 value
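Computing a file's MD5 is usually done in chunks so that large binaries need not fit in memory. A minimal sketch with hashlib; the name md5_of_file and the chunk size are choices made here, not taken from the library.

```python
import hashlib
import os
import tempfile


def md5_of_file(path, chunk_size=65536):
    """Return the hex MD5 digest of the file at path, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sample = os.path.join(tmp, "sample.bin")
        with open(sample, "wb") as f:
            f.write(b"hello")
        print(md5_of_file(sample))
```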

subway.utils.now_ts()[source]

Get the current timestamp.

Returns

float, timestamps

subway.utils.print_json(json_dict, indent=2)[source]

print python dict with pretty indent

Parameters
  • json_dict – Dict.

  • indent – Optional[int], default 2.

Returns

None.

subway.utils.replace_wildcard(replace_func, s)[source]

replace %a type wildcard in the string based on customized function replace_func.

Parameters
  • replace_func – Callable[[char], char]. Char to char function. eg f(a) = A, indicates %a should be replaced by A.

  • s – str, the string possibly with %a type wildcards, eg. “bc%de%%a”

Returns

str, the string with %a type wildcards all replaced
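Substitution of this kind can be sketched with a single regular expression. The name replace_percent is hypothetical, and treating "%%" as an escaped literal "%" is an assumption; the reference above does not state how doubled percent signs are handled.

```python
import re


def replace_percent(replace_func, s):
    """Replace each %x wildcard with replace_func("x") (illustrative only)."""

    def substitute(match):
        char = match.group(1)
        if char == "%":
            return "%"  # assumed: "%%" is an escaped percent sign
        return replace_func(char)

    return re.sub(r"%(.)", substitute, s)


print(replace_percent(str.upper, "bc%de%%a"))  # bcDe%a
```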

subway.utils.simple_template_render(template, output, var_dict)[source]

render template file to output file with variable substitution defined by var_dict. template file vars are defined within {var}.

Parameters
  • template – str. Absolute file name for template file.

  • output – str. Absolute file name for output file (create or overwrite).

  • var_dict – Dict[str, str].

Returns

None.
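Rendering with {var} placeholders can be sketched with plain string replacement. This is an illustration only: render_template is a hypothetical name, and using str.replace instead of str.format is a choice made here so that other braces in the template are left alone.

```python
import os
import tempfile


def render_template(template, output, var_dict):
    """Copy template to output, replacing each {name} with its value."""
    with open(template) as f:
        text = f.read()
    for name, value in var_dict.items():
        text = text.replace("{" + name + "}", str(value))
    with open(output, "w") as f:
        f.write(text)  # creates or overwrites the output file


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "main.template")
        dst = os.path.join(tmp, "main.py")
        with open(src, "w") as f:
            f.write("WORK_DIR = '{work_dir}'\n")
        render_template(src, dst, {"work_dir": tmp})
        with open(dst) as f:
            print(f.read())
```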

subway.utils.statement_parser(st)[source]

Parse “aa=b; c>13;e<f g>=h” into {“aa”: (“=”, “b”), “c”: (“>”, 13)…}

Parameters

st – str.

Returns

Dict[str, Tuple[str, Any]].
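A parser for statements of this shape can be sketched with two regular expressions. The details below are assumptions drawn from the single example: statements are separated by ";" or whitespace, the operators are =, >, <, >= and <=, and values are converted with ast.literal_eval when they look like literals. The name parse_statements is hypothetical.

```python
import ast
import re

# key, then the longest matching operator, then the raw value
_STATEMENT = re.compile(r"^([^=<>]+)(>=|<=|=|>|<)(.+)$")


def parse_statements(st):
    """Parse "k=v; k2>3" style text into {key: (operator, value)}."""
    result = {}
    for token in re.split(r"[;\s]+", st.strip()):
        if not token:
            continue
        match = _STATEMENT.match(token)
        if match is None:
            raise ValueError("cannot parse statement: %r" % token)
        key, op, raw = match.groups()
        try:
            value = ast.literal_eval(raw)  # e.g. "13" becomes 13
        except (ValueError, SyntaxError):
            value = raw  # non-literal values stay strings
        result[key] = (op, value)
    return result


print(parse_statements("aa=b; c>13;e<f g>=h"))
```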

subway.utils.ts2str(ts, _format='%Y-%m-%d, %H:%M:%S')[source]

transform timestamp to corresponding formatted string

Parameters

ts – int, timestamps.

Returns

str, eg. “2020-02-02, 02:02:20”

subway.utils.which(binary, path=None)[source]

Like which on Linux. Find the absolute path of a binary given its command name.

Parameters
  • binary – str. name for binary command.

  • path – Optional[str], default None. If None, PATH is searched

Returns

Optional[str], absolute path for binary. None if binary not found in path.

subway.workflow module

Main loop function for subway entry_point

subway.workflow.main_once(checker, submitter)[source]

Main entry point: run checker and submitter once.

Parameters
  • checker – Checker.

  • submitter – Submitter.

Returns

None

subway.workflow.main_rt(checker, submitter, preprocessor=None, postprocessor=None, sleep_interval=10, loops=None)[source]

realtime main monitor for subway entry_point

Parameters
  • checker – Checker.

  • submitter – Submitter.

  • preprocessor – Optional[Processor]. Run before the main loop.

  • postprocessor – Optional[Processor]. Run after the main loop.

  • sleep_interval – Optional[Union[float, Tuple[float, float]]], default 10. There are two intervals, one after checker, one after submitter. If one number is given, the two intervals are the same. If a tuple or list of 2 numbers is given, they are interpreted as (after_checker_interval, after_submitter_interval).

  • loops – Optional[int], default None. Max loops before exit; None means running forever, but the monitor can still quit if no job is running as expected.

Returns

None
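The sleep_interval and loops parameters can be illustrated with a stripped-down loop. This sketch is not subway's main_rt: it omits the processors and the early exit when no job is running, and the names normalize_interval and run_loop, as well as the injectable sleep argument, are hypothetical.

```python
import time


def normalize_interval(sleep_interval):
    """Return (after_checker, after_submitter) from a number or a pair."""
    if isinstance(sleep_interval, (tuple, list)):
        after_checker, after_submitter = sleep_interval
    else:
        after_checker = after_submitter = sleep_interval
    return float(after_checker), float(after_submitter)


def run_loop(checker, submitter, sleep_interval=10, loops=None, sleep=time.sleep):
    """Alternate checker and submitter, sleeping after each (illustrative only)."""
    after_checker, after_submitter = normalize_interval(sleep_interval)
    count = 0
    while loops is None or count < loops:
        checker()
        sleep(after_checker)
        submitter()
        sleep(after_submitter)
        count += 1
    return count


calls = []
naps = []
run_loop(lambda: calls.append("check"), lambda: calls.append("submit"),
         sleep_interval=(1, 2), loops=2, sleep=naps.append)
print(calls, naps)
```

Passing sleep as an argument keeps the loop testable without real waiting.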