DAS  3.0
Das Analysis System
parallel Namespace Reference

Functions

def prepare_cmds (pref_parallel)
 
None run (shell_cmd, output)
 
def main ()
 

Function Documentation

◆ main()

def parallel.main ( )
Implementation of `parallel` command, relying on homemade `prefix` lib.
48 def main():
49  """Implementation of `parallel` command, relying on homemade `prefix` lib."""
50 
51  cmds, args = prefix.preparse(
52  sys.argv,
53  tutorial=(
54  "Runs the command in parallel on local machine. Only commands "
55  "running over n-tuples may be prefixed, and must natively output "
56  "a ROOT file. The prefix command will replace the single ROOT "
57  "file with a directory containing a series of ROOT files, as well "
58  "as the standard output stream in hidden text files. Note: be nice "
59  "with your colleagues, and don't use all cores for a too long time "
60  "unless you are sure that the machine is free."
61  ),
62  multi_opt=True,
63  )
64 
65  pref_parallel = prefix.PrefixCommand(sys.argv[0], cmds)
66 
67  if cmds.help:
68  prefix.tweak_helper_multi(pref_parallel)
69 
70  if cmds.git:
71  prefix.git_hash(cmds.exec)
72 
73  if cmds.help or cmds.git:
74  sys.exit()
75 
76  pref_parallel.parse(args)
77  pref_parallel.prepare_io()
78  pref_parallel.prepare_fire_and_forget()
79 
80  with open(pref_parallel.absoutput / "parallel", "w") as backup:
81  shell_cmd = (
82  ["parallel"]
83  + [pref_parallel.cmds.exec]
84  + pref_parallel.inputs
85  + [pref_parallel.output]
86  + pref_parallel.args
87  )
88  if pref_parallel.splittable:
89  shell_cmd += ["-j", str(pref_parallel.nSplit)]
90  print(" ".join(shell_cmd), file=backup)
91 
92  if cmds.dry_run:
93  print(f"Run in parallel with `{pref_parallel.absoutput}/parallel`")
94  sys.exit()
95 
96  with Pool(pref_parallel.nSplit) as pool:
97  shell_cmds = prepare_cmds(pref_parallel)
98  pool.starmap(
99  run, [(shell_cmd, pref_parallel.output) for shell_cmd in shell_cmds]
100  )
101 
102  if cmds.background:
103  print("Check the status of your tasks with `top` (or `htop` if available).")
104 
105 

◆ prepare_cmds()

def parallel.prepare_cmds (   pref_parallel)
Prepare the single commands to be run, including redirection of standard output.
11 def prepare_cmds(pref_parallel):
12  """Prepare the single commands to be run, including redirection of standard output."""
13 
14  shell_cmds = []
15  for k in range(pref_parallel.nSplit):
16 
17  output_file = f"{k}{pref_parallel.extension}"
18  shell_cmd = (
19  [pref_parallel.cmds.exec]
20  + pref_parallel.inputs
21  + [pref_parallel.output + "/" + output_file]
22  + pref_parallel.args
23  )
24  if pref_parallel.splittable:
25  shell_cmd += ["-j", str(pref_parallel.nSplit), "-k", str(k)]
26  shell_cmd_str = " ".join(shell_cmd)
27  print(shell_cmd_str)
28 
29  output_log = pref_parallel.output + f"/.{k}.out"
30  if pref_parallel.cmds.background:
31  shell_cmd_str += f" -p {pref_parallel.name} > {output_log} 2>&1 &"
32  else:
33  shell_cmd_str += f" -p {pref_parallel.name} 2>&1 | tee {output_log}"
34  shell_cmds.append(shell_cmd_str)
35 
36  return shell_cmds
37 
38 

◆ run()

None parallel.run (   shell_cmd,
  output 
)
Runs one single command.
39 def run(shell_cmd, output) -> None:
40  """Runs one single command."""
41 
42  my_env = os.environ.copy()
43  my_env["LD_LIBRARY_PATH"] = my_env["LD_LIBRARY_PATH"] + f":{output}"
44  my_env["PATH"] = my_env["PATH"] + f":{output}"
45  subprocess.run(shell_cmd, check=True, shell=True, env=my_env)
46 
47 
parallel.prepare_cmds
def prepare_cmds(pref_parallel)
Definition: parallel.py:11
prefix.PrefixCommand
Definition: prefix.py:133
parallel.main
def main()
Definition: parallel.py:48
prefix.preparse
def preparse(List[str] argv, str tutorial, bool multi_opt=False, bool dag_opt=False, bool condor=False)
Definition: prefix.py:32
parallel.run
None run(shell_cmd, output)
Definition: parallel.py:39
metPhiCorrectionExample.range
range
Definition: metPhiCorrectionExample.py:100
prefix.git_hash
None git_hash(str exec)
Definition: prefix.py:111
prefix.tweak_helper_multi
None tweak_helper_multi(PrefixCommand prefix, bool multi_opt=True, bool dag_opt=False, bool condor=False)
Definition: prefix.py:323