DAS  3.0
Das Analysis System
transcribe Namespace Reference

Functions

def parse (List[str] argv)
 
def create_new_dag (str script, str outputDir, bool verbose=False)
 
def main ()
 

Function Documentation

◆ create_new_dag()

def transcribe.create_new_dag ( str  script,
str  outputDir,
bool   verbose = False 
)
Transcribe the input script and prepare the DAG directory.
def create_new_dag(script: "str | Path", outputDir: "str | Path", verbose: bool = False):
    """Transcribe *script* into *outputDir* and prepare the DAG directory.

    A copy of the script is written to ``outputDir/script`` with an extra
    ``alias submit='job -D <outputDir>'`` line (with `` -v`` inside the
    alias value when *verbose*). The alias goes right after the shebang if
    the script has one, otherwise on the first line. The copy is made
    user-executable and run, so that the ``job`` commands it issues fill the
    DAG directory. A ``DOT`` directive is then appended to ``outputDir/dag``.
    Finally, the ``*.so`` and ``*.pcm`` files found in
    ``$DARWIN_FIRE_AND_FORGET`` are copied into *outputDir*, unless a file
    with the same name is already there.

    Args:
        script: path to the input shell script.
        outputDir: DAG directory to create; it must not exist yet.
        verbose: forward ``-v`` to ``job`` through the alias.

    Raises:
        FileExistsError: if *outputDir* already exists.
        KeyError: if ``DARWIN_FIRE_AND_FORGET`` is not set.
        subprocess.CalledProcessError: if the transcribed script fails.
    """
    import shlex

    # Use an absolute path: the transcribed script may change directory, and
    # the DAG is later submitted from inside outputDir, so relative paths
    # written below would not resolve there.
    outputDir = Path(outputDir).resolve()
    outputDir.mkdir(parents=True)

    # copy the script to the output directory with an additional alias
    # to replace `submit` with `job`
    with open(script, "r") as f:
        lines = f.readlines()
    alias_body = f"job -D {shlex.quote(str(outputDir))}"
    if verbose:
        # the flag belongs inside the alias value, not after it
        alias_body += " -v"
    alias = f"alias submit={shlex.quote(alias_body)}\n"
    if lines and lines[0].startswith("#!"):
        # keep the shebang on the very first line
        if not lines[0].endswith("\n"):
            lines[0] += "\n"
        lines.insert(1, alias)
    else:
        lines.insert(0, alias)
    transcribed = outputDir / "script"
    with open(transcribed, "w") as f:
        f.writelines(lines)
    os.chmod(transcribed, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    # run the modified script to make the DAG; going through the shell also
    # covers scripts without a shebang
    subprocess.run(shlex.quote(str(transcribed)), shell=True, check=True)
    # \todo in case, capture environment from the script (`/proc/<pid>/environ`)

    # add the dot file to visualise the dependencies
    with open(outputDir / "dag", "a") as f:
        print(f"DOT {outputDir}/jobs.dot", file=f)

    # copy the libraries and dictionaries in a safe place,
    # without overwriting anything already present in outputDir
    libdir = os.environ["DARWIN_FIRE_AND_FORGET"]
    for ext in ["so", "pcm"]:
        for lib in glob(os.path.join(libdir, f"*.{ext}")):
            if not (outputDir / Path(lib).name).exists():
                copy2(lib, outputDir)

    # \todo copy any config file found in the input script

◆ main()

def transcribe.main ( )
Perform the transcription of a script with submit prefix commands
to a DAG format, which can then be submitted on HTCondor.
def main():
    """Perform the transcription of a script with submit prefix commands
    to a DAG format, which can then be submitted on HTCondor.

    If the DAG directory (``--dag``) does not exist yet, it is created with
    ``create_new_dag``. The DAG is then submitted:

    * with ``condor_submit dag.condor.sub``, when the directory already
      existed and contains no ``dag.rescue*`` file;
    * with ``condor_submit_dag -import_env dag`` otherwise.

    Submission runs with the DAG directory as working directory. Its absolute
    path is prepended to ``PATH`` and ``LD_LIBRARY_PATH``. The program exits
    after submission with ``--dry-run`` or ``--background``. Otherwise it
    watches ``condor_q -dag -nobatch`` and removes all jobs of the batch on
    Ctrl+C.
    """
    import shlex

    # retrieve and define a few variables
    args = parse(sys.argv)
    script = Path(args.script)
    batchname = script.name

    # prepare output directory
    outputDir = Path(args.dag)
    outputDir_preexists = outputDir.exists()
    rescue_preexists = any(outputDir.glob("dag.rescue*"))
    if outputDir_preexists:
        print("(Re)submitting existing dag.")
    else:
        create_new_dag(script, outputDir, args.verbose)

    # prepare and run dag: an existing DAG without rescue file is resubmitted
    # directly, anything else goes through DAGMan
    resubmit = outputDir_preexists and not rescue_preexists
    if resubmit:
        cmd = f"condor_submit -batch-name {shlex.quote(batchname)}"
    else:
        cmd = f"condor_submit_dag -import_env -batch-name {shlex.quote(batchname)}"

    if args.verbose:
        cmd += " -verbose"
    if args.dry_run:
        # NOTE(review): `-no_submit` is a condor_submit_dag option; for the
        # plain condor_submit branch the equivalent may be `-dry-run <file>`
        # -- confirm against the HTCondor version in use.
        cmd += " -no_submit"

    cmd += " dag.condor.sub" if resubmit else " dag"

    # prepare the environment; the command runs with cwd=outputDir, so the
    # directory must be absolute to remain valid from there
    absDir = outputDir.resolve()
    env = os.environ.copy()
    for var in ("PATH", "LD_LIBRARY_PATH"):
        current = os.environ.get(var)
        # no trailing ':' when unset/empty (an empty entry means the cwd)
        env[var] = f"{absDir}:{current}" if current else str(absDir)
    if args.verbose:
        print(cmd)
    if which("condor_submit_dag") is None or which("condor_submit") is None:
        print("HTCondor shell commands cannot be found.")
        sys.exit(not args.dry_run)
    subprocess.run(cmd, shell=True, env=env, check=True, cwd=outputDir)
    if args.dry_run:
        sys.exit()

    # prepare the babysitting
    cmd = "condor_q -dag -nobatch"  # \todo constraint
    if args.background:
        print(f"Check the status of your jobs with `{cmd}`.")
        sys.exit()

    # prepare killing jobs on Ctrl+C
    def handler(signum, frame):
        # argument list: no shell, so the batch name needs no shell quoting
        subprocess.run(["condor_rm", "-constraint", f'JobBatchName == "{batchname}"'])
        sys.exit()

    signal.signal(signal.SIGINT, handler)

    # watch jobs
    cmd = f"watch -t -d {cmd}"
    subprocess.run(cmd, check=True, shell=True)

◆ parse()

def transcribe.parse ( List[str]  argv)
def parse(argv: List[str]):
    """Parse the command line of the transcription tool.

    Args:
        argv: full argument vector with the program name first (as ``sys.argv``).

    Returns:
        argparse.Namespace with attributes ``background``, ``dry_run``,
        ``dag`` (defaults to ``"dag"``, also when ``-D`` is given without a
        value), ``tutorial``, ``verbose`` and ``script``.

    Raises:
        ValueError: if no input script is given. With ``--tutorial``, the
            tutorial is printed and the program exits before this check.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "-b",
        "--background",
        action="store_true",
        help="exit right after submitting the job",
    )
    parser.add_argument(
        "-d", "--dry-run", action="store_true", help="exit right before submitting"
    )
    parser.add_argument(
        "-D",
        "--dag",
        help="indicate the DAG directory",
        type=str,
        nargs="?",
        default="dag",
        const="dag",  # `-D` without a value would otherwise give None
    )
    parser.add_argument(
        "-t", "--tutorial", action="store_true", help="show a brief tutorial"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="increase verbosity"
    )
    parser.add_argument(
        "script",
        nargs="?",
        type=str,
        help="shell script including a series of `submit` statements",
    )

    args = parser.parse_args(argv[1:])
    if args.tutorial:
        print(
            "Replaces `submit` prefix commands with `job` prefix commands in the input script "
            "to produce and submit a DAG file, with the fire-and-forget strategy. The full "
            "power of HTCondor DAGMans may then be used, in particular to resubmit failed jobs."
        )
        sys.exit()

    # the positional is optional (so that --tutorial works alone): it is None
    # when omitted, so test truthiness rather than length
    if not args.script:
        raise ValueError("Missing input script")

    return args
transcribe.create_new_dag
def create_new_dag(str script, str outputDir, bool verbose=False)
Definition: transcribe.py:64
transcribe.parse
def parse(List[str] argv)
Definition: transcribe.py:17
transcribe.main
def main()
Definition: transcribe.py:103