subway package¶
subway.bootstrap module¶
Actions performed when sub init is run in the CLI, including making directories, touching files and rendering the templates in the templates folder
-
subway.bootstrap.
default_conf
(path)[source]¶ generate default config dict for subway project
- Parameters
path – str.
conf["work_dir"] = path
- Returns
Dict. The default config dict.
-
subway.bootstrap.
env_init
(path, conf=None, fromfile=None, include_main=True)[source]¶ Initialize a subway project, including: mkdir .subway and, if possible, the inputs/outputs and check inputs/outputs dirs; create config.json and an empty history.json within .subway; render an example main.py.
- Parameters
path – str. absolute path for subway project dir
conf – Optional[Dict], Default None, indicates a simple default config is applied.
fromfile – Optional[str]. Default None, indicates a default example main.py will be rendered. If specified as a full path, the corresponding file will be copied into subway project as entry point.
include_main – Optional[bool], default True. If False, main.py is not rendered.
- Returns
None.
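The initialization steps listed above can be sketched with the standard library only. This is a minimal illustration, not the library's implementation: the name sketch_env_init is hypothetical, the default config is assumed to hold only work_dir, and the directory entries in conf are assumed to be relative to the project path. Rendering main.py is left out.

```python
import json
import os


def sketch_env_init(path, conf=None):
    """Hypothetical stand-in for env_init; not the library's code."""
    # Assumption: the default config only records the project directory.
    conf = dict(conf) if conf is not None else {"work_dir": path}
    dot_dir = os.path.join(path, ".subway")
    os.makedirs(dot_dir, exist_ok=True)
    # Create only the directories that the config actually mentions.
    # Assumption: their values are paths relative to the project dir.
    for key in ("inputs_dir", "outputs_dir",
                "check_inputs_dir", "check_outputs_dir"):
        if key in conf:
            os.makedirs(os.path.join(path, conf[key]), exist_ok=True)
    # config.json mirrors the conf dict; history.json starts as {}.
    with open(os.path.join(dot_dir, "config.json"), "w") as f:
        json.dump(conf, f, indent=2)
    with open(os.path.join(dot_dir, "history.json"), "w") as f:
        json.dump({}, f)
    return conf
```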
-
subway.bootstrap.
mkdirs
(path, conf)[source]¶ mkdirs for “inputs_dir”, “outputs_dir”, “check_inputs_dir”, “check_outputs_dir”, if they are mentioned in conf dict
- Parameters
path – str. full path for subway project.
conf – Dict. config dict for subway project.
- Returns
None
-
subway.bootstrap.
render_config
(path, conf)[source]¶ generate config.json based on conf dict within .subway dir
- Parameters
path – str. full path for subway project.
conf – Dict. config dict for subway project.
- Returns
None
-
subway.bootstrap.
render_history
(path)[source]¶ generate empty history.json with only {} within .subway dir
- Parameters
path – str. full path for subway project.
- Returns
None
-
subway.bootstrap.
render_main
(path, conf, fromfile=None)[source]¶ render main.py as detailed as possible and make its permission 700 (executable).
- Parameters
path – str. full path for subway project.
conf – Dict. config dict for subway project.
fromfile – Optional[str]. Default None, indicates a default example main.py will be rendered. If specified as a full path, the corresponding file will be copied into subway project as entry point.
- Returns
None.
subway.cli module¶
cli interface for sub init and more
-
class
subway.cli.
SubwayCLI
(_argv=None, _test=False)[source]¶ Bases:
object
-
__init__
(_argv=None, _test=False)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
static
_get
(s, k, _default=None)[source]¶ _get({"a": {"b":1}}, "a.b") = 1
- Parameters
s – Dict.
k – str. Key string with levels separated by “.”
_default – Any, default return when nothing match. Default value is None.
- Returns
-
_meta_query_key
(key)[source]¶ meta function to generate member function for this class, generate function as follows
    def query_checked(self):
        if self.jid:
            print("1" if self.history[self.jid]['state'] == "checked" else "0")
        else:
            r = []
            for j, s in self.history.items():
                if s['state'] == "checked":
                    r.append(j)
            print(*r, sep="\n")
- Parameters
key – str. The added function will be named query_key.
- Returns
None
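One way to generate such query methods is a small factory that builds a closure and attaches it to the class with setattr. The sketch below is an assumption about the general technique, not the library's code: QueryDemo and add_query are hypothetical names, and the generated method returns its result instead of printing it so that it is easier to test.

```python
class QueryDemo:
    """Hypothetical stand-in for the CLI class; only what the demo needs."""

    def __init__(self, history, jid=None):
        self.history = history  # {jobid: {"state": ...}}
        self.jid = jid          # optional job id to query

    @classmethod
    def add_query(cls, name, state):
        """Attach a method query_<name> that filters on the given state."""
        def query(self):
            if self.jid:
                # Single job: report "1" or "0" for a state match.
                return "1" if self.history[self.jid]["state"] == state else "0"
            # No job id: list every job currently in that state.
            return [j for j, s in self.history.items() if s["state"] == state]
        setattr(cls, "query_" + name, query)


QueryDemo.add_query("checked", "checked")
```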
-
static
_set
(s, k, v)[source]¶ _set({"a":{"b":{"c":2}, "zz":"pp"}}, "a.b.m", "q")={'a': {'b': {'c': 2, 'm': 'q'}, 'zz': 'pp'}}
- Parameters
s – Dict.
k – Any.
v – Any.
- Returns
Dict. after the set
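The dotted-key behaviour documented for _get and _set can be reproduced in a few lines. The functions below are a hedged sketch under hypothetical names (get_nested, set_nested); they follow the two docstring examples but are not taken from the library source.

```python
def get_nested(s, k, default=None):
    """Look up a dotted key such as "a.b" in a nested dict."""
    cur = s
    for part in k.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return default
        cur = cur[part]
    return cur


def set_nested(s, k, v):
    """Set a dotted key in place, creating intermediate dicts as needed."""
    parts = k.split(".")
    cur = s
    for part in parts[:-1]:
        # Assumption: existing intermediate values are dicts.
        cur = cur.setdefault(part, {})
    cur[parts[-1]] = v
    return s
```

With these definitions, get_nested({"a": {"b": 1}}, "a.b") gives 1, and set_nested on the _set example above adds the key m next to c.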
-
c
()¶
-
d
()¶
-
h
()¶
-
i
()¶
-
q
()¶
-
query_i
()¶
-
query_l
()¶
-
query_o
()¶
-
query_r
()¶
-
query_s
()¶
-
query_t
()¶
-
r
()¶
-
subway.config module¶
import config from config.json
subway.cons module¶
Workaround to configure work_dir at runtime.
subway.exceptions module¶
Exception classes for subway.
subway.framework module¶
Basic classes: submitter, checker and processor for subway main loop
-
class
subway.framework.
Checker
[source]¶ Bases:
abc.ABC
Empty base abstract class for checker.
-
_abc_impl
= <_abc_data object>¶
-
-
class
subway.framework.
PlainChk
(params=None, **kwargs)[source]¶ Bases:
subway.framework.Checker
The very general checker class.
-
_abc_impl
= <_abc_data object>¶
-
check_aborted
(jobid)[source]¶ DIY: depends. Check job whose state is aborted. These jobs failed the main task, and this method should generate the resolve task and resolve input if necessary. If the resolve job is not under control of the submitter, the default implementation is ok.
- Parameters
jobid – str.
- Returns
List[Tuple[str, Dict]]. The length of the list must be 0 or 1.
-
abstract
check_checking
(jobid)[source]¶ DIY: must. Check job whose state is checking. Jobs are sent to this method only when is_checked() returns True. Generate possible new jobs and inputs based on results from the main task and possibly the check task. This is the very core of the checker class.
- Parameters
jobid – str.
- Returns
List[Tuple[str, Dict]]. [(jobid, resource dict), …]
-
check_finished
(jobid)[source]¶ DIY: depends. Check job whose state is finished. Namely, the main task has been computed successfully, and we need to check the output of the main task to generate a check job and its associated check input. On the other hand, if there is no need for the check job to be controlled by the submitter (i.e. one simply checks the main output in the checker and generates new jobs), one can omit this method, since the default implementation covers that case (no check job scenario).
- Parameters
jobid – str.
- Returns
List[Tuple[str, Dict]]. The length of the list must be 0 (no check job) or 1 (namely check job must be one to one correspondence with main job).
-
check_kickstart
()[source]¶ DIY: strongly recommended. Generate jobs with inputs at the beginning. The default impl does nothing. If the user doesn’t define this method in a subclass, then the user must add the input files and the corresponding items in the empty history.json by hand.
- Returns
List[Tuple[str, Dict]]. [(jobid, resource dict)]
-
check_resolving
(jobid)[source]¶ DIY: depends. Check job whose state is resolving. Jobs are sent to this method only when is_resolved() returns True. If the use case does not involve fault tolerance or aborted states, this method can be left as is (it does nothing).
- Parameters
jobid – str.
- Returns
List[Tuple[str, Dict]]. [(jobid, resource dict), …]
-
check_task
(jobid=None)[source]¶ DIY: not recommended. Determine whether output is “converged” and if not, generate new input files for more calculation with the new jobid and resource returned. The inner process will dispatch to check_finished(), check_aborted(), check_checking() and check_resolving().
- Parameters
jobid – Optional[str]. Default None means kickstart generation for inputs tasks.
- Returns
List[Tuple[str, Dict]]. [(next jobid, resource dict)]
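The dispatch described for check_task can be pictured as a lookup on the job state. The function below is only a sketch of that idea: dispatch_check and its explicit history argument are hypothetical, the history layout {jobid: {"state": ...}} is assumed, and the real method may do additional bookkeeping. The gating on is_checked() and is_resolved() follows the descriptions of check_checking and check_resolving.

```python
def dispatch_check(checker, history, jobid=None):
    """Route one job to the matching check_* method (illustrative only)."""
    if jobid is None:
        # No job id: generate the initial jobs.
        return checker.check_kickstart()
    state = history[jobid]["state"]
    if state == "finished":
        return checker.check_finished(jobid)
    if state == "aborted":
        return checker.check_aborted(jobid)
    # These two states are only handled once their predicate is True.
    if state == "checking" and checker.is_checked(jobid):
        return checker.check_checking(jobid)
    if state == "resolving" and checker.is_resolved(jobid):
        return checker.check_resolving(jobid)
    # Any other state produces no new jobs in this sketch.
    return []
```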
-
ending_time
(jobid)[source]¶ DIY: depends. This method is responsible for 4 states: checked, resolved, frustrated, failed. The default impl returns the current time.
- Parameters
jobid – str.
- Returns
float. timestamp.
-
finishing_time
(jobid)[source]¶ DIY: depends. This method is responsible for 2 states: finished and aborted. The default impl for the finished state is the last modified time of the output file.
- Parameters
jobid – str.
- Returns
float. timestamp.
-
is_aborted
(jobid)[source]¶ DIY: depends. Whether a running task has failed.
- Parameters
jobid – str.
- Returns
bool.
-
is_checked
(jobid)[source]¶ DIY: depends. Whether a checking task has finished.
- Parameters
jobid – str.
- Returns
bool.
-
is_failed
(jobid)[source]¶ DIY: depends. Whether a resolving task has failed.
- Parameters
jobid – str.
- Returns
bool.
-
is_finished
(jobid)[source]¶ DIY: strongly recommended. Whether a running task has finished. The default impl checks whether the output file exists, which is not a good criterion for a finished job in most cases.
- Parameters
jobid – str.
- Returns
-
is_frustrated
(jobid)[source]¶ DIY: depends. Whether a checking task has failed.
- Parameters
jobid – str.
- Returns
bool.
-
is_resolved
(jobid)[source]¶ DIY: depends. Whether a resolving task has finished.
- Parameters
jobid – str.
- Returns
bool.
-
post_new_input
(jobid, resource=None, prev=None)[source]¶ DIY: not recommended. Make some in-place changes to the history dict after a new task is generated; this is vital for the history record system. Customizing this method in a checker subclass is not recommended. If you do override it, at least call super().post_new_input.
- Parameters
jobid – str. new job id.
resource – Optional[Dict]. resource dict for the new job.
prev – Optional[str]. Parent job id. Default None indicates no prev job.
- Returns
None.
-
-
class
subway.framework.
PlainSub
(**kwargs)[source]¶ Bases:
subway.framework.Submitter
The very general submitter class.
-
_abc_impl
= <_abc_data object>¶
-
is_restricted
()[source]¶ DIY: depends. The default impl is ok if you only want to restrict some resource in a simple and soft way. Checks whether the restriction currently holds, taking all running tasks into account.
- Returns
bool. whether the total resource limit is reached.
-
priority_task
()[source]¶ DIY: depends. The default impl prefers jobs in the aborted state, then the finished state and finally the pending state. In addition, the default impl prefers older jobs. Chooses the first task to be run from the pending, finished and aborted queues.
- Returns
str, jobid.
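The default ordering described above (aborted before finished before pending, older jobs first) amounts to a sort on a (state rank, timestamp) pair. The following is a hedged sketch, not the library's implementation: pick_next is a hypothetical name, and the creating_ts field used as the job age is an assumption about the history layout.

```python
def pick_next(history, ts_key="creating_ts"):
    """Return the job id to run next, or None if nothing is queued."""
    # Lower rank means higher priority.
    rank = {"aborted": 0, "finished": 1, "pending": 2}
    candidates = [
        (rank[rec["state"]], rec[ts_key], jobid)
        for jobid, rec in history.items()
        if rec["state"] in rank
    ]
    if not candidates:
        return None
    # Tuples compare by rank first, then by timestamp (older first).
    return min(candidates)[2]
```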
-
submit_aborted
(jobid)[source]¶ DIY: depends. If you have to deal with DS scheme as in Double vs. Single Submitter together with fault tolerance, then you must implement this method.
- Parameters
jobid – str.
- Returns
-
submit_finished
(jobid)[source]¶ DIY: depends. If you have to deal with DS scheme as in Double vs. Single Submitter, then you must implement this method.
- Parameters
jobid – str.
- Returns
-
submit_job
(jobid)[source]¶ DIY: not recommended. Submit job, for example, generate an sbatch file and submit it to slurm. This method automatically dispatches to submit_pending(), submit_finished() and submit_aborted().
- Parameters
jobid – str.
- Returns
None.
-
-
class
subway.framework.
PreProcessor
(pipeline=None, **kwargs)[source]¶ Bases:
subway.framework.Processor
-
_abc_impl
= <_abc_data object>¶
-
subway.htree module¶
Forest of multiple trees of tasks
-
class
subway.htree.
HTree
(history)[source]¶ Bases:
object
Trees structure for tasks recorded in history.json.
-
DFSvisit
(jobid)[source]¶ Generator yielding job ids by DFS across the tree rooted at jobid
- Parameters
jobid – str.
- Yield
Next job id by DFS.
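A depth-first traversal over a job tree can be written as a generator with an explicit stack. The sketch below assumes that each history record lists its child job ids under a key named next; that key name and the function name dfs_visit are assumptions made for illustration, not details confirmed by this reference.

```python
def dfs_visit(history, jobid, children_key="next"):
    """Yield job ids depth-first, starting from the given root."""
    stack = [jobid]
    while stack:
        cur = stack.pop()
        yield cur
        # Push children reversed so the first child is visited first.
        stack.extend(reversed(history[cur].get(children_key, [])))
```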
-
children
(jobid, n=1)[source]¶
- Parameters
jobid – jobid or jobids list
n – n=0 for the lowest level
- Returns
list of children jobid
-
parent
(jobid, n=1)[source]¶ n-level parent task id
- Parameters
jobid –
n – n<=0 for the ultimate root
- Returns
-
subway.utils module¶
Some utilities and helper functions with no internal dependencies
-
subway.utils.
_recover_type
(v)[source]¶ recover “17” to 17. recover “[]” to []. recover “datetime(2020,1,1)” to datetime(2020,1,1). keep “abc” as str.
- Parameters
v – str.
- Returns
Any.
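For plain literals, this kind of type recovery can be approximated with ast.literal_eval from the standard library. The sketch below (hypothetical name recover_literal) covers the "17", "[]" and "abc" cases. It deliberately does not evaluate calls, so a string such as "datetime(2020,1,1)" stays a string here, unlike the behaviour documented above.

```python
import ast


def recover_literal(v):
    """Turn a string into a Python literal when possible, else keep it."""
    try:
        return ast.literal_eval(v)
    except (ValueError, SyntaxError):
        # Not a literal (for example a bare word): keep the string.
        return v
```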
-
subway.utils.
_replace
(replace_func, s)[source]¶ inner function for
replace_wildcard()
- Parameters
replace_func – Callable[[char], char]. Char to char function. eg f(a) = A, indicates %a should be replaced by A.
s – str, that match r”[%]+[^%]*”
- Returns
str, transformed after wildcard substitution
-
subway.utils.
_transform_json
(obj)[source]¶ default function for json.dump
- Parameters
obj – Object, to be serialized
- Returns
Any.
-
subway.utils.
editor
(path, **kwargs)[source]¶ Invoke the user’s editor. This will try to execute the following, in order:
1. $VISUAL <args> # the “visual” editor (per POSIX)
2. $EDITOR <args> # the regular editor (per POSIX)
3. some default editor (see _default_editors) with <args>
If an environment variable isn’t defined, it is skipped. If it points to something that can’t be executed, we’ll print a warning. And if we can’t find anything that can be executed after searching the full list above, we’ll raise an error.
- Parameters
path – str. Absolute path for file to be opened by editor.
- Returns
bool. True if an editor is found and opened.
-
subway.utils.
flatten_dict
(d, parent_key='', sep='~')[source]¶ Flatten a nested dict with list. eg. transform {"a":{"b":"c", "d": ["e", "f"]}} to {"a.b": "c", "a.d.list_0": "e" ...}
- Parameters
d – Dict[str, Any]. Nested dict with possibly nested list and tuple.
parent_key – Optional[str]. Default "". The common key prefix.
sep – Optional[str], default "~". The separator between different levels of keys.
- Returns
Dict[str, Any]. Flattened dict.
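A recursive flattening along these lines might look as follows. This is a sketch under a hypothetical name (flatten), not the library's code: the list_<i> naming for list elements is taken from the example above, while the handling of empty containers (they simply disappear) is an assumption.

```python
def flatten(d, parent_key="", sep="~"):
    """Flatten nested dicts, lists and tuples into a single-level dict."""
    items = {}
    for k, v in d.items():
        key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.update(flatten(v, key, sep))
        elif isinstance(v, (list, tuple)):
            # Treat a sequence as a dict keyed by list_0, list_1, ...
            indexed = {"list_%d" % i: e for i, e in enumerate(v)}
            items.update(flatten(indexed, key, sep))
        else:
            items[key] = v
    return items
```

Calling flatten with sep="." on the example input yields the dotted keys shown above; with the default separator the same keys are joined by "~".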
-
subway.utils.
load_json
(conf_file)[source]¶ load json file and return dict
- Parameters
conf_file – str, absolute json file path
- Returns
Dict.
-
subway.utils.
md5file
(path)[source]¶ return md5 for given file
- Parameters
path – str, abspath for the file.
- Returns
str. md5 value
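Computing the MD5 of a file is usually done in chunks so that large files are not read into memory at once. The sketch below uses hashlib from the standard library; the name md5_of_file and the chunk size are illustrative choices, and whether the library reads the file in chunks is not stated in this reference.

```python
import hashlib


def md5_of_file(path, chunk_size=65536):
    """Return the hex MD5 digest of the file at the given path."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        # iter() with a sentinel stops when read() returns b"".
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()
```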
-
subway.utils.
print_json
(json_dict, indent=2)[source]¶ print python dict with pretty indent
- Parameters
json_dict – Dict.
indent – Optional[int], default 2.
- Returns
None.
-
subway.utils.
replace_wildcard
(replace_func, s)[source]¶ replace %a type wildcard in the string based on customized function replace_func.
- Parameters
replace_func – Callable[[char], char]. Char to char function. eg f(a) = A, indicates %a should be replaced by A.
s – str, the string possibly with %a type wildcards, eg. “bc%de%%a”
- Returns
str, the string with %a type wildcards all replaced
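A %-style wildcard substitution can be sketched with a single regular expression. The version below is an illustration under stated assumptions, not the library's implementation: the name replace_percent is hypothetical, and treating "%%" as an escaped literal "%" is an assumption that this reference does not confirm.

```python
import re


def replace_percent(replace_func, s):
    """Replace each %x in s by replace_func("x")."""
    def repl(match):
        ch = match.group(1)
        # Assumption: "%%" stands for a literal percent sign.
        return "%" if ch == "%" else replace_func(ch)
    # A trailing lone "%" has no following character and is left as is.
    return re.sub(r"%(.)", repl, s)
```

For instance, replace_percent(str.upper, "bc%de%%a") gives "bcDe%a" under these assumptions.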
-
subway.utils.
simple_template_render
(template, output, var_dict)[source]¶ render template file to output file with variable substitution defined by var_dict. template file vars are defined within {var}.
- Parameters
template – str. Absolute file name for template file.
output – str. Absolute file name for output file (create or overwrite).
var_dict – Dict[str, str].
- Returns
None.
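A {var}-style template render can be approximated with plain string replacement. The sketch below (hypothetical name render_template) uses str.replace rather than str.format, so braces that do not match a key in var_dict are left untouched; whether the library behaves the same way for unknown variables is an assumption.

```python
def render_template(template, output, var_dict):
    """Copy template to output, substituting {name} with var_dict[name]."""
    with open(template) as f:
        text = f.read()
    for name, value in var_dict.items():
        text = text.replace("{" + name + "}", value)
    # The output file is created, or overwritten if it already exists.
    with open(output, "w") as f:
        f.write(text)
```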
-
subway.utils.
statement_parser
(st)[source]¶ Parse “aa=b; c>13;e<f g>=h” into {“aa”: (“=”, “b”), “c”: (“>”, 13)…}
- Parameters
st – str.
- Returns
Dict[str, Tuple[str, Any]].
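The parsing shown in the example can be sketched with a split on separators followed by one regular expression per statement. This is a hedged illustration, not the library's parser: parse_statements is a hypothetical name, the operator set (=, >, <, >=, <=) and the separators (";" and whitespace) are inferred from the example string, and values are converted with ast.literal_eval when they look like literals.

```python
import ast
import re

# Two-character operators come first so ">=" is not read as ">".
_STATEMENT = re.compile(r"^(.+?)(>=|<=|=|>|<)(.*)$")


def parse_statements(st):
    """Parse "k=v; a>1" style text into {key: (operator, value)}."""
    result = {}
    for token in re.split(r"[;\s]+", st.strip()):
        if not token:
            continue
        match = _STATEMENT.match(token)
        if match is None:
            raise ValueError("cannot parse statement: %r" % token)
        key, op, raw = match.groups()
        try:
            value = ast.literal_eval(raw)  # "13" becomes 13
        except (ValueError, SyntaxError):
            value = raw                    # "b" stays a string
        result[key] = (op, value)
    return result
```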
-
subway.utils.
ts2str
(ts, _format='%Y-%m-%d, %H:%M:%S')[source]¶ transform timestamp to corresponding formatted string
- Parameters
ts – int, timestamps.
- Returns
str, eg. “2020-02-02, 02:02:20”
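Formatting a timestamp this way needs only the time module. The sketch below (hypothetical name format_ts) assumes the timestamp is interpreted in local time, which this reference does not state.

```python
import time


def format_ts(ts, fmt="%Y-%m-%d, %H:%M:%S"):
    """Format a Unix timestamp as a string (local time is assumed)."""
    return time.strftime(fmt, time.localtime(ts))
```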
-
subway.utils.
which
(binary, path=None)[source]¶ Like which in linux. Find absolute binary path given command shortcut binary.
- Parameters
binary – str. name for binary command.
path – Optional[str], default None. If None, PATH is searched
- Returns
Optional[str], absolute path for binary. None if binary not found in path.
subway.workflow module¶
Main loop function for subway entry_point
-
subway.workflow.
main_once
(checker, submitter)[source]¶ Main entry point: run the checker and the submitter once.
- Parameters
checker – Checker.
submitter – Submitter.
- Returns
None
-
subway.workflow.
main_rt
(checker, submitter, preprocessor=None, postprocessor=None, sleep_interval=10, loops=None)[source]¶ realtime main monitor for subway entry_point
- Parameters
checker – Checker.
submitter – Submitter.
preprocessor – Optional[Processor]. Run before the main loop.
postprocessor – Optional[Processor]. Run after the main loop.
sleep_interval – Optional[Union[float, Tuple[float, float]]], default 10. There are two intervals, one after the checker and one after the submitter. If one number is given, the two intervals are the same. If a tuple or list of 2 numbers is given, they are interpreted as (after_checker_interval, after_submitter_interval).
loops – Optional[int], default None. Max loops before exit; None means running forever, but the monitor can still quit, as expected, if no job is running.
- Returns
None
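The handling of sleep_interval and loops described above can be sketched as a small loop. This is not the library's main loop: split_interval and run_loop are hypothetical names, the checker and submitter are replaced by plain callables, and the early exit when no job is running is left out.

```python
import time


def split_interval(sleep_interval=10):
    """Return (after_checker, after_submitter) from a number or a pair."""
    if isinstance(sleep_interval, (tuple, list)):
        after_checker, after_submitter = sleep_interval
    else:
        after_checker = after_submitter = sleep_interval
    return after_checker, after_submitter


def run_loop(check_step, submit_step, sleep_interval=10, loops=None):
    """Alternate the two steps, sleeping after each; return loops done."""
    after_checker, after_submitter = split_interval(sleep_interval)
    done = 0
    # loops=None means no upper bound on the number of iterations.
    while loops is None or done < loops:
        check_step()
        time.sleep(after_checker)
        submit_step()
        time.sleep(after_submitter)
        done += 1
    return done
```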