 |
DAS
3.0
Das Analysis System
|
◆ main()
Implementation of `parallel` command, relying on homemade `prefix` lib.
61 """Implementation of `parallel` command, relying on homemade `prefix` lib."""
66 "Runs the command in parallel on local machine. Only commands "
67 "running over n-tuples may be prefixed, and must natively output "
68 "a ROOT file. The prefix command will replace the single ROOT "
69 "file with a directory containing a series of ROOT files, as well "
70 "as the standard output stream in hidden text files. Note: be nice "
71 "with your colleagues, and don't use all cores for a too long time "
72 "unless you are sure that the machine is free."
85 if cmds.help
or cmds.git:
88 pref_parallel.parse(args)
89 pref_parallel.prepare_io()
90 pref_parallel.prepare_fire_and_forget()
92 with open(pref_parallel.absoutput /
"parallel",
"w")
as backup:
95 + [pref_parallel.cmds.exec]
96 + pref_parallel.inputs
97 + [pref_parallel.output]
100 if pref_parallel.splittable:
101 shell_cmd += [
"-j", str(pref_parallel.nSplit)]
102 print(
" ".
join(shell_cmd), file=backup)
105 print(f
"Run in parallel with `{pref_parallel.absoutput}/parallel`")
108 with Pool(pref_parallel.nSplit)
as pool:
112 [(shell_cmd, pref_parallel, k)
for k, shell_cmd
in enumerate(shell_cmds)],
116 print(
"Check the status of your tasks with `top` (or `htop` if available).")
◆ prepare_cmds()
def parallel.prepare_cmds(pref_parallel)
|
Prepare the single commands to be run, including redirection of standard output.
19 """Prepare the single commands to be run, including redirection of standard output."""
22 for k
in range(pref_parallel.nSplit):
23 output_file = f
"{k}{pref_parallel.extension}"
25 [pref_parallel.cmds.exec]
26 + pref_parallel.inputs
27 + [pref_parallel.output +
"/" + output_file]
30 if pref_parallel.splittable:
31 shell_cmd += [
"-j", str(pref_parallel.nSplit),
"-k", str(k)]
32 shell_cmd_str =
" ".
join(shell_cmd)
35 output_log = pref_parallel.output + f
"/.{k}.out"
36 if pref_parallel.cmds.background:
37 shell_cmd_str += f
" -p {pref_parallel.name} > {output_log} 2>&1 &"
39 shell_cmd_str += f
" -p {pref_parallel.name} 2>&1 | tee {output_log}"
40 shell_cmds.append(shell_cmd_str)
◆ run()
None parallel.run(shell_cmd, pref, k)
Runs one single command.
45 def run(shell_cmd, pref, k) -> None:
46 """Runs one single command."""
51 proc = subprocess.Popen(shell_cmd, shell=
True, text=
True, stdout=subprocess.PIPE)
54 for line
in iter(proc.stdout.readline,
""):
def prepare_cmds(pref_parallel)
Definition: parallel.py:18
None run(shell_cmd, pref, k)
Definition: parallel.py:45
Definition: prefix.py:133
def main()
Definition: parallel.py:60
def preparse(List[str] argv, str tutorial, bool multi_opt=False, bool dag_opt=False, bool condor=False)
Definition: prefix.py:32
PseudoJet join(const std::vector< PseudoJet > &pieces)
Definition: fjcore.hh:1245
def print_slice(formatting, line, nNow, nSplit, end)
Definition: prefix.py:395
range
Definition: metPhiCorrectionExample.py:100
None git_hash(str exec)
Definition: prefix.py:111
None tweak_helper_multi(PrefixCommand prefix, bool multi_opt=True, bool dag_opt=False, bool condor=False)
Definition: prefix.py:346