 |
DAS
3.0
Das Analysis System
|
◆ main()
Implementation of `parallel` command, relying on homemade `prefix` lib.
66 """Implementation of `parallel` command, relying on homemade `prefix` lib."""
71 "Runs the command in parallel on local machine. Only commands "
72 "running over n-tuples may be prefixed, and must natively output "
73 "a ROOT file. The prefix command will replace the single ROOT "
74 "file with a directory containing a series of ROOT files, as well "
75 "as the standard output stream in hidden text files. Note: be nice "
76 "with your colleagues, and don't use all cores for a too long time "
77 "unless you are sure that the machine is free."
90 if cmds.help
or cmds.git:
93 pref_parallel.parse(args)
94 pref_parallel.prepare_io()
95 pref_parallel.prepare_fire_and_forget()
97 with open(pref_parallel.absoutput /
"parallel",
"w")
as backup:
100 + [pref_parallel.cmds.exec]
101 + pref_parallel.inputs
102 + [pref_parallel.output]
105 if pref_parallel.splittable:
106 shell_cmd += [
"-j", str(pref_parallel.nSplit)]
107 print(
" ".join(shell_cmd), file=backup)
110 print(f
"Run in parallel with `{pref_parallel.absoutput}/parallel`")
113 with Pool(pref_parallel.nSplit)
as pool:
116 run, [(shell_cmd, pref_parallel, k)
for k, shell_cmd
in enumerate(shell_cmds)]
120 print(
"Check the status of your tasks with `top` (or `htop` if available).")
◆ prepare_cmds()
def parallel.prepare_cmds |
( |
|
pref_parallel | ) |
|
Prepare the single commands to be run, including redirection of standard output.
19 """Prepare the single commands to be run, including redirection of standard output."""
22 for k
in range(pref_parallel.nSplit):
24 output_file = f
"{k}{pref_parallel.extension}"
26 [pref_parallel.cmds.exec]
27 + pref_parallel.inputs
28 + [pref_parallel.output +
"/" + output_file]
31 if pref_parallel.splittable:
32 shell_cmd += [
"-j", str(pref_parallel.nSplit),
"-k", str(k)]
33 shell_cmd_str =
" ".join(shell_cmd)
36 output_log = pref_parallel.output + f
"/.{k}.out"
37 if pref_parallel.cmds.background:
38 shell_cmd_str += f
" -p {pref_parallel.name} > {output_log} 2>&1 &"
40 shell_cmd_str += f
" -p {pref_parallel.name} 2>&1 | tee {output_log}"
41 shell_cmds.append(shell_cmd_str)
◆ run()
None parallel.run |
( |
|
shell_cmd, |
|
|
|
pref, |
|
|
|
k |
|
) |
| |
Runs one single command.
46 def run(shell_cmd, pref, k) -> None:
47 """Runs one single command."""
52 proc = subprocess.Popen(shell_cmd,
55 stdout=subprocess.PIPE
59 for line
in iter(proc.stdout.readline,
""):
def prepare_cmds(pref_parallel)
Definition: parallel.py:18
None run(shell_cmd, pref, k)
Definition: parallel.py:46
Definition: prefix.py:133
def main()
Definition: parallel.py:65
def preparse(List[str] argv, str tutorial, bool multi_opt=False, bool dag_opt=False, bool condor=False)
Definition: prefix.py:32
def print_slice(formatting, line, nNow, nSplit, end)
Definition: prefix.py:395
range
Definition: metPhiCorrectionExample.py:100
None git_hash(str exec)
Definition: prefix.py:111
None tweak_helper_multi(PrefixCommand prefix, bool multi_opt=True, bool dag_opt=False, bool condor=False)
Definition: prefix.py:346