Skip to content

Instantly share code, notes, and snippets.

@zonca
Last active September 12, 2025 00:04
Show Gist options
  • Select an option

  • Save zonca/94b99a5590c43eba2f47d3514b166c88 to your computer and use it in GitHub Desktop.

Select an option

Save zonca/94b99a5590c43eba2f47d3514b166c88 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "554de106",
"metadata": {},
"outputs": [],
"source": [
"# Cell 1 \u2014 imports, annex name and working-directory layout\n",
"import os\n",
"from pathlib import Path\n",
"\n",
"from Pegasus.api import (\n",
"    OS,\n",
"    Arch,\n",
"    Directory,\n",
"    FileServer,\n",
"    Job,\n",
"    Operation,\n",
"    Properties,\n",
"    ReplicaCatalog,\n",
"    Site,\n",
"    SiteCatalog,\n",
"    Transformation,\n",
"    TransformationCatalog,\n",
"    Workflow,\n",
")\n",
"\n",
"# Jobs get pinned to the annex named after your login; \"annex\" is the\n",
"# fallback when $USER is unset.\n",
"ANNEX_NAME = os.environ.get(\"USER\", \"annex\")\n",
"\n",
"# Local tree holding the generated catalogs (sc/tc/rc) and the submit area (wf).\n",
"BASE = Path.cwd() / \"pegasus_annex_api_run\"\n",
"SC_DIR, TC_DIR, RC_DIR, WF_DIR = (BASE / name for name in (\"sc\", \"tc\", \"rc\", \"wf\"))\n",
"for local_dir in (SC_DIR, TC_DIR, RC_DIR, WF_DIR):\n",
"    local_dir.mkdir(parents=True, exist_ok=True)\n",
"\n",
"# Scratch and output locations used as the shared-filesystem layout (adjust if needed).\n",
"PEGASUS_ROOT = Path.home() / \"pegasus\"\n",
"SCRATCH = PEGASUS_ROOT / \"scratch\"\n",
"OUTPUT = PEGASUS_ROOT / \"output\"\n",
"for shared_dir in (SCRATCH, OUTPUT):\n",
"    shared_dir.mkdir(parents=True, exist_ok=True)\n",
"\n",
"print(\"ANNEX_NAME:\", ANNEX_NAME)\n",
"print(\"BASE:\", BASE)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "48723aa3",
"metadata": {},
"outputs": [],
"source": [
"# Cell 2 \u2014 pegasus.properties: sharedfs data staging + YAML catalog locations\n",
"sc_path = str(SC_DIR / \"sites.yml\")\n",
"tc_path = str(TC_DIR / \"transformations.yml\")\n",
"rc_path = str(RC_DIR / \"replicas.yml\")\n",
"\n",
"props = Properties()\n",
"# Pegasus 5.x defaults to condorio; this notebook stages data via the shared filesystem.\n",
"props[\"pegasus.data.configuration\"] = \"sharedfs\"\n",
"props[\"pegasus.transfer.worker.package\"] = \"True\"\n",
"\n",
"# Tell the planner where each catalog lives: (type key, file key, path).\n",
"catalog_settings = (\n",
"    (\"pegasus.catalog.site\", \"pegasus.catalog.site.file\", sc_path),\n",
"    (\"pegasus.catalog.transformation\", \"pegasus.catalog.transformation.file\", tc_path),\n",
"    (\"pegasus.catalog.replica\", \"pegasus.catalog.replica.file\", rc_path),\n",
")\n",
"for type_key, file_key, catalog_file in catalog_settings:\n",
"    props[type_key] = \"YAML\"\n",
"    props[file_key] = catalog_file\n",
"\n",
"props_path = str(BASE / \"pegasus.properties\")\n",
"props.write(props_path)\n",
"\n",
"for label, value in (\n",
"    (\"Wrote properties:\", props_path),\n",
"    (\"SC:\", sc_path),\n",
"    (\"TC:\", tc_path),\n",
"    (\"RC:\", rc_path),\n",
"):\n",
"    print(label, value)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f9a76898",
"metadata": {},
"outputs": [],
"source": [
"# Cell 3 \u2014 site catalog: the \"expanse\" compute site, pinned to your annex\n",
"sc = SiteCatalog()\n",
"\n",
"# No \"local\" site is defined here; add one if your setup needs it.\n",
"# NOTE(review): Pegasus 5 is expected to add a default \"local\" site when the\n",
"# catalog omits it \u2014 confirm for your Pegasus version.\n",
"\n",
"# Scratch and storage both point at shared-filesystem paths (from Cell 1) served\n",
"# through file:// URLs, consistent with pegasus.data.configuration = sharedfs.\n",
"expanse = (\n",
"    Site(\"expanse\", arch=Arch.X86_64, os_type=OS.LINUX)\n",
"    .add_directories(\n",
"        Directory(\n",
"            Directory.SHARED_SCRATCH, path=str(SCRATCH), shared_file_system=True\n",
"        ).add_file_servers(FileServer(\"file://\" + str(SCRATCH), Operation.ALL)),\n",
"        Directory(\n",
"            Directory.SHARED_STORAGE, path=str(OUTPUT), shared_file_system=True\n",
"        ).add_file_servers(FileServer(\"file://\" + str(OUTPUT), Operation.ALL)),\n",
"    )\n",
"    .add_pegasus_profile(style=\"condor\")\n",
")\n",
"\n",
"# Condor profiles (ProfileMixin helpers, Pegasus 5.1.1): vanilla universe, only\n",
"# match slots whose AnnexName equals ANNEX_NAME, and request 1 CPU and 1024 of\n",
"# memory (request_memory is in MiB by HTCondor default).\n",
"expanse.add_condor_profile(universe=\"vanilla\")\n",
"expanse.add_condor_profile(requirements=f'(AnnexName == \"{ANNEX_NAME}\")')\n",
"expanse.add_condor_profile(request_cpus=\"1\")\n",
"expanse.add_condor_profile(request_memory=\"1024\")\n",
"\n",
"sc.add_sites(expanse)\n",
"\n",
"sc_path = str(SC_DIR / \"sites.yml\")\n",
"sc.write(sc_path)\n",
"print(\"Wrote site catalog:\", sc_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a02b615a",
"metadata": {},
"outputs": [],
"source": [
"# Cell 4 \u2014 transformation catalog: the pre-installed /bin/hostname on expanse\n",
"tc = TransformationCatalog()\n",
"\n",
"hostname = Transformation(\n",
"    name=\"hostname\",\n",
"    site=\"expanse\",\n",
"    pfn=\"/bin/hostname\",\n",
"    is_stageable=False,  # run the binary already on the site; nothing to stage\n",
"    arch=Arch.X86_64,\n",
"    os_type=OS.LINUX,\n",
")\n",
"tc.add_transformations(hostname)\n",
"\n",
"# Keep tc_path a str \u2014 the same type Cell 2 stores in pegasus.properties and\n",
"# that is later handed to the planner.\n",
"tc_path = str(TC_DIR / \"transformations.yml\")\n",
"tc.write(tc_path)\n",
"print(\"Wrote transformation catalog:\", tc_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b738c7d0",
"metadata": {},
"outputs": [],
"source": [
"# Cell 5 \u2014 replica catalog: written with no entries (the job reads no input files)\n",
"rc = ReplicaCatalog()\n",
"\n",
"# Keep rc_path a str \u2014 the same type Cell 2 stores in pegasus.properties.\n",
"rc_path = str(RC_DIR / \"replicas.yml\")\n",
"rc.write(rc_path)\n",
"print(\"Wrote replica catalog:\", rc_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0469ec4d",
"metadata": {},
"outputs": [],
"source": [
"# Cell 6 \u2014 one-job workflow running the \"hostname\" transformation\n",
"wf = Workflow(\"annex-demo\")\n",
"\n",
"hostname_job = Job(\"hostname\", node_label=\"hostname-task\")\n",
"# Capture the job's stdout as the file hostname.out; where it ends up is\n",
"# decided at planning time (output_sites in the plan call).\n",
"hostname_job.set_stdout(\"hostname.out\")\n",
"wf.add_jobs(hostname_job)\n",
"\n",
"print(\"Workflow with 1 job defined.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3d67a4cb",
"metadata": {},
"outputs": [],
"source": [
"# Cell 7 \u2014 Plan & submit (outputs stay on expanse; no remote stage-out)\n",
"submit_dir = WF_DIR / \"run\"\n",
"\n",
"# Workflow.plan() has no catalog-path parameters: extra keyword arguments are\n",
"# forwarded to pegasus-plan as -D<key>=<value> properties, so catalog files\n",
"# must be given under their real property names. These duplicate what\n",
"# pegasus.properties (conf=) already sets, so either source alone suffices.\n",
"catalog_properties = {\n",
"    \"pegasus.catalog.site.file\": str(sc_path),\n",
"    \"pegasus.catalog.transformation.file\": str(tc_path),\n",
"    \"pegasus.catalog.replica.file\": str(rc_path),\n",
"}\n",
"\n",
"planned = wf.plan(\n",
"    dir=str(submit_dir),\n",
"    sites=[\"expanse\"],  # run jobs on expanse (the annex)\n",
"    output_sites=[\"expanse\"],  # outputs stay on expanse -> no remote transfer job\n",
"    submit=True,\n",
"    force=True,\n",
"    verbose=3,\n",
"    conf=props_path,\n",
"    **catalog_properties\n",
")\n",
"\n",
"# The planner's braindump records the actual submit directory, which may be\n",
"# nested below dir=.\n",
"print(\"Submitted. Submit dir:\", planned.braindump.submit_dir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2cd469b6",
"metadata": {},
"outputs": [],
"source": [
"# Cell 8 \u2014 block until the submitted workflow completes (best-effort)\n",
"try:\n",
"    planned.wait()  # Workflow.wait() is part of the Pegasus 5.x API\n",
"    print(\"Workflow finished.\")\n",
"except Exception as wait_error:\n",
"    # Deliberately broad: a failed wait should not abort the notebook; poll\n",
"    # pegasus-status from a shell instead.\n",
"    print(\"Waiting failed:\", wait_error)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7bdfec0a",
"metadata": {},
"outputs": [],
"source": [
"# Cell 9 \u2014 print the captured stdout, trying the shared output dir first and\n",
"# then the top of submit_dir; otherwise point at the workflow logs.\n",
"# NOTE(review): pegasus-plan usually creates the run directory nested below\n",
"# dir= (e.g. <user>/pegasus/<label>/runNNNN), so the second path may never\n",
"# match \u2014 confirm.\n",
"search_order = (\n",
"    (\"=== hostname.out ===\", OUTPUT / \"hostname.out\"),\n",
"    (\"=== hostname.out (submit dir) ===\", Path(submit_dir) / \"hostname.out\"),\n",
")\n",
"for banner, candidate in search_order:\n",
"    if candidate.exists():\n",
"        print(banner)\n",
"        print(candidate.read_text().strip())\n",
"        break\n",
"else:\n",
"    print(\"hostname.out not found \u2014 check workflow logs in:\", submit_dir)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment