DAS  3.0
Das Analysis System
parallel Namespace Reference

Functions

def prepare_cmds (pref_parallel)
 
None run (shell_cmd, pref, k)
 
def main ()
 

Function Documentation

◆ main()

def parallel.main ( )
Implementation of `parallel` command, relying on homemade `prefix` lib.
def main():
    """Implementation of `parallel` command, relying on homemade `prefix` lib.

    Steps, in order:

    1. Pre-parse ``sys.argv`` with ``prefix.preparse`` and build a
       ``prefix.PrefixCommand``.
    2. Handle the help and git options, then exit via ``sys.exit()`` if either
       was requested.
    3. Parse the remaining arguments and prepare the I/O.
    4. Write the equivalent one-line ``parallel ...`` invocation to a file
       named ``parallel`` in ``pref_parallel.absoutput``. Every token is
       shell-quoted so that the saved line can be replayed even when an
       argument contains whitespace or shell metacharacters.
    5. On a dry run, print where that file is and exit via ``sys.exit()``.
    6. Otherwise run the commands returned by ``prepare_cmds`` through a
       process pool of ``pref_parallel.nSplit`` workers.
    """
    # Local import: only needed to quote the backup command line.
    import shlex

    cmds, args = prefix.preparse(
        sys.argv,
        tutorial=(
            "Runs the command in parallel on local machine. Only commands "
            "running over n-tuples may be prefixed, and must natively output "
            "a ROOT file. The prefix command will replace the single ROOT "
            "file with a directory containing a series of ROOT files, as well "
            "as the standard output stream in hidden text files. Note: be nice "
            "with your colleagues, and don't use all cores for a too long time "
            "unless you are sure that the machine is free."
        ),
        multi_opt=True,
    )

    pref_parallel = prefix.PrefixCommand(sys.argv[0], cmds)

    if cmds.help:
        prefix.tweak_helper_multi(pref_parallel)

    if cmds.git:
        prefix.git_hash(cmds.exec)

    # Help and git are informational only: nothing is executed afterwards.
    if cmds.help or cmds.git:
        sys.exit()

    pref_parallel.parse(args)
    pref_parallel.prepare_io()
    pref_parallel.prepare_fire_and_forget()

    # Keep a copy of the full command next to the outputs.
    with open(pref_parallel.absoutput / "parallel", "w") as backup:
        shell_cmd = (
            ["parallel"]
            + [pref_parallel.cmds.exec]
            + pref_parallel.inputs
            + [pref_parallel.output]
            + pref_parallel.args
        )
        if pref_parallel.splittable:
            shell_cmd += ["-j", str(pref_parallel.nSplit)]
        # Quote each token: a plain " ".join would merge or split arguments
        # containing spaces when the saved line is run again.
        print(" ".join(shlex.quote(token) for token in shell_cmd), file=backup)

    if cmds.dry_run:
        print(f"Run in parallel with `{pref_parallel.absoutput}/parallel`")
        sys.exit()

    with Pool(pref_parallel.nSplit) as pool:
        shell_cmds = prepare_cmds(pref_parallel)
        # Each worker receives its command, the prefix object and its index.
        pool.starmap(
            run,
            [(shell_cmd, pref_parallel, k) for k, shell_cmd in enumerate(shell_cmds)],
        )

    if cmds.background:
        print("Check the status of your tasks with `top` (or `htop` if available).")
117 
118 

◆ prepare_cmds()

def parallel.prepare_cmds (   pref_parallel)
Prepare the single commands to be run, including redirection of standard output.
def prepare_cmds(pref_parallel):
    """Prepare the single commands to be run, including redirection of standard output.

    One shell command string is built for each slice ``k`` in
    ``range(pref_parallel.nSplit)``. Each command is made of the executable,
    the inputs, the per-slice output file ``<output>/<k><extension>`` and the
    extra arguments, followed by ``-j <nSplit> -k <k>`` when the command is
    splittable, and finally ``-p <name>``.

    Standard error is merged into standard output. In background mode the
    merged stream is redirected to ``<output>/.<k>.out`` and the command is
    detached with ``&``; otherwise it is piped through ``tee`` into that same
    file.

    Every token is shell-quoted, because the returned strings are meant to be
    interpreted by a shell: without quoting, a path or argument containing
    whitespace or a shell metacharacter would be split or reinterpreted.

    Args:
        pref_parallel: object providing ``nSplit``, ``extension``,
            ``cmds.exec``, ``cmds.background``, ``inputs``, ``output``,
            ``args``, ``splittable`` and ``name``.

    Returns:
        The list of command strings, one per slice, in slice order.
    """
    # Local import: keeps the block self-contained for the quoting helper.
    import shlex

    shell_cmds = []
    for k in range(pref_parallel.nSplit):
        output_file = f"{k}{pref_parallel.extension}"
        shell_cmd = (
            [pref_parallel.cmds.exec]
            + pref_parallel.inputs
            + [pref_parallel.output + "/" + output_file]
            + pref_parallel.args
        )
        if pref_parallel.splittable:
            shell_cmd += ["-j", str(pref_parallel.nSplit), "-k", str(k)]
        shell_cmd_str = " ".join(shlex.quote(token) for token in shell_cmd)
        # Echo the bare command (without the redirection suffix) to the user.
        print(shell_cmd_str)

        name = shlex.quote(str(pref_parallel.name))
        output_log = shlex.quote(pref_parallel.output + f"/.{k}.out")
        if pref_parallel.cmds.background:
            shell_cmd_str += f" -p {name} > {output_log} 2>&1 &"
        else:
            shell_cmd_str += f" -p {name} 2>&1 | tee {output_log}"
        shell_cmds.append(shell_cmd_str)

    return shell_cmds
43 
44 

◆ run()

None parallel.run (   shell_cmd,
  pref,
  k 
)
Runs one single command.
def run(shell_cmd, pref, k) -> None:
    """Runs one single command.

    The command string is executed through the shell and its standard output
    is read line by line; each line is handed to ``prefix.print_slice``
    together with the slice index and the total number of slices.

    Args:
        shell_cmd: command line, as a single string, interpreted by the shell.
        pref: object whose ``nSplit`` attribute gives the number of slices.
        k: index of the slice this command corresponds to.
    """
    nSplit = pref.nSplit

    # The context manager closes the pipe and waits for the child process,
    # even if forwarding a line raises.
    with subprocess.Popen(
        shell_cmd, shell=True, text=True, stdout=subprocess.PIPE
    ) as proc:
        # `print_slice` returns a value that is fed back into its next call.
        formatting = ""
        for line in proc.stdout:
            formatting = prefix.print_slice(formatting, line, k, nSplit, "")
58 
59 
parallel.prepare_cmds
def prepare_cmds(pref_parallel)
Definition: parallel.py:18
parallel.run
None run(shell_cmd, pref, k)
Definition: parallel.py:45
prefix.PrefixCommand
Definition: prefix.py:133
parallel.main
def main()
Definition: parallel.py:60
prefix.preparse
def preparse(List[str] argv, str tutorial, bool multi_opt=False, bool dag_opt=False, bool condor=False)
Definition: prefix.py:32
join
PseudoJet join(const std::vector< PseudoJet > &pieces)
Definition: fjcore.hh:1245
prefix.print_slice
def print_slice(formatting, line, nNow, nSplit, end)
Definition: prefix.py:395
metPhiCorrectionExample.range
range
Definition: metPhiCorrectionExample.py:100
prefix.git_hash
None git_hash(str exec)
Definition: prefix.py:111
prefix.tweak_helper_multi
None tweak_helper_multi(PrefixCommand prefix, bool multi_opt=True, bool dag_opt=False, bool condor=False)
Definition: prefix.py:346