DAS  3.0
Das Analysis System
parallel Namespace Reference

Functions

def prepare_cmds (pref_parallel)
 
None run (shell_cmd, pref, k)
 
def main ()
 

Function Documentation

◆ main()

def parallel.main ( )
Implementation of the `parallel` command, relying on the in-house `prefix` library.
def main():
    """Implementation of `parallel` command, relying on homemade `prefix` lib.

    The function proceeds in the following steps:

    1. Pre-parse ``sys.argv`` with ``prefix.preparse`` and build a
       ``prefix.PrefixCommand``.
    2. If help and/or the git hash was requested, print it and exit.
    3. Parse the remaining arguments and let the prefix command prepare its
       I/O and its "fire and forget" setup.
    4. Write a replayable ``parallel ...`` command line to the file
       ``parallel`` under ``pref_parallel.absoutput``.
    5. On a dry run, report where that file is and exit.
    6. Otherwise run the commands returned by ``prepare_cmds`` in a pool of
       ``pref_parallel.nSplit`` worker processes, one ``run`` call each.
    """
    # Local import: only needed to quote the backup command line.
    import shlex

    cmds, args = prefix.preparse(
        sys.argv,
        tutorial=(
            "Runs the command in parallel on local machine. Only commands "
            "running over n-tuples may be prefixed, and must natively output "
            "a ROOT file. The prefix command will replace the single ROOT "
            "file with a directory containing a series of ROOT files, as well "
            "as the standard output stream in hidden text files. Note: be nice "
            "with your colleagues, and don't use all cores for a too long time "
            "unless you are sure that the machine is free."
        ),
        multi_opt=True,
    )

    pref_parallel = prefix.PrefixCommand(sys.argv[0], cmds)

    if cmds.help:
        prefix.tweak_helper_multi(pref_parallel)

    if cmds.git:
        prefix.git_hash(cmds.exec)

    # Both informational options may be combined; exit only after both ran.
    if cmds.help or cmds.git:
        sys.exit()

    pref_parallel.parse(args)
    pref_parallel.prepare_io()
    pref_parallel.prepare_fire_and_forget()

    with open(pref_parallel.absoutput / "parallel", "w") as backup:
        shell_cmd = (
            ["parallel"]
            + [pref_parallel.cmds.exec]
            + pref_parallel.inputs
            + [pref_parallel.output]
            + pref_parallel.args
        )
        if pref_parallel.splittable:
            shell_cmd += ["-j", str(pref_parallel.nSplit)]
        # Quote every word so that the saved command can be replayed verbatim
        # even when a path or an argument contains spaces or shell
        # metacharacters. Plain words are written unchanged.
        print(" ".join(shlex.quote(word) for word in shell_cmd), file=backup)

    if cmds.dry_run:
        print(f"Run in parallel with `{pref_parallel.absoutput}/parallel`")
        sys.exit()

    with Pool(pref_parallel.nSplit) as pool:
        shell_cmds = prepare_cmds(pref_parallel)
        # One task per slice: (command string, prefix command, slice index).
        pool.starmap(
            run, [(cmd, pref_parallel, k) for k, cmd in enumerate(shell_cmds)]
        )

    if cmds.background:
        print("Check the status of your tasks with `top` (or `htop` if available).")
121 
122 

◆ prepare_cmds()

def parallel.prepare_cmds (   pref_parallel)
Prepare the single commands to be run, including redirection of standard output.
def prepare_cmds(pref_parallel):
    """Prepare the single commands to be run, including redirection of standard output.

    One shell command string is built per slice ``k`` in
    ``range(pref_parallel.nSplit)``. Each command consists of the executable,
    the inputs, the per-slice output file ``<output>/<k><extension>``, the
    extra arguments and, if the command is splittable, ``-j <nSplit> -k <k>``.
    The command (before ``-p`` and the redirection are appended) is printed
    to standard output.

    Every word is shell-quoted because the resulting strings are meant to be
    interpreted by a shell; words made only of safe characters are left
    unchanged by the quoting.

    Args:
        pref_parallel: object providing ``nSplit``, ``extension``,
            ``cmds.exec``, ``cmds.background``, ``inputs``, ``output``,
            ``args``, ``splittable`` and ``name``.

    Returns:
        A list of ``nSplit`` shell command strings. In background mode the
        output is redirected to ``<output>/.<k>.out`` and the command ends
        with ``&``; otherwise the output is piped through ``tee`` into the
        same log file.
    """
    # Local import: keeps the block self-contained.
    import shlex

    # Loop invariants: the `-p` value and the background flag.
    name = shlex.quote(str(pref_parallel.name))
    background = pref_parallel.cmds.background

    shell_cmds = []
    for k in range(pref_parallel.nSplit):

        output_file = f"{k}{pref_parallel.extension}"
        shell_cmd = (
            [pref_parallel.cmds.exec]
            + pref_parallel.inputs
            + [pref_parallel.output + "/" + output_file]
            + pref_parallel.args
        )
        if pref_parallel.splittable:
            shell_cmd += ["-j", str(pref_parallel.nSplit), "-k", str(k)]
        shell_cmd_str = " ".join(shlex.quote(word) for word in shell_cmd)
        print(shell_cmd_str)

        # Hidden log file receiving the standard output and error of slice k.
        output_log = shlex.quote(pref_parallel.output + f"/.{k}.out")
        if background:
            shell_cmd_str += f" -p {name} > {output_log} 2>&1 &"
        else:
            shell_cmd_str += f" -p {name} 2>&1 | tee {output_log}"
        shell_cmds.append(shell_cmd_str)

    return shell_cmds
44 
45 

◆ run()

None parallel.run (   shell_cmd,
  pref,
  k 
)
Runs one single command.
def run(shell_cmd, pref, k) -> None:
    """Runs one single command.

    The command string is executed through the shell and its standard output
    is read line by line; each line is handed to ``prefix.print_slice``
    together with the slice index and the total number of slices.

    Args:
        shell_cmd: command line to execute, as a single shell string.
        pref: prefix command object; only ``pref.nSplit`` is read here.
        k: index of the slice handled by this command.
    """
    # The context manager closes the stdout pipe and waits for the process
    # on exit, also when an exception interrupts the loop.
    with subprocess.Popen(
        shell_cmd,
        shell=True,
        text=True,
        stdout=subprocess.PIPE,
    ) as proc:
        # `formatting` is the state threaded through successive
        # print_slice calls (returned by one call, passed to the next).
        formatting = ""
        for line in proc.stdout:
            formatting = prefix.print_slice(formatting, line, k, pref.nSplit, "")
63 
64 
parallel.prepare_cmds
def prepare_cmds(pref_parallel)
Definition: parallel.py:18
parallel.run
None run(shell_cmd, pref, k)
Definition: parallel.py:46
prefix.PrefixCommand
Definition: prefix.py:133
parallel.main
def main()
Definition: parallel.py:65
prefix.preparse
def preparse(List[str] argv, str tutorial, bool multi_opt=False, bool dag_opt=False, bool condor=False)
Definition: prefix.py:32
prefix.print_slice
def print_slice(formatting, line, nNow, nSplit, end)
Definition: prefix.py:395
metPhiCorrectionExample.range
range
Definition: metPhiCorrectionExample.py:100
prefix.git_hash
None git_hash(str exec)
Definition: prefix.py:111
prefix.tweak_helper_multi
None tweak_helper_multi(PrefixCommand prefix, bool multi_opt=True, bool dag_opt=False, bool condor=False)
Definition: prefix.py:346