Last active
September 12, 2025 00:04
-
-
Save zonca/94b99a5590c43eba2f47d3514b166c88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "554de106", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import os\n", | |
| "from pathlib import Path\n", | |
| "\n", | |
| "from Pegasus.api import (\n", | |
| "    OS,\n", | |
| "    Arch,\n", | |
| "    Directory,\n", | |
| "    FileServer,\n", | |
| "    Job,\n", | |
| "    Operation,\n", | |
| "    Properties,\n", | |
| "    ReplicaCatalog,\n", | |
| "    Site,\n", | |
| "    SiteCatalog,\n", | |
| "    Transformation,\n", | |
| "    TransformationCatalog,\n", | |
| "    Workflow,\n", | |
| ")\n", | |
| "\n", | |
| "# The annex is named after your login; fall back to \"annex\" if USER is unset.\n", | |
| "ANNEX_NAME = os.environ.get(\"USER\", \"annex\")\n", | |
| "\n", | |
| "# Local working tree: sc/tc/rc hold the site, transformation and replica\n", | |
| "# catalogs; wf is the submit/run workspace.\n", | |
| "BASE = Path.cwd() / \"pegasus_annex_api_run\"\n", | |
| "SC_DIR, TC_DIR, RC_DIR, WF_DIR = (BASE / sub for sub in (\"sc\", \"tc\", \"rc\", \"wf\"))\n", | |
| "\n", | |
| "# Shared-filesystem scratch and output areas (adjust if needed).\n", | |
| "SCRATCH = Path.home() / \"pegasus\" / \"scratch\"\n", | |
| "OUTPUT = Path.home() / \"pegasus\" / \"output\"\n", | |
| "\n", | |
| "# Create every directory up front; ones that already exist are left alone.\n", | |
| "for d in (SC_DIR, TC_DIR, RC_DIR, WF_DIR, SCRATCH, OUTPUT):\n", | |
| "    d.mkdir(parents=True, exist_ok=True)\n", | |
| "\n", | |
| "print(\"ANNEX_NAME:\", ANNEX_NAME)\n", | |
| "print(\"BASE:\", BASE)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "48723aa3", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Cell 2 \u2014 pegasus.properties (use sharedfs + catalog file paths)\n", | |
| "\n", | |
| "# Catalog files written by the later cells; all of them use the YAML format.\n", | |
| "sc_path = str(SC_DIR / \"sites.yml\")\n", | |
| "tc_path = str(TC_DIR / \"transformations.yml\")\n", | |
| "rc_path = str(RC_DIR / \"replicas.yml\")\n", | |
| "\n", | |
| "props = Properties()\n", | |
| "# Use a shared filesystem instead of the 5.x default data configuration (condorio).\n", | |
| "props[\"pegasus.data.configuration\"] = \"sharedfs\"\n", | |
| "props[\"pegasus.transfer.worker.package\"] = \"True\"\n", | |
| "\n", | |
| "# Tell the planner which catalog implementation and file to use for each catalog.\n", | |
| "props[\"pegasus.catalog.site\"] = \"YAML\"\n", | |
| "props[\"pegasus.catalog.site.file\"] = sc_path\n", | |
| "props[\"pegasus.catalog.transformation\"] = \"YAML\"\n", | |
| "props[\"pegasus.catalog.transformation.file\"] = tc_path\n", | |
| "props[\"pegasus.catalog.replica\"] = \"YAML\"\n", | |
| "props[\"pegasus.catalog.replica.file\"] = rc_path\n", | |
| "\n", | |
| "props_path = str(BASE / \"pegasus.properties\")\n", | |
| "props.write(props_path)\n", | |
| "print(\"Wrote properties:\", props_path)\n", | |
| "for _label, _path in ((\"SC:\", sc_path), (\"TC:\", tc_path), (\"RC:\", rc_path)):\n", | |
| "    print(_label, _path)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "f9a76898", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "sc = SiteCatalog()\n", | |
| "\n", | |
| "# Local site for planning/output registration (unchanged) ...\n", | |
| "# [ your local site block here ]\n", | |
| "\n", | |
| "# Expanse compute site: shared scratch and shared output storage, both on the\n", | |
| "# shared filesystem and reached through file:// URLs. The whole method chain\n", | |
| "# sits inside one pair of parentheses so the statement is balanced.\n", | |
| "expanse = (\n", | |
| "    Site(\"expanse\", arch=Arch.X86_64, os_type=OS.LINUX)\n", | |
| "    .add_directories(\n", | |
| "        Directory(\n", | |
| "            Directory.SHARED_SCRATCH, path=str(SCRATCH), shared_file_system=True\n", | |
| "        ).add_file_servers(FileServer(\"file://\" + str(SCRATCH), Operation.ALL)),\n", | |
| "        Directory(\n", | |
| "            Directory.SHARED_STORAGE, path=str(OUTPUT), shared_file_system=True\n", | |
| "        ).add_file_servers(FileServer(\"file://\" + str(OUTPUT), Operation.ALL)),\n", | |
| "    )\n", | |
| "    .add_pegasus_profile(style=\"condor\")\n", | |
| ")\n", | |
| "\n", | |
| "# Condor profiles (5.1.1 ProfileMixin helpers): vanilla universe, a\n", | |
| "# requirements expression that pins every job to your annex by AnnexName,\n", | |
| "# and per-job request_cpus / request_memory values.\n", | |
| "expanse.add_condor_profile(universe=\"vanilla\")\n", | |
| "expanse.add_condor_profile(requirements=f'(AnnexName == \"{ANNEX_NAME}\")')\n", | |
| "expanse.add_condor_profile(request_cpus=\"1\")\n", | |
| "expanse.add_condor_profile(request_memory=\"1024\")\n", | |
| "\n", | |
| "sc.add_sites(expanse)\n", | |
| "\n", | |
| "sc_path = str(SC_DIR / \"sites.yml\")\n", | |
| "sc.write(sc_path)\n", | |
| "print(\"Wrote site catalog:\", sc_path)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "a02b615a", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "tc = TransformationCatalog()\n", | |
| "\n", | |
| "# is_stageable=False: /bin/hostname is expected to be installed already on the\n", | |
| "# expanse site, so it is used in place rather than staged in.\n", | |
| "hostname = Transformation(\n", | |
| "    name=\"hostname\",\n", | |
| "    site=\"expanse\",\n", | |
| "    pfn=\"/bin/hostname\",\n", | |
| "    is_stageable=False,\n", | |
| "    arch=Arch.X86_64,\n", | |
| "    os_type=OS.LINUX,\n", | |
| ")\n", | |
| "\n", | |
| "tc.add_transformations(hostname)\n", | |
| "\n", | |
| "# Keep tc_path a str, the same type the properties and site-catalog cells use.\n", | |
| "tc_path = str(TC_DIR / \"transformations.yml\")\n", | |
| "tc.write(tc_path)\n", | |
| "print(\"Wrote transformation catalog:\", tc_path)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "b738c7d0", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Empty replica catalog: the workflow's only job declares no input files.\n", | |
| "rc = ReplicaCatalog()\n", | |
| "# Keep rc_path a str, the same type the properties cell uses.\n", | |
| "rc_path = str(RC_DIR / \"replicas.yml\")\n", | |
| "rc.write(rc_path)\n", | |
| "print(\"Wrote replica catalog:\", rc_path)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "0469ec4d", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# A single-job workflow that runs `hostname`; the job's stdout is captured as\n", | |
| "# hostname.out (intended to end up in the shared output dir).\n", | |
| "j = Job(\"hostname\", node_label=\"hostname-task\")\n", | |
| "j.set_stdout(\"hostname.out\")\n", | |
| "\n", | |
| "wf = Workflow(\"annex-demo\")\n", | |
| "wf.add_jobs(j)\n", | |
| "print(\"Workflow with 1 job defined.\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "3d67a4cb", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Cell 7 \u2014 Plan & submit (outputs stay on expanse; no remote stage-out)\n", | |
| "submit_dir = WF_DIR / \"run\"\n", | |
| "\n", | |
| "plan_options = dict(\n", | |
| "    dir=str(submit_dir),\n", | |
| "    # Run jobs on expanse (the annex) and write/register outputs there too,\n", | |
| "    # so no remote transfer job is needed.\n", | |
| "    sites=[\"expanse\"],\n", | |
| "    output_sites=[\"expanse\"],\n", | |
| "    submit=True,\n", | |
| "    force=True,\n", | |
| "    verbose=3,\n", | |
| "    # NOTE(review): confirm that Workflow.plan() treats the three *_catalog\n", | |
| "    # keywords below as catalog locations; extra keyword arguments may instead\n", | |
| "    # be forwarded to pegasus-plan as -D properties. The same catalog paths are\n", | |
| "    # also set in the properties file passed as conf.\n", | |
| "    sites_catalog=sc_path,\n", | |
| "    transformations_catalog=tc_path,\n", | |
| "    replicas_catalog=rc_path,\n", | |
| "    conf=props_path,\n", | |
| ")\n", | |
| "planned = wf.plan(**plan_options)\n", | |
| "print(\"Submitted. Submit dir:\", planned.submission_dir)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "2cd469b6", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Block until the submitted workflow finishes (wait() is available in Pegasus 5.x).\n", | |
| "# Failures are printed rather than raised; as a fallback, poll pegasus-status\n", | |
| "# externally if needed.\n", | |
| "try:\n", | |
| "    planned.wait()\n", | |
| "except Exception as err:\n", | |
| "    print(\"Waiting failed:\", err)\n", | |
| "else:\n", | |
| "    print(\"Workflow finished.\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "7bdfec0a", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Print the job's stdout: check the shared output directory first, then the\n", | |
| "# submit directory; if neither has it, point at the workflow logs.\n", | |
| "out_file = OUTPUT / \"hostname.out\"\n", | |
| "alt = Path(submit_dir) / \"hostname.out\"\n", | |
| "for _candidate, _header in (\n", | |
| "    (out_file, \"=== hostname.out ===\"),\n", | |
| "    (alt, \"=== hostname.out (submit dir) ===\"),\n", | |
| "):\n", | |
| "    if _candidate.exists():\n", | |
| "        print(_header)\n", | |
| "        print(_candidate.read_text().strip())\n", | |
| "        break\n", | |
| "else:\n", | |
| "    print(\"hostname.out not found \u2014 check workflow logs in:\", submit_dir)" | |
| ] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.6.8" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 5 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment