DAS  3.0
Das Analysis System
PrefixCommand

Description

General utilities for prefix commands (try, parallel, submit).
The explicit options (`cmds`) are unchanged; among the implicit options,
the inputs are checked (if any), and the output is adapted to each thread.

Public Member Functions

None __init__ (self, str prefix, List[str] cmds)
 
None parse (self, List[str] args)
 
int get_entries (self, input)
 
None prepare_io (self, bool multi=True)
 
None prepare_fire_and_forget (self)
 
None clean_up_last_run (self)
 
None __init__ (self, str prefix, List[str] cmds)
 
None parse (self, List[str] args)
 
int get_entries (self, input)
 
None prepare_io (self, bool multi=True)
 
None prepare_fire_and_forget (self)
 
None clean_up_last_run (self)
 

Public Attributes

 name
 
 cmds
 
 helper
 
 splittable
 
 nSplit
 
 extension
 
 inputs
 
 output
 
 args
 
 absinputs
 
 absoutput
 

Static Public Attributes

 str
 
 Path
 

Private Member Functions

str _help_text (self)
 
List[str_synopsis (self)
 
None _prepare_single_output (self)
 
None _prepare_multi_output (self)
 
None _prepare_input (self)
 
None _save_environment (self)
 
str _help_text (self)
 
List[str_synopsis (self)
 
None _prepare_single_output (self)
 
None _prepare_multi_output (self)
 
None _prepare_input (self)
 
None _save_environment (self)
 

Constructor & Destructor Documentation

◆ __init__() [1/2]

None __init__ (   self,
str  prefix,
List[str cmds 
)
Constructor from a separated list of arguments.
152  def __init__(self, prefix: str, cmds: List[str]) -> None:
153  """Constructor from a separated list of arguments."""
154 
155  self.name = os.path.basename(prefix)
156  self.cmds = cmds
157 
158  self.helper = self._help_text()
159  if self.helper[0].count("output") != 1:
160  raise ValueError(
161  f"`{self.cmds.exec}`"
162  + " does not match the requirements to be run with this prefix command: "
163  + "there must be exactly one `output` argument."
164  )
165 
166  self.splittable = ["nSplit" in row for row in self.helper[1:]].count(True)
167  self.nSplit = cmds.nSplit if self.splittable else 1
168 
169  self.extension = (
170  check_output([self.cmds.exec] + ["-o"])
171  .decode()
172  .strip()
173  .split("\n")[0]
174  .split(" ")[0]
175  )
176 

◆ __init__() [2/2]

None __init__ (   self,
str  prefix,
List[str cmds 
)
Constructor from a separated list of arguments.
152  def __init__(self, prefix: str, cmds: List[str]) -> None:
153  """Constructor from a separated list of arguments."""
154 
155  self.name = os.path.basename(prefix)
156  self.cmds = cmds
157 
158  self.helper = self._help_text()
159  if self.helper[0].count("output") != 1:
160  raise ValueError(
161  f"`{self.cmds.exec}`"
162  + " does not match the requirements to be run with this prefix command: "
163  + "there must be exactly one `output` argument."
164  )
165 
166  self.splittable = ["nSplit" in row for row in self.helper[1:]].count(True)
167  self.nSplit = cmds.nSplit if self.splittable else 1
168 
169  self.extension = (
170  check_output([self.cmds.exec] + ["-o"])
171  .decode()
172  .strip()
173  .split("\n")[0]
174  .split(" ")[0]
175  )
176 

Member Function Documentation

◆ _help_text() [1/2]

str _help_text (   self)
private
Returns the command's normal help text.
177  def _help_text(self) -> str:
178  """Returns the command's normal help text."""
179  # if executable is given, then the helper is adapted to match with the prefix command
180  text = (
181  check_output(self.cmds.exec + " -h", shell=True)
182  .decode()
183  .strip()
184  .split("\n")
185  )
186  return text
187 

◆ _help_text() [2/2]

str _help_text (   self)
private
Returns the command's normal help text.
177  def _help_text(self) -> str:
178  """Returns the command's normal help text."""
179  # if executable is given, then the helper is adapted to match with the prefix command
180  text = (
181  check_output(self.cmds.exec + " -h", shell=True)
182  .decode()
183  .strip()
184  .split("\n")
185  )
186  return text
187 

◆ _prepare_input() [1/2]

None _prepare_input (   self)
private
Resolve the paths to the input(s).
262  def _prepare_input(self) -> None:
263  """Resolve the paths to the input(s)."""
264 
265  for x in self.inputs:
266  self.absinputs += glob(x)
267  if len(self.inputs) > 0 and len(self.absinputs) == 0:
268  raise ValueError("Inputs could not be found: " + " ".join(self.inputs))
269 
270  self.absinputs = [str(Path(x).resolve()) for x in self.absinputs]
271 

◆ _prepare_input() [2/2]

None _prepare_input (   self)
private
Resolve the paths to the input(s).
262  def _prepare_input(self) -> None:
263  """Resolve the paths to the input(s)."""
264 
265  for x in self.inputs:
266  self.absinputs += glob(x)
267  if len(self.inputs) > 0 and len(self.absinputs) == 0:
268  raise ValueError("Inputs could not be found: " + " ".join(self.inputs))
269 
270  self.absinputs = [str(Path(x).resolve()) for x in self.absinputs]
271 

◆ _prepare_multi_output() [1/2]

None _prepare_multi_output (   self)
private
Prepare the members for parallel running (local or on farm).
231  def _prepare_multi_output(self) -> None:
232  """Prepare the members for parallel running (local or on farm)."""
233 
234  if self.absoutput.suffix != "":
235  raise ValueError(
236  f"A directory with an extension is misleading: {self.output}"
237  )
238 
239  if self.absoutput.exists():
240  if self.absoutput.is_file():
241  raise ValueError(self.output + " is not a directory")
242 
243  new_rootfiles = []
244  for k in range(self.nSplit):
245  rootfile = self.absoutput / f"{k}{self.extension}"
246  if rootfile.exists() and not os.access(rootfile, os.W_OK):
247  raise ValueError(
248  "The root files in " + self.output + " are not writable"
249  )
250  new_rootfiles += [str(rootfile)]
251 
252  existing_rootfiles = glob(str(self.absoutput / f"*{self.extension}"))
253  if len(set(existing_rootfiles).difference(new_rootfiles)) > 0:
254  raise ValueError(
255  "Residual files from a former run will not be overwritten."
256  )
257  else:
258  self.absoutput.mkdir(parents=True)
259 
260  self._save_environment()
261 

◆ _prepare_multi_output() [2/2]

None _prepare_multi_output (   self)
private
Prepare the members for parallel running (local or on farm).
231  def _prepare_multi_output(self) -> None:
232  """Prepare the members for parallel running (local or on farm)."""
233 
234  if self.absoutput.suffix != "":
235  raise ValueError(
236  f"A directory with an extension is misleading: {self.output}"
237  )
238 
239  if self.absoutput.exists():
240  if self.absoutput.is_file():
241  raise ValueError(self.output + " is not a directory")
242 
243  new_rootfiles = []
244  for k in range(self.nSplit):
245  rootfile = self.absoutput / f"{k}{self.extension}"
246  if rootfile.exists() and not os.access(rootfile, os.W_OK):
247  raise ValueError(
248  "The root files in " + self.output + " are not writable"
249  )
250  new_rootfiles += [str(rootfile)]
251 
252  existing_rootfiles = glob(str(self.absoutput / f"*{self.extension}"))
253  if len(set(existing_rootfiles).difference(new_rootfiles)) > 0:
254  raise ValueError(
255  "Residual files from a former run will not be overwritten."
256  )
257  else:
258  self.absoutput.mkdir(parents=True)
259 
260  self._save_environment()
261 

◆ _prepare_single_output() [1/2]

None _prepare_single_output (   self)
private
Prepare the working area for a single-run command.
215  def _prepare_single_output(self) -> None:
216  """Prepare the working area for a single-run command."""
217 
218  if self.absoutput.exists():
219  if not os.access(self.absoutput, os.W_OK):
220  raise ValueError(self.absoutput + " is not writable")
221  if self.absoutput.is_dir():
222  self.absoutput /= "0" + self.extension
223  else:
224  if len(self.absoutput.suffix) == 0: # then assume it's a directory
225  self.absoutput.mkdir(parents=True)
226  self.absoutput /= "0" + self.extension
227 
228  if self.absoutput.is_dir():
229  self._save_environment()
230 

◆ _prepare_single_output() [2/2]

None _prepare_single_output (   self)
private
Prepare the working area for a single-run command.
215  def _prepare_single_output(self) -> None:
216  """Prepare the working area for a single-run command."""
217 
218  if self.absoutput.exists():
219  if not os.access(self.absoutput, os.W_OK):
220  raise ValueError(self.absoutput + " is not writable")
221  if self.absoutput.is_dir():
222  self.absoutput /= "0" + self.extension
223  else:
224  if len(self.absoutput.suffix) == 0: # then assume it's a directory
225  self.absoutput.mkdir(parents=True)
226  self.absoutput /= "0" + self.extension
227 
228  if self.absoutput.is_dir():
229  self._save_environment()
230 

◆ _save_environment() [1/2]

None _save_environment (   self)
private
Save environment in a file.
272  def _save_environment(self) -> None:
273  """Save environment in a file."""
274 
275  env = self.absoutput / ".env"
276  with open(env, "w") as f:
277  for key, value in os.environ.items():
278  if key in ["PWD", "OLDPWD", "TMPDIR", "KRB5CCNAME", "EOS_MGM_URL"]:
279  continue
280  if "BASH" in key:
281  continue
282  value = shlex.quote(value)
283  print(f"export {key}={value}", file=f)
284  os.chmod(env, stat.S_IRWXU)
285 

◆ _save_environment() [2/2]

None _save_environment (   self)
private
Save environment in a file.
272  def _save_environment(self) -> None:
273  """Save environment in a file."""
274 
275  env = self.absoutput / ".env"
276  with open(env, "w") as f:
277  for key, value in os.environ.items():
278  if key in ["PWD", "OLDPWD", "TMPDIR", "KRB5CCNAME", "EOS_MGM_URL"]:
279  continue
280  if "BASH" in key:
281  continue
282  value = shlex.quote(value)
283  print(f"export {key}={value}", file=f)
284  os.chmod(env, stat.S_IRWXU)
285 

◆ _synopsis() [1/2]

List[str] _synopsis (   self)
private
Extract synopsis from helper (after undoing the bold characters).
188  def _synopsis(self) -> List[str]:
189  """Extract synopsis from helper (after undoing the bold characters)."""
190  synopsis = self.helper[0].replace("\x1b[1m", "").replace("\x1b[0m", "")
191  return synopsis.split()[1:]
192 

◆ _synopsis() [2/2]

List[str] _synopsis (   self)
private
Extract synopsis from helper (after undoing the bold characters).
188  def _synopsis(self) -> List[str]:
189  """Extract synopsis from helper (after undoing the bold characters)."""
190  synopsis = self.helper[0].replace("\x1b[1m", "").replace("\x1b[0m", "")
191  return synopsis.split()[1:]
192 

◆ clean_up_last_run() [1/2]

None clean_up_last_run (   self)
Removes the executable, libraries, hidden files, and dictionaries from the output directory.
334  def clean_up_last_run(self) -> None:
335  """Removes the executable, libraries, hidden files, and dictionaries from the output directory."""
336 
337  exec = Path(which(self.cmds.exec)).name
338 
339  for file in [exec, ".env", ".condor"]:
340  file = self.absoutput / Path(file)
341  if file.is_file():
342  os.remove(str(file))
343 
344  for ext in ["so", "pcm"]:
345  for file in glob(str(self.absoutput) + f"/*{ext}"):
346  print("Removing " + file)
347  os.remove(file)
348 
349 

◆ clean_up_last_run() [2/2]

None clean_up_last_run (   self)
Removes the executable, libraries, hidden files, and dictionaries from the output directory.
334  def clean_up_last_run(self) -> None:
335  """Removes the executable, libraries, hidden files, and dictionaries from the output directory."""
336 
337  exec = Path(which(self.cmds.exec)).name
338 
339  for file in [exec, ".env", ".condor"]:
340  file = self.absoutput / Path(file)
341  if file.is_file():
342  os.remove(str(file))
343 
344  for ext in ["so", "pcm"]:
345  for file in glob(str(self.absoutput) + f"/*{ext}"):
346  print("Removing " + file)
347  os.remove(file)
348 
349 

◆ get_entries() [1/2]

int get_entries (   self,
  input 
)
Obtain the number of entries from input n-tuple.
206  def get_entries(self, input) -> int:
207  """Obtain the number of entries from input n-tuple."""
208 
209  output = check_output(["printEntries"] + [input.strip('\\"')])
210  nevents = int(output.decode().splitlines()[0].replace("\n", ""))
211  if nevents == 0:
212  raise ValueError("Empty input tree.")
213  return nevents
214 

◆ get_entries() [2/2]

int get_entries (   self,
  input 
)
Obtain the number of entries from input n-tuple.
206  def get_entries(self, input) -> int:
207  """Obtain the number of entries from input n-tuple."""
208 
209  output = check_output(["printEntries"] + [input.strip('\\"')])
210  nevents = int(output.decode().splitlines()[0].replace("\n", ""))
211  if nevents == 0:
212  raise ValueError("Empty input tree.")
213  return nevents
214 

◆ parse() [1/2]

None parse (   self,
List[str args 
)
Put input(s), output, and other arguments in dedicated members.
193  def parse(self, args: List[str]) -> None:
194  """Put input(s), output, and other arguments in dedicated members."""
195 
196  synopsis = self._synopsis()
197  i = synopsis.index("output")
198  if i >= len(args):
199  raise ValueError("Unable to identify `output` in given arguments")
200 
201  self.inputs = args[:i]
202  self.output = args[i]
203  if len(args) > i:
204  self.args = args[i + 1 :]
205 

◆ parse() [2/2]

None parse (   self,
List[str args 
)
Put input(s), output, and other arguments in dedicated members.
193  def parse(self, args: List[str]) -> None:
194  """Put input(s), output, and other arguments in dedicated members."""
195 
196  synopsis = self._synopsis()
197  i = synopsis.index("output")
198  if i >= len(args):
199  raise ValueError("Unable to identify `output` in given arguments")
200 
201  self.inputs = args[:i]
202  self.output = args[i]
203  if len(args) > i:
204  self.args = args[i + 1 :]
205 

◆ prepare_fire_and_forget() [1/2]

None prepare_fire_and_forget (   self)
Copies the executable, libraries, and dictionaries to the output directory.
298  def prepare_fire_and_forget(self) -> None:
299  """Copies the executable, libraries, and dictionaries to the output directory."""
300 
301  # copy executables and libraries to working directory
302  exec = Path(which(self.cmds.exec)).resolve()
303  copy_exec = self.absoutput / exec.name
304  if exec != copy_exec:
305  copy2(exec, self.output)
306  exec = copy_exec
307  self.cmds.exec = str(exec)
308 
309  # get the list of dependencies (for binary executables only)
310  mime_type = os.popen(f"file --mime-type {exec}").read()
311  cmd = ""
312  if sys.platform == "darwin": # i.e. macOS (unfortunate name conflict)
313  if not "application/x-mach-binary" in mime_type:
314  return
315  cmd = 'otool -L ' + str(exec) + ' | awk \'{print $1}\' | grep "\@rpath" | sed \'s/\@rpath\///g\''
316  else: # assuming linux
317  if not "application/x-executable" in mime_type:
318  return
319  cmd = 'ldd -r ' + str(exec) + ' | awk \'{print $1}\' | grep "^lib"'
320  dependencies = os.popen(cmd).read().splitlines()
321 
322  # get the list of local libraries
323  local_libs = glob("/builds/cms-analysis/general/DasAnalysisSystem/Core/Installer/tools/lib64/*")
324 
325  # copy the necessary libraries and dictionaries in local working directory
326  for dep in dependencies:
327  stem = Path(dep).stem
328  for ext in ["_rdict.pcm", ".rootmap", ".so", ".dylib"]:
329  candidate = stem + ext
330  for file in local_libs:
331  if candidate in file and Path(file).is_file():
332  copy2(file, self.absoutput)
333 

◆ prepare_fire_and_forget() [2/2]

None prepare_fire_and_forget (   self)
Copies the executable, libraries, and dictionaries to the output directory.
298  def prepare_fire_and_forget(self) -> None:
299  """Copies the executable, libraries, and dictionaries to the output directory."""
300 
301  # copy executables and libraries to working directory
302  exec = Path(which(self.cmds.exec)).resolve()
303  copy_exec = self.absoutput / exec.name
304  if exec != copy_exec:
305  copy2(exec, self.output)
306  exec = copy_exec
307  self.cmds.exec = str(exec)
308 
309  # get the list of dependencies (for binary executables only)
310  mime_type = os.popen(f"file --mime-type {exec}").read()
311  cmd = ""
312  if sys.platform == "darwin": # i.e. macOS (unfortunate name conflict)
313  if not "application/x-mach-binary" in mime_type:
314  return
315  cmd = 'otool -L ' + str(exec) + ' | awk \'{print $1}\' | grep "\@rpath" | sed \'s/\@rpath\///g\''
316  else: # assuming linux
317  if not "application/x-executable" in mime_type:
318  return
319  cmd = 'ldd -r ' + str(exec) + ' | awk \'{print $1}\' | grep "^lib"'
320  dependencies = os.popen(cmd).read().splitlines()
321 
322  # get the list of local libraries
323  local_libs = glob("/builds/cms-analysis/general/DasAnalysisSystem/Core/Installer/tools/lib64/*")
324 
325  # copy the necessary libraries and dictionaries in local working directory
326  for dep in dependencies:
327  stem = Path(dep).stem
328  for ext in ["_rdict.pcm", ".rootmap", ".so", ".dylib"]:
329  candidate = stem + ext
330  for file in local_libs:
331  if candidate in file and Path(file).is_file():
332  copy2(file, self.absoutput)
333 

◆ prepare_io() [1/2]

None prepare_io (   self,
bool   multi = True 
)
Prepare input and output files and directories.
286  def prepare_io(self, multi: bool = True) -> None:
287  """Prepare input and output files and directories."""
288 
289  self._prepare_input()
290  self.absoutput = Path(self.output).resolve()
291 
292  if multi:
293  self.inputs = [shlex.quote(x) for x in self.inputs]
294  self._prepare_multi_output()
295  else:
296  self._prepare_single_output()
297 

◆ prepare_io() [2/2]

None prepare_io (   self,
bool   multi = True 
)
Prepare input and output files and directories.
286  def prepare_io(self, multi: bool = True) -> None:
287  """Prepare input and output files and directories."""
288 
289  self._prepare_input()
290  self.absoutput = Path(self.output).resolve()
291 
292  if multi:
293  self.inputs = [shlex.quote(x) for x in self.inputs]
294  self._prepare_multi_output()
295  else:
296  self._prepare_single_output()
297 

Member Data Documentation

◆ absinputs

absinputs

◆ absoutput

absoutput

◆ args

args

◆ cmds

cmds

◆ extension

extension

◆ helper

helper

◆ inputs

inputs

◆ name

name

◆ nSplit

nSplit

◆ output

output

◆ Path

Path
static

◆ splittable

splittable

◆ str

str
static

The documentation for this class was generated from the following file:
Darwin::Tools::split
@ split
activate -k and -j to define slice
Definition: Options.h:31
difference
double difference(double *x, double *p)
Definition: ModifiedGaussianCore.cc:10
join
PseudoJet join(const std::vector< PseudoJet > &pieces)
Definition: fjcore.hh:1245
metPhiCorrectionExample.range
range
Definition: metPhiCorrectionExample.py:100