# Gist by @andybak — andybak/60c6c823175fff2631bbf7587d72ca23, created October 14, 2025.
from django.core.management.base import BaseCommand
from django.db.models import Count

from icosa.models import Asset


class Command(BaseCommand):
    help = (
        "Delete assets that have no viewable formats. "
        "This cleans up orphaned assets that were created but never successfully imported."
    )

    def add_arguments(self, parser):
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Show what would be deleted without actually deleting anything",
        )
        parser.add_argument(
            "--yes",
            action="store_true",
            help="Skip confirmation prompt",
        )
        parser.add_argument(
            "--source",
            dest="source",
            default=None,
            help="Only delete assets from a specific import source (e.g. 'sketchfab')",
        )

    def handle(self, *args, **options):
        dry_run = options.get("dry_run", False)
        skip_confirm = options.get("yes", False)
        source = options.get("source")

        # Find assets with no formats. The reverse relation is queried as
        # "format" (the instance accessor is format_set), assuming no custom
        # related_name on the Format model.
        assets_query = Asset.objects.annotate(
            format_count=Count("format")
        ).filter(format_count=0)

        # Filter by source if specified
        if source:
            assets_query = assets_query.filter(imported_from=source)

        assets = list(assets_query)
        count = len(assets)

        if count == 0:
            self.stdout.write(self.style.SUCCESS("No assets found without formats."))
            return

        # Show what will be deleted
        self.stdout.write(f"\nFound {count} asset(s) without formats:")
        if options.get("verbosity", 1) >= 2:
            for asset in assets[:10]:  # Show first 10
                self.stdout.write(f"  - {asset.url}: {asset.name} (source: {asset.imported_from})")
            if count > 10:
                self.stdout.write(f"  ... and {count - 10} more")

        # Source breakdown
        if options.get("verbosity", 1) >= 1:
            sources = {}
            for asset in assets:
                source_name = asset.imported_from or "(no source)"
                sources[source_name] = sources.get(source_name, 0) + 1
            self.stdout.write("\nBreakdown by source:")
            for source_name, source_count in sorted(sources.items()):
                self.stdout.write(f"  {source_name}: {source_count}")

        if dry_run:
            self.stdout.write(
                self.style.WARNING(f"\n[DRY RUN] Would delete {count} asset(s). Run without --dry-run to actually delete.")
            )
            return

        # Confirmation
        if not skip_confirm:
            self.stdout.write(
                self.style.WARNING(f"\nThis will permanently delete {count} asset(s) from the database.")
            )
            confirm = input("Are you sure you want to continue? [y/N]: ")
            if confirm.lower() not in ["y", "yes"]:
                self.stdout.write("Cancelled.")
                return

        # Delete assets
        deleted_count = 0
        for asset in assets:
            try:
                asset_url = asset.url
                asset.delete()
                deleted_count += 1
                if options.get("verbosity", 1) >= 2:
                    self.stdout.write(f"Deleted: {asset_url}")
            except Exception as exc:
                self.stderr.write(f"Error deleting {asset.url}: {exc}")

        self.stdout.write(
            self.style.SUCCESS(f"Successfully deleted {deleted_count} out of {count} asset(s).")
        )
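
# Example invocations (the command name is hypothetical — it depends on the
# filename this lives under in management/commands/):
#   python manage.py delete_formatless_assets --dry-run
#   python manage.py delete_formatless_assets --source sketchfab --yes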

import math
import json
from string import Template
from typing import Dict, Optional, Iterable, List

from django.core.management.base import BaseCommand, CommandError

from icosa.models import Asset


def _quat_from_lookat(position, target, up):
    """Build a quaternion [x, y, z, w] for a camera at `position` looking at `target`."""
    try:
        px, py, pz = position
        tx, ty, tz = target
        ux, uy, uz = up
        # Forward vector (normalised)
        fx, fy, fz = tx - px, ty - py, tz - pz
        fl = math.sqrt(fx * fx + fy * fy + fz * fz) or 1.0
        fx, fy, fz = fx / fl, fy / fl, fz / fl
        # Right vector = forward x up (normalised)
        rx, ry, rz = (fy * uz - fz * uy, fz * ux - fx * uz, fx * uy - fy * ux)
        rl = math.sqrt(rx * rx + ry * ry + rz * rz) or 1.0
        rx, ry, rz = rx / rl, ry / rl, rz / rl
        # Orthogonal up = right x forward
        ux2, uy2, uz2 = (ry * fz - rz * fy, rz * fx - rx * fz, rx * fy - ry * fx)
        # Rotation matrix with columns (right, up, -forward), then the standard
        # matrix-to-quaternion conversion branching on the trace.
        m00, m01, m02 = rx, ux2, -fx
        m10, m11, m12 = ry, uy2, -fy
        m20, m21, m22 = rz, uz2, -fz
        trace = m00 + m11 + m22
        if trace > 0:
            s = math.sqrt(trace + 1.0) * 2.0
            w = 0.25 * s
            x = (m21 - m12) / s
            y = (m02 - m20) / s
            z = (m10 - m01) / s
        elif (m00 > m11) and (m00 > m22):
            s = math.sqrt(1.0 + m00 - m11 - m22) * 2.0
            w = (m21 - m12) / s
            x = 0.25 * s
            y = (m01 + m10) / s
            z = (m02 + m20) / s
        elif m11 > m22:
            s = math.sqrt(1.0 + m11 - m00 - m22) * 2.0
            w = (m02 - m20) / s
            x = (m01 + m10) / s
            y = 0.25 * s
            z = (m12 + m21) / s
        else:
            s = math.sqrt(1.0 + m22 - m00 - m11) * 2.0
            w = (m10 - m01) / s
            x = (m02 + m20) / s
            y = (m12 + m21) / s
            z = 0.25 * s
        return [x, y, z, w]
    except Exception:
        return [0, 0, 0, 1]
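

def _demo_quat_from_lookat():
    # Hypothetical self-check, not part of the original command: a camera at
    # the origin looking down -Z with +Y up matches the glTF default
    # orientation, so the expected quaternion is the identity [0, 0, 0, 1].
    q = _quat_from_lookat([0, 0, 0], [0, 0, -1], [0, 1, 0])
    assert all(abs(a - b) < 1e-9 for a, b in zip(q, [0.0, 0.0, 0.0, 1.0]))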


def map_viewer_snapshot_to_presentation(snapshot: Dict) -> Optional[Dict]:
    if not snapshot:
        return None
    camera = snapshot.get("cameraLookAt") or {}
    position = camera.get("position")
    target = camera.get("target")
    up = camera.get("up") or [0, 1, 0]
    fov_deg = snapshot.get("fov")
    bg = snapshot.get("background") or {}
    env = snapshot.get("currentEnvironment")
    pres: Dict = {"camera": {"type": "perspective", "perspective": {"znear": 0.1}}}
    if position:
        pres["camera"]["translation"] = position
    if position and target:
        pres["camera"]["rotation"] = _quat_from_lookat(position, target, up)
        pres["camera"].setdefault("GOOGLE_camera_settings", {})["pivot"] = target
        pres["camera"].setdefault("GOOGLE_camera_settings", {})["mode"] = "movableOrbit"
    if isinstance(fov_deg, (int, float)):
        pres["camera"].setdefault("perspective", {})["yfov"] = math.radians(fov_deg)
    if isinstance(bg.get("color"), list) and len(bg.get("color")) >= 3:
        r, g, b = bg["color"][:3]

        def clamp01(x):
            try:
                return max(0, min(1, float(x)))
            except Exception:
                return 0

        r8 = int(round(clamp01(r) * 255))
        g8 = int(round(clamp01(g) * 255))
        b8 = int(round(clamp01(b) * 255))
        pres["backgroundColor"] = f"#{r8:02x}{g8:02x}{b8:02x}"
        pres["GOOGLE_backgrounds"] = {"color": [r, g, b]}
    if env:
        pres["GOOGLE_lighting_rig"] = env
        pres["GOOGLE_lights_image_based"] = env
    pres["orientingRotation"] = {"w": 1}
    pres["GOOGLE_scene_rotation"] = {"rotation": [0, 0, 0, 1]}
    pres["GOOGLE_real_world_transform"] = {"scaling_factor": 1}
    return pres
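

def _demo_map_viewer_snapshot():
    # Hypothetical example, not part of the original command: shows the shape
    # of the presentation dict produced from a minimal viewer snapshot.
    snapshot = {
        "cameraLookAt": {"position": [0, 1, 5], "target": [0, 1, 0], "up": [0, 1, 0]},
        "fov": 45,
        "background": {"color": [1, 1, 1]},
    }
    pres = map_viewer_snapshot_to_presentation(snapshot)
    assert pres["camera"]["translation"] == [0, 1, 5]
    assert abs(pres["camera"]["perspective"]["yfov"] - math.radians(45)) < 1e-9
    assert pres["backgroundColor"] == "#ffffff"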


def fetch_sketchfab_viewer_snapshot(uid: str, timeout_ms: int = 20000) -> Optional[Dict]:
    try:
        from playwright.sync_api import sync_playwright
    except Exception as exc:
        raise CommandError("Playwright is not installed in this environment.") from exc
    viewer_js = "https://static.sketchfab.com/api/sketchfab-viewer-1.12.1.js"
    html_template = Template(
        """
<!doctype html><html><head><meta charset="utf-8"><script src="$viewer_js"></script></head>
<body style="margin:0"><iframe id="api-frame" allow="autoplay; fullscreen; vr" style="width:10px;height:10px;border:0"></iframe>
<script>
const iframe=document.getElementById('api-frame');
const client=new window.Sketchfab(iframe);
function call(api, name){return new Promise((resolve)=>{if(typeof api[name]!=='function'){return resolve(undefined);}try{api[name]((v)=>resolve(v));}catch(e){resolve(undefined);}})}
client.init('$uid', {autostart:1,ui_controls:0,ui_stop:0,success: function(api){api.addEventListener('viewerready', async function(){
const cameraLookAt=await call(api,'getCameraLookAt');
const fov=await call(api,'getFov');
const background=await call(api,'getBackground');
const currentEnvironment=await call(api,'getCurrentEnvironment');
const postProcessing=await call(api,'getPostProcessing');
const shading=await call(api,'getShading');
const viewerSettings=await call(api,'getViewerSettings');
window._snapshot={cameraLookAt,fov,background,currentEnvironment,postProcessing,shading,viewerSettings};
console.log('SNAPSHOT:'+JSON.stringify(window._snapshot));
});},error:function(){console.error('init error')}});
</script></body></html>
"""
    )
    html = html_template.substitute(uid=uid, viewer_js=viewer_js)
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.set_default_timeout(timeout_ms)
        snapshot = {}

        def on_console(msg):
            # Playwright exposes msg.text as a property in current releases and
            # as a method in older ones; handle both.
            text = msg.text if isinstance(msg.text, str) else msg.text()
            if isinstance(text, str) and text.startswith("SNAPSHOT:"):
                try:
                    snapshot.update(json.loads(text[len("SNAPSHOT:"):]))
                except Exception:
                    pass

        page.on("console", on_console)

        from tempfile import NamedTemporaryFile
        import os

        with NamedTemporaryFile("w", delete=False, suffix=".html", encoding="utf-8") as f:
            f.write(html)
            html_path = f.name
        page.goto("file://" + os.path.abspath(html_path))
        page.wait_for_timeout(12000)
        browser.close()
    return snapshot or None
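

# Hypothetical usage (requires Playwright, Chromium, and network access); the
# uid below is a placeholder, not a real model:
#   snapshot = fetch_sketchfab_viewer_snapshot("<model-uid>")
#   if snapshot:
#       pres = map_viewer_snapshot_to_presentation(snapshot)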


class Command(BaseCommand):
    help = "Enrich assets imported from Sketchfab with viewer presentation parameters (camera, background, environment, post-fx)."

    def add_arguments(self, parser):
        parser.add_argument("--asset", dest="assets", nargs="*", help="Asset.url values to process")
        parser.add_argument("--uid", dest="uids", nargs="*", help="Sketchfab model UIDs to process")
        parser.add_argument("--all", action="store_true", help="Process all assets imported from Sketchfab")
        parser.add_argument("--limit", type=int, default=None, help="Limit number of assets to process")
        parser.add_argument("--dry-run", action="store_true", help="Do not save; just print")

    def handle(self, *args, **opts):
        assets_arg = opts.get("assets") or []
        uids_arg = opts.get("uids") or []
        do_all = opts.get("all")
        limit = opts.get("limit")
        dry_run = opts.get("dry_run")
        targets: List[Asset] = []
        if assets_arg:
            for aurl in assets_arg:
                asset = Asset.objects.filter(url=aurl).first()
                if asset:
                    targets.append(asset)
                else:
                    self.stderr.write(f"No asset with url={aurl}")
        if uids_arg:
            for uid in uids_arg:
                a = Asset.objects.filter(polydata__uid=uid).first()
                if a:
                    targets.append(a)
                else:
                    # Try by url convention
                    a = Asset.objects.filter(url=f"sketchfab-{uid}").first()
                    if a:
                        targets.append(a)
                    else:
                        self.stderr.write(f"No asset found for uid={uid}")
        if do_all or (not targets and not uids_arg and not assets_arg):
            qs = Asset.objects.filter(imported_from="sketchfab").order_by("-create_time")
            if limit:
                qs = qs[:limit]
            targets.extend(list(qs))
        if limit and len(targets) > limit:
            targets = targets[:limit]
        if not targets:
            self.stdout.write("Nothing to process")
            return
        processed = 0
        for asset in targets:
            uid = None
            if asset.polydata and isinstance(asset.polydata, dict):
                uid = asset.polydata.get("uid")
            if not uid and asset.url and asset.url.startswith("sketchfab-"):
                uid = asset.url[len("sketchfab-"):]
            if not uid:
                self.stderr.write(f"Skipping {asset.url}: no Sketchfab uid found")
                continue
            self.stdout.write(f"Probing viewer for {asset.url} (uid={uid})...")
            snapshot = fetch_sketchfab_viewer_snapshot(uid)
            if not snapshot:
                self.stderr.write(" → No snapshot captured")
                continue
            pres = map_viewer_snapshot_to_presentation(snapshot)
            if not pres:
                self.stderr.write(" → No mappable presentation data")
                continue
            if dry_run:
                self.stdout.write(json.dumps(pres))
            else:
                asset.presentation_params = pres
                asset.save(update_fields=["presentation_params"])
                self.stdout.write(" → Saved presentation_params")
            processed += 1
        self.stdout.write(self.style.SUCCESS(f"Done. Processed {processed} assets."))


import json
import io
import mimetypes
import os
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Optional, Tuple

from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
from django.utils import timezone

from PIL import Image

from icosa.helpers.file import get_content_type
from icosa.helpers.snowflake import generate_snowflake
from icosa.models import (
    ASSET_STATE_COMPLETE,
    PUBLIC,
    Asset,
    AssetOwner,
    Format,
    Resource,
    Tag,
)
from icosa.models.common import CATEGORY_LABEL_MAP

IMPORT_SOURCE = "Poly Haven"


def first_json_file(path: Path) -> Optional[Path]:
    for p in sorted(path.glob("*.json")):
        return p
    return None


def pick_thumbnail_file(path: Path) -> Optional[Path]:
    """Only use an exact "thumbnail.webp" if present; otherwise no thumbnail."""
    thumb_webp = path / "thumbnail.webp"
    if thumb_webp.exists() and thumb_webp.is_file():
        return thumb_webp
    return None


def pick_glb_file(path: Path) -> Optional[Path]:
    glbs = sorted(path.glob("*.glb"))
    if glbs:
        # If multiple, prefer one that does not look like LOD or low-res
        preferred = [
            p
            for p in glbs
            if not any(k in p.name.lower() for k in ("lod", "low", "preview", "thumb"))
        ]
        return preferred[0] if preferred else glbs[0]
    return None


def parse_datetime(value: Optional[str]) -> Optional[datetime]:
    if not value:
        return None
    try:
        # Try ISO first
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except Exception:
        return None


def derive_license(meta: dict) -> Optional[str]:
    raw = None
    for key in ("license", "licence", "license_id", "licenseName", "license_slug"):
        v = meta.get(key)
        if v:
            raw = str(v)
            break
    if raw:
        low = raw.lower()
        if "cc0" in low or "public domain" in low or "creative commons 0" in low:
            return "CREATIVE_COMMONS_0"
        if "by-sa" in low:
            return "CREATIVE_COMMONS_BY_SA_4_0"
        if low in ("by", "cc-by", "creative commons by", "cc by"):
            return "CREATIVE_COMMONS_BY_4_0"
    return None
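

def _demo_derive_license():
    # Hypothetical examples, not part of the original command: how a few
    # metadata shapes map to internal license codes.
    assert derive_license({"license": "CC0"}) == "CREATIVE_COMMONS_0"
    assert derive_license({"licence": "CC-BY-SA 4.0"}) == "CREATIVE_COMMONS_BY_SA_4_0"
    assert derive_license({"license": "cc-by"}) == "CREATIVE_COMMONS_BY_4_0"
    assert derive_license({}) is None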


class Command(BaseCommand):
    help = (
        "Import local Poly Haven-style assets from a directory. "
        "Each subdirectory is treated as an asset folder; directories without a .glb are ignored."
    )

    def add_arguments(self, parser):
        parser.add_argument(
            "--base-dir",
            dest="base_dir",
            default=os.environ.get("POLYHAVEN_DIR") or r"C:\Users\andyb\3D Objects\Poly Haven",
            help="Base directory containing Poly Haven asset folders",
        )
        parser.add_argument(
            "--max",
            dest="max_items",
            type=int,
            default=None,
            help="Maximum number of items to import",
        )
        parser.add_argument(
            "--update-existing",
            dest="update_existing",
            action="store_true",
            help="Update assets if they already exist",
        )
        parser.add_argument(
            "--owner",
            dest="owner_slug",
            default="polyhaven",
            help="Owner slug to assign when author is not derivable",
        )

    def handle(self, *args, **options):
        base_dir = Path(options["base_dir"]).expanduser()
        if not base_dir.exists() or not base_dir.is_dir():
            raise CommandError(f"Base directory does not exist: {base_dir}")
        update_existing: bool = options.get("update_existing", False)
        max_items: Optional[int] = options.get("max_items")
        owner_slug_default: str = options.get("owner_slug")
        count = 0
        scanned = 0
        imported_dirs: List[Path] = []
        for root, _dirs, _files in os.walk(base_dir):
            dirpath = Path(root)
            scanned += 1
            glb = pick_glb_file(dirpath)
            if not glb:
                continue
            try:
                asset = self.create_or_update_from_dir(dirpath, glb, owner_slug_default, update_existing)
                if asset is not None:
                    count += 1
                    imported_dirs.append(dirpath)
                    self.stdout.write(f"Imported {asset.url} from {dirpath.name}")
            except CommandError as exc:
                self.stderr.write(f"Skipping {dirpath.name}: {exc}")
            if max_items is not None and count >= max_items:
                break
        self.stdout.write(self.style.SUCCESS(f"Finished. Scanned={scanned} imported={count}"))

    def create_or_update_from_dir(
        self,
        dirpath: Path,
        glb_path: Path,
        owner_slug_default: str,
        update_existing: bool,
    ) -> Optional[Asset]:
        meta_path = first_json_file(dirpath)
        meta: dict = {}
        meta_present = False
        if meta_path and meta_path.exists():
            meta_present = True
            try:
                meta = json.loads(meta_path.read_text(encoding="utf-8"))
            except Exception:
                meta = {}
        # Derive basic fields
        name = meta.get("name") or meta.get("title") or dirpath.name
        desc = meta.get("description") or meta.get("desc")
        # Prefer an explicit id/slug; else folder name
        ident = (
            str(meta.get("id") or meta.get("slug") or slugify(name) or dirpath.name)
            .strip()
            .replace(" ", "-")
        )
        asset_url = f"polyhaven-{ident}"
        # Owner: try author info; else default
        author_name = None
        for key in ("author", "artist", "creator"):
            v = meta.get(key)
            if isinstance(v, str) and v.strip():
                author_name = v.strip()
                break
            if isinstance(v, dict):
                author_name = (v.get("name") or v.get("username") or v.get("id") or "").strip() or None
                if author_name:
                    break
        if not author_name and isinstance(meta.get("authors"), list) and meta.get("authors"):
            first = meta["authors"][0]
            if isinstance(first, dict):
                author_name = (first.get("name") or first.get("username") or first.get("id") or "").strip() or None
            elif isinstance(first, str):
                author_name = first.strip()
        owner_slug = slugify(author_name) if author_name else owner_slug_default
        owner_display = author_name or owner_slug_default
        owner, _ = AssetOwner.objects.get_or_create(
            url=owner_slug,
            defaults={
                "displayname": owner_display,
                "imported": True,
                "is_claimed": False,
            },
        )
        # Locate or create asset
        asset = Asset.objects.filter(url=asset_url).first()
        created = False
        if not asset:
            created = True
            asset = Asset(url=asset_url)
        else:
            if not update_existing:
                return None
        # Core fields
        created_at = parse_datetime(meta.get("created") or meta.get("created_at") or meta.get("date")) or timezone.now()
        updated_at = parse_datetime(meta.get("updated") or meta.get("modified") or meta.get("updated_at")) or created_at
        asset.name = name
        asset.description = desc
        if created and not asset.create_time:
            asset.create_time = created_at
        asset.update_time = updated_at
        asset.visibility = PUBLIC
        asset.curated = True
        asset.state = ASSET_STATE_COMPLETE
        asset.owner = owner
        asset.imported_from = IMPORT_SOURCE
        if meta_present:
            asset.polydata = meta
        # All Poly Haven assets are CC0
        asset.license = "CREATIVE_COMMONS_0"
        # Category
        cat_name = None
        cats = meta.get("categories") or meta.get("category")
        if isinstance(cats, list) and cats:
            c0 = cats[0]
            cat_name = c0.get("name") if isinstance(c0, dict) else str(c0)
        elif isinstance(cats, str):
            cat_name = cats
        if cat_name:
            key = str(cat_name).strip().lower()
            asset.category = CATEGORY_LABEL_MAP.get(key)
        # Assign id for new assets
        if created:
            asset.id = generate_snowflake()
        asset.save()
        # Tags
        tags_raw: Iterable = meta.get("tags") or meta.get("keywords") or []
        tag_names: List[str] = []
        for t in tags_raw:
            if isinstance(t, dict):
                tag_names.append(t.get("name") or t.get("slug"))
            elif isinstance(t, str):
                tag_names.append(t)
        tag_objs = []
        # Use a distinct loop variable so we don't shadow the asset name above
        for tag_name in filter(None, set(tag_names)):
            tag, _ = Tag.objects.get_or_create(name=tag_name)
            tag_objs.append(tag)
        if tag_objs:
            asset.tags.set(tag_objs)
        # Thumbnail
        thumb_path = pick_thumbnail_file(dirpath)
        if thumb_path and ((not asset.thumbnail) or update_existing):
            # Convert webp to jpeg to satisfy thumbnail validators
            if thumb_path.suffix.lower() == ".webp":
                with Image.open(thumb_path) as im:
                    # Ensure RGB (flatten alpha onto a white background if present)
                    if im.mode in ("RGBA", "LA"):
                        bg = Image.new("RGB", im.size, (255, 255, 255))
                        alpha = im.split()[-1]
                        bg.paste(im.convert("RGB"), mask=alpha)
                        im = bg
                    else:
                        im = im.convert("RGB")
                    # Fit image into an 8:5 box without upscaling image content.
                    target_ar = 8 / 5
                    max_w, max_h = 1600, 1000  # upper bound for large sources
                    w, h = im.size
                    # Scale down if larger than max box; never scale up
                    scale = min(1.0, min(max_w / w, max_h / h))
                    new_w = int(w * scale)
                    new_h = int(h * scale)
                    if scale < 1.0:
                        im = im.resize((new_w, new_h), Image.LANCZOS)
                    else:
                        new_w, new_h = w, h
                    # Compute minimal padding to achieve 8:5 aspect ratio canvas
                    if new_w / new_h < target_ar:
                        canvas_w = int(round(new_h * target_ar))
                        canvas_h = new_h
                    else:
                        canvas_w = new_w
                        canvas_h = int(round(new_w / target_ar))
                    # Add 10% white padding around the image
                    pad = int(0.1 * max(canvas_w, canvas_h))
                    padded_w = canvas_w + 2 * pad
                    padded_h = canvas_h + 2 * pad
                    canvas = Image.new("RGB", (padded_w, padded_h), (255, 255, 255))
                    # Center the image on the padded canvas
                    paste_x = (padded_w - canvas_w) // 2
                    paste_y = (padded_h - canvas_h) // 2
                    inner_canvas = Image.new("RGB", (canvas_w, canvas_h), (255, 255, 255))
                    img_x = (canvas_w - new_w) // 2
                    img_y = (canvas_h - new_h) // 2
                    inner_canvas.paste(im, (img_x, img_y))
                    canvas.paste(inner_canvas, (paste_x, paste_y))
                    buf = io.BytesIO()
                    canvas.save(buf, format="JPEG", quality=90)
                    buf.seek(0)
                    jpg_name = thumb_path.with_suffix(".jpg").name
                    asset.thumbnail.save(jpg_name, ContentFile(buf.read()), save=False)
                    asset.thumbnail_contenttype = "image/jpeg"
            else:
                # Guess content type and save
                content_type = get_content_type(thumb_path.name) or mimetypes.guess_type(thumb_path.name)[0] or "image/jpeg"
                asset.thumbnail.save(thumb_path.name, ContentFile(thumb_path.read_bytes()), save=False)
                asset.thumbnail_contenttype = content_type
            asset.save()
        # Formats/resources: attach GLB as primary format (avoid duplicates)
        existing_glb = asset.format_set.filter(format_type="GLB").last()
        if not existing_glb:
            fmt = Format.objects.create(asset=asset, format_type="GLB", role="POLYHAVEN_GLB")
            glb_bytes = glb_path.read_bytes()
            content_type = get_content_type(glb_path.name) or mimetypes.guess_type(glb_path.name)[0] or "application/octet-stream"
            res = Resource(asset=asset, format=fmt, contenttype=content_type)
            res.file.save(glb_path.name, ContentFile(glb_bytes), save=True)
            fmt.add_root_resource(res)
        # Assign preferred viewer format and save
        asset.assign_preferred_viewer_format()
        asset.save()
        return asset


import os
import time
import mimetypes
import zipfile
import io
from os.path import basename
from datetime import datetime, timezone as dt_timezone
from typing import Dict, Generator, Iterable, List, Optional

import requests
from django.core.files.base import ContentFile
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone

from icosa.helpers.file import (
    get_content_type,
    validate_file,
    process_main_file,
    UploadedFormat,
)
from icosa.helpers.snowflake import generate_snowflake
from icosa.models import (
    ASSET_STATE_COMPLETE,
    PUBLIC,
    Asset,
    AssetOwner,
    Format,
    Resource,
    Tag,
)
from icosa.models.common import CATEGORY_LABEL_MAP

IMPORT_SOURCE = "sketchfab"


def parse_iso8601(ts: Optional[str]) -> Optional[datetime]:
    if not ts:
        return None
    try:
        # Sketchfab returns ISO 8601 strings; parse and ensure timezone-aware
        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        # Ensure the datetime is timezone-aware (Django requires this).
        # datetime.timezone.utc is used because django.utils.timezone.utc
        # was removed in Django 5.
        if dt.tzinfo is None:
            dt = timezone.make_aware(dt, dt_timezone.utc)
        return dt
    except Exception:
        return None


def sketchfab_license_to_internal(slug: Optional[str]) -> Optional[str]:
    """Map Sketchfab license slugs to internal icosa license codes.

    Mapped slugs:
    - cc0      -> CREATIVE_COMMONS_0
    - by       -> CREATIVE_COMMONS_BY_4_0
    - by-sa    -> CREATIVE_COMMONS_BY_SA_4_0
    - by-nd    -> CREATIVE_COMMONS_BY_ND_4_0
    - by-nc    -> CREATIVE_COMMONS_NC_4_0
    - by-nc-sa -> CREATIVE_COMMONS_NC_SA_4_0
    - by-nc-nd -> CREATIVE_COMMONS_NC_ND_4_0

    Any other Sketchfab license is not mapped to an icosa choice.
    """
    if not slug:
        return None
    slug = slug.lower().strip()
    if slug == "cc0":
        return "CREATIVE_COMMONS_0"
    if slug == "by":
        # Sketchfab uses CC BY 4.0 today for the BY family.
        return "CREATIVE_COMMONS_BY_4_0"
    if slug == "by-sa":
        return "CREATIVE_COMMONS_BY_SA_4_0"
    if slug == "by-nd":
        return "CREATIVE_COMMONS_BY_ND_4_0"
    if slug == "by-nc":
        return "CREATIVE_COMMONS_NC_4_0"
    if slug == "by-nc-sa":
        return "CREATIVE_COMMONS_NC_SA_4_0"
    if slug == "by-nc-nd":
        return "CREATIVE_COMMONS_NC_ND_4_0"
    # Anything else (e.g. editorial or store licenses) is not mapped
    return None
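

def _demo_sketchfab_license_to_internal():
    # Hypothetical examples, not part of the original command.
    assert sketchfab_license_to_internal("cc0") == "CREATIVE_COMMONS_0"
    assert sketchfab_license_to_internal("BY-SA ") == "CREATIVE_COMMONS_BY_SA_4_0"
    assert sketchfab_license_to_internal("st") is None  # store/editorial slugs unmapped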


def pick_thumbnail_url(model: Dict) -> Optional[str]:
    thumbs = (model or {}).get("thumbnails", {}).get("images", [])
    if not thumbs:
        return None
    # Choose the largest width image available
    thumbs_sorted = sorted(thumbs, key=lambda x: x.get("width", 0), reverse=True)
    return thumbs_sorted[0].get("url")
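

def _demo_pick_thumbnail_url():
    # Hypothetical example, not part of the original command: the widest
    # thumbnail wins.
    model = {
        "thumbnails": {
            "images": [
                {"width": 256, "url": "https://example.com/small.jpg"},
                {"width": 1024, "url": "https://example.com/large.jpg"},
            ]
        }
    }
    assert pick_thumbnail_url(model) == "https://example.com/large.jpg"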


class SketchfabClient:
    BASE = "https://api.sketchfab.com/v3"

    def __init__(self, token: Optional[str] = None, timeout: int = 30):
        self.token = token
        self.timeout = timeout
        self.session = requests.Session()
        if token:
            self.session.headers.update({"Authorization": f"Token {token}"})

    def paged(self, url: str, params: Dict) -> Generator[Dict, None, None]:
        next_url = url
        next_params = params.copy()
        while next_url:
            resp = self.session.get(next_url, params=next_params, timeout=self.timeout)
            if resp.status_code != 200:
                raise CommandError(f"Sketchfab request failed: {resp.status_code} {resp.text}")
            data = resp.json()
            for item in data.get("results", []):
                yield item
            next_url = data.get("next")
            next_params = {}
            # Be nice to the API
            time.sleep(0.1)

    def search_models(
        self,
        *,
        licenses: Iterable[str],
        user: Optional[str] = None,
        downloadable: bool = True,
        per_page: int = 24,
        sort_by: str = "-publishedAt",
    ) -> Generator[Dict, None, None]:
        params = {
            "type": "models",
            "licenses": ",".join(licenses),
            "per_page": per_page,
            "downloadable": str(downloadable).lower(),
            "sort_by": sort_by,
        }
        # The search API accepts a 'user' filter by username.
        if user:
            params["user"] = user
        url = f"{self.BASE}/search"
        yield from self.paged(url, params)

    def list_user_models(
        self,
        *,
        user: str,
        licenses: Optional[Iterable[str]] = None,
        downloadable: bool = True,
        per_page: int = 24,
        sort_by: str = "-publishedAt",
    ) -> Generator[Dict, None, None]:
        """List models for a user via the search endpoint.

        Sketchfab's /models endpoint does not accept a user filter reliably; the documented
        approach is the /search API with `type=models` and `user=<username>`.
        """
        params = {
            "type": "models",
            "user": user,
            "per_page": per_page,
            "sort_by": sort_by,
        }
        if licenses:
            params["licenses"] = ",".join(licenses)
        if downloadable is not None:
            params["downloadable"] = str(downloadable).lower()
        url = f"{self.BASE}/search"
        yield from self.paged(url, params)

    def download_info(self, uid: str, *, max_retries: int = 5) -> Optional[Dict]:
        """Return download info for a model, if accessible.

        Response typically contains keys like 'gltf', 'glb', 'usdz', 'source', each with a 'url'.
        Requires a valid token for most models even if downloadable is true.
        """
        for attempt in range(max_retries):
            resp = self.session.get(f"{self.BASE}/models/{uid}/download", timeout=self.timeout)
            if resp.status_code == 401:
                # Unauthorized; token required
                return None
            if resp.status_code == 429:
                # Rate limited - check for Retry-After header
                retry_after = resp.headers.get("Retry-After")
                if retry_after and retry_after.isdigit():
                    wait_time = int(retry_after)
                else:
                    # Exponential backoff with longer waits: 5s, 10s, 20s, 40s, 80s
                    wait_time = 5 * (2 ** attempt)
                print(f"Rate limited on {uid}, waiting {wait_time}s before retry {attempt + 1}/{max_retries}")
                time.sleep(wait_time)
                continue
            if resp.status_code == 200:
                return resp.json()
            # Other error - log and return None
            print(f"DEBUG: download_info({uid}) failed with status {resp.status_code}")
            print(f"DEBUG: Response body: {resp.text[:500]}")  # First 500 chars
            return None
        # All retries exhausted
        print(f"DEBUG: download_info({uid}) failed after {max_retries} retries due to rate limiting")
        return None


class Command(BaseCommand):
    help = (
        "Import assets from Sketchfab using their API. "
        "Allows filtering by user and license. Defaults to CC0, CC-BY, and CC-BY-SA."
    )

    def add_arguments(self, parser):
        parser.add_argument(
            "--user",
            dest="users",
            metavar="USERNAME",
            action="append",
            default=[],
            help="Sketchfab username to filter by (can be provided multiple times)",
        )
        parser.add_argument(
            "--license",
            dest="licenses",
            default="cc0,by,by-sa",
            help=(
                "Comma-separated Sketchfab license slugs to include. "
                "Defaults to 'cc0,by,by-sa' (CC0 Public Domain, CC BY 4.0, CC BY-SA 4.0)."
            ),
        )
        parser.add_argument(
            "--max",
            dest="max_items",
            type=int,
            default=None,
            help="Maximum number of models to import",
        )
        parser.add_argument(
            "--token",
            dest="token",
            default=os.environ.get("SKETCHFAB_TOKEN") or os.environ.get("DJANGO_SKETCHFAB_TOKEN"),
            help="Sketchfab API token (or set SKETCHFAB_TOKEN env)",
        )
        parser.add_argument(
            "--update-existing",
            dest="update_existing",
            action="store_true",
            help="Update models if they already exist",
        )
        parser.add_argument(
            "--delay",
            dest="delay",
            type=float,
            default=1.0,
            help="Delay in seconds between model imports to avoid rate limiting (default: 1.0)",
        )

    def handle(self, *args, **options):
        users: List[str] = options["users"] or []
        # Normalize user-provided license slugs (accept cc-by-sa -> by-sa)
        raw_licenses = options["licenses"] or "cc0,by,by-sa"
        licenses_in = [x.strip().lower() for x in raw_licenses.split(",") if x.strip()]
        licenses = []
        for slug in licenses_in:
            if slug in ("cc-by", "cc_by", "by-4.0", "by4.0"):
                licenses.append("by")
            elif slug in ("cc-by-sa", "cc_by_sa", "by-sa", "bysa", "by-sa-4.0"):
                licenses.append("by-sa")
            else:
                licenses.append(slug)
        max_items = options.get("max_items")
        token = options.get("token")
        update_existing = options.get("update_existing", False)
        delay = options.get("delay", 1.0)
        client = SketchfabClient(token=token)
        count = 0
        seen = 0
        eligible = 0
        targets: Iterable[Dict]
        if users:
            # Iterate per-user, filtering by license locally if needed
            def iter_all():
                for user in users:
                    if options.get("verbosity", 1) >= 2:
                        self.stdout.write(f"Querying user='{user}' licenses={licenses} downloadable=true")
                    for model in client.list_user_models(user=user, licenses=licenses, downloadable=True):
                        yield model

            targets = iter_all()
        else:
            # Global search with license filter
            targets = client.search_models(licenses=licenses)
        for model in targets:
            seen += 1
            # Enforce the license filter if the endpoint didn't do it for us
            lic = (model.get("license") or {}).get("label")
            lic_slug = None
            if lic:
                # Derive a slug-like form from the label when a slug is not present
                low = lic.lower()
                if "cc0" in low or "public domain" in low:
                    lic_slug = "cc0"
                elif "sharealike" in low or "share alike" in low:
                    lic_slug = "by-sa"
                elif "attribution" in low and "no" not in low and "non" not in low:
                    # Heuristic for CC BY
                    lic_slug = "by"
            if users and licenses and lic_slug and lic_slug not in licenses:
                if options.get("verbosity", 1) >= 3:
                    self.stdout.write(f"Skipping by license: {model.get('uid')} label={lic}")
                continue
            uid = model.get("uid")
            if not uid:
                continue
            # If max reached, stop early
            if max_items is not None and count >= max_items:
                break
            # Skip non-downloadable models when we cannot fetch direct file URLs
            if not model.get("isDownloadable", False):
                if options.get("verbosity", 1) >= 2:
                    self.stdout.write(f"Skipping not-downloadable: {model.get('uid')} {model.get('name')}")
                continue
            eligible += 1
            try:
                asset = self.create_or_update_asset_from_model(client, model, update_existing=update_existing)
                if asset is not None:
                    count += 1
                    self.stdout.write(f"Imported {asset.url} ({asset.name})")
                # Rate limit: wait between models to avoid overwhelming the API
                time.sleep(delay)
            except CommandError as exc:
                self.stderr.write(f"Skipping {uid}: {exc}")
                # Brief delay even on errors to respect rate limits
                time.sleep(delay * 0.5)
        if options.get("verbosity", 1) >= 1:
            self.stdout.write(f"Seen={seen}, eligible(downloadable+license)={eligible}, imported={count}")
        self.stdout.write(self.style.SUCCESS(f"Finished. Imported {count} models."))

    def create_or_update_asset_from_model(
        self,
        client: SketchfabClient,
        model: Dict,
        *,
        update_existing: bool = False,
    ) -> Optional[Asset]:
        uid = model.get("uid")
        if not uid:
            raise CommandError("Missing uid in model data")
        asset_url = f"sketchfab-{uid}"
        # Lookup existing
        asset = Asset.objects.filter(url=asset_url).first()
        created = False
        if not asset:
            created = True
            asset = Asset(url=asset_url)
        else:
            if not update_existing:
                # Nothing to do
                return None
        # Check download availability BEFORE creating the asset to avoid orphaned records
        download = client.download_info(uid)
        if not download:
            raise CommandError(
                "Could not fetch download URLs. Ensure the model is downloadable and a valid token is provided via --token or SKETCHFAB_TOKEN."
            )
        # Prepare owner
        user = model.get("user") or {}
        username = (user.get("username") or "").strip() or f"user-{user.get('uid', 'unknown')}"
        displayname = user.get("displayName") or username
        # Find a unique owner URL, trying username first, then username-1, username-2, etc.
        owner_url = username
        suffix = 1
        while True:
            owner = AssetOwner.objects.filter(url=owner_url).first()
            if owner is None:
                # URL is available, create new owner
                owner = AssetOwner.objects.create(
                    url=owner_url,
                    displayname=displayname,
                    imported=True,
                    is_claimed=False,
                )
                if suffix > 1:
                    print(f"WARNING: Username '{username}' already exists, created owner with URL '{owner_url}'")
                break
            elif owner.imported and owner.displayname == displayname:
                # Same owner already exists (from previous import), reuse it
                break
            else:
                # Conflict with different owner, try next suffix
                owner_url = f"{username}-{suffix}"
                suffix += 1
                if suffix > 100:
                    # Safety valve
                    raise CommandError(f"Could not find unique owner URL for username '{username}' after 100 attempts")
        # Timestamps
        created_at = parse_iso8601(model.get("createdAt")) or timezone.now()
        updated_at = parse_iso8601(model.get("publishedAt")) or created_at
        # Map license
        license_label = (model.get("license") or {}).get("label")
        license_slug = None
        if license_label:
            low = license_label.lower()
            if "cc0" in low or "public domain" in low:
                license_slug = "cc0"
            elif "sharealike" in low or "share alike" in low:
                license_slug = "by-sa"
            elif "attribution" in low and "no" not in low and "non" not in low:
                license_slug = "by"
        internal_license = sketchfab_license_to_internal(license_slug)
        # Core fields
        if created and not asset.create_time:
            asset.create_time = created_at
        asset.update_time = updated_at
        asset.name = model.get("name")
        asset.description = model.get("description")
        asset.visibility = PUBLIC
        asset.state = ASSET_STATE_COMPLETE
        asset.owner = owner
        asset.imported_from = IMPORT_SOURCE
        asset.polydata = model  # Store raw sketchfab metadata
        asset.historical_likes = int(model.get("likeCount") or 0)
        asset.historical_views = int(model.get("viewCount") or 0)
        if internal_license:
            asset.license = internal_license
        # Category mapping (first category name if provided)
        cat_name = None
        cats = model.get("categories") or []
        if cats:
            # categories sometimes carry only name strings
            c0 = cats[0]
            if isinstance(c0, dict):
                cat_name = c0.get("name")
            elif isinstance(c0, str):
                cat_name = c0
        if cat_name:
            key = str(cat_name).strip().lower()
            asset.category = CATEGORY_LABEL_MAP.get(key)
        # Assign an id for new assets
        if created:
            asset.id = generate_snowflake()
        asset.save()
        # Tags
        tags = model.get("tags") or []
        tag_names = []
        for t in tags:
            if isinstance(t, dict):
                tag_names.append(t.get("name") or t.get("slug"))
            elif isinstance(t, str):
                tag_names.append(t)
        tag_objs = []
        for name in filter(None, set(tag_names)):
            tag, _ = Tag.objects.get_or_create(name=name)
            tag_objs.append(tag)
        if tag_objs:
            asset.tags.set(tag_objs)
        # Thumbnail: download and store locally if possible
        if not asset.thumbnail:
            thumb_url = pick_thumbnail_url(model)
            if thumb_url:
                try:
                    resp = requests.get(thumb_url, timeout=20)
                    if resp.status_code == 200:
                        content_type = resp.headers.get("Content-Type")
                        ext = mimetypes.guess_extension(content_type or "") or ".jpg"
                        if ext == ".jpe":
                            ext = ".jpg"
                        filename = f"thumbnail-{uid}{ext}"
                        asset.thumbnail.save(filename, ContentFile(resp.content), save=False)
                        asset.thumbnail_contenttype = content_type or "image/jpeg"
                        asset.save()
                except Exception:
                    # Non-fatal
                    pass
        # Formats/resources: prefer GLB if available, and download into storage
        # (download info already fetched and validated earlier)
        created_any_format = False

        def download_to_contentfile(url: str, *, timeout: int = 60) -> Optional[ContentFile]:
            try:
                resp = requests.get(url, timeout=timeout)
                if resp.status_code != 200:
                    return None
                return ContentFile(resp.content)
            except Exception:
                return None

        def add_format_from_url(url: str, fmt_type: str, *, role: Optional[str] = None, filename: Optional[str] = None):
            nonlocal created_any_format
            data = download_to_contentfile(url)
            if not data:
                return
            # Infer filename and content type
            content_type = None
            try:
                # attempt to fetch content type via HEAD for better accuracy
                head = requests.head(url, timeout=15, allow_redirects=True)
                content_type = head.headers.get("Content-Type")
            except Exception:
                pass
            guessed_ext = mimetypes.guess_extension(content_type or "") or os.path.splitext(url.split("?")[0])[1] or ".bin"
            if guessed_ext == ".jpe":
                guessed_ext = ".jpg"
            name = filename or f"{fmt_type.lower()}-{uid}{guessed_ext}"
            fmt = Format.objects.create(asset=asset, format_type=fmt_type, role=role)
            # Saving file to storage via FileField
            res = Resource(asset=asset, format=fmt, contenttype=content_type or get_content_type(name) or "application/octet-stream")
            res.file.save(name, data, save=True)
            fmt.add_root_resource(res)
            created_any_format = True

        def add_formats_from_zip(url: str, *, preferred_ext_order: Optional[List[str]] = None):
            nonlocal created_any_format
            if preferred_ext_order is None:
                preferred_ext_order = [
                    "glb",
                    "gltf",
                    "fbx",
                    "obj",
                    "usdz",
                    "ply",
                    "stl",
                    "vox",
                    "tilt",
                    "blocks",
                ]
            try:
                resp = requests.get(url, timeout=90)
                if resp.status_code != 200:
                    return
                zf = zipfile.ZipFile(io.BytesIO(resp.content))
            except Exception:
                return
            # Build UploadedFormats from zip members
            uploaded: List[UploadedFormat] = []
            for info in zf.infolist():
                if info.is_dir():
                    continue
                fname = info.filename
                # Ignore hidden or MACOSX metadata
                base = basename(fname)
                if not base or base.startswith(".__") or "/." in fname or base.startswith("."):
                    continue
                try:
                    with zf.open(info) as fp:
                        data = fp.read()
                except Exception:
                    continue
                # Construct an in-memory uploaded file
                su = SimpleUploadedFile(base, data, content_type=get_content_type(base) or "application/octet-stream")
                ext = base.split(".")[-1].lower() if "." in base else ""
                details = validate_file(su, ext)
                if details is not None:
                    uploaded.append(details)
            if not uploaded:
                return

            # Choose mainfile by extension preference first, then by mainfile flag
            def pref_index(ext: str) -> int:
                try:
                    return preferred_ext_order.index(ext)
                except ValueError:
                    return len(preferred_ext_order) + 100

            # Filter potential mains
            mains = [u for u in uploaded if u.mainfile]
            if not mains:
                mains = uploaded
            # Choose by extension order on the original filename
            mains_sorted = sorted(mains, key=lambda u: pref_index(u.file.name.split(".")[-1].lower()))
            main = mains_sorted[0]
            subs = [u for u in uploaded if u is not main]
            # Hand off to existing helper to build Format + Resources in storage
            process_main_file(main, subs, asset, gltf_to_convert=None)
            created_any_format = True

        # The download payload usually has entries like {'glb': {'url': ...}, 'gltf': {'url': ...}, 'usdz': {'url': ...}}
        glb_url = (download.get("glb") or {}).get("url")
        if glb_url:
            add_format_from_url(glb_url, "GLB", role="SKETCHFAB_GLB")
        # Provide USDZ if present (not viewer-preferred, but useful to store)
        usdz_url = (download.get("usdz") or {}).get("url")
        if usdz_url:
            add_format_from_url(usdz_url, "USDZ", role="SKETCHFAB_USDZ")
        # GLTF archive (zip): unpack to root + resources
        gltf_url = (download.get("gltf") or {}).get("url")
        if gltf_url:
            add_formats_from_zip(gltf_url, preferred_ext_order=["gltf", "glb", "fbx", "obj"])  # prefer GLTF as main
        # Source archive (zip): prefer FBX, then OBJ, then others
        source_url = (download.get("source") or {}).get("url")
        if source_url:
            add_formats_from_zip(source_url, preferred_ext_order=["fbx", "obj", "gltf", "glb", "ply", "stl"])  # prefer authoring formats
        # Assign preferred viewer format if possible
        asset.assign_preferred_viewer_format()
        # Final save in case any denorms/validations occur
        asset.save()
        return asset


import hashlib
import io
import logging
import mimetypes
import os
import time
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urlparse

import requests
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.utils import timezone

from PIL import Image

from icosa.models import (
    ASSET_STATE_COMPLETE,
    PUBLIC,
    Asset,
    AssetOwner,
    Format,
    Resource,
    Tag,
)

logger = logging.getLogger(__name__)

SUPPORTED_FILE_TYPES: Tuple[str, ...] = ("glb", "gltf", "obj", "stl")
SUPPORTED_FILE_TYPE_SET = set(SUPPORTED_FILE_TYPES)
IMPORT_SOURCE = "smithsonian"
API_URL = "https://3d-api.si.edu/api/v1.0/content/file/search"
OPEN_ACCESS_API_URL = "https://api.si.edu/openaccess/api/v1.0/search"
DEFAULT_API_KEY = "DEMO_KEY"  # Can be overridden with --api-key

DEFAULT_OWNER = {
    "url": "smithsonian",
    "displayname": "Smithsonian 3D",
}


@dataclass
class SmithsonianResource:
    uri: str
    usage: Optional[str]
    quality: Optional[str]
    model_type: Optional[str]
    file_type: Optional[str]
    extra: Dict[str, object] = field(default_factory=dict)


@dataclass
class SmithsonianAsset:
    title: str
    model_url: str
    model_entries: List[SmithsonianResource] = field(default_factory=list)
    image_entries: List[SmithsonianResource] = field(default_factory=list)
    seen_uris: Set[str] = field(default_factory=set, repr=False)
    record_id: Optional[str] = None
    record_link: Optional[str] = None
    unit_code: Optional[str] = None
    object_name: Optional[str] = None
    description: Optional[str] = None
    license: Optional[str] = None
    credit: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    additional_metadata: Dict[str, object] = field(default_factory=dict)

    def to_metadata(self) -> Dict[str, object]:
        """Return serialisable metadata for storage on the Asset."""
        metadata = {
            "title": self.title,
            "model_url": self.model_url,
            "models": [entry.__dict__ for entry in self.model_entries],
            "images": [entry.__dict__ for entry in self.image_entries],
        }
        # Add rich metadata fields if present
        if self.record_id:
            metadata["record_id"] = self.record_id
        if self.record_link:
            metadata["record_link"] = self.record_link
        if self.unit_code:
            metadata["unit_code"] = self.unit_code
        if self.object_name:
            metadata["object_name"] = self.object_name
        if self.description:
            metadata["description"] = self.description
        if self.license:
            metadata["license"] = self.license
        if self.credit:
            metadata["credit"] = self.credit
        if self.additional_metadata:
            metadata["additional_metadata"] = self.additional_metadata
        return metadata

    def add_entry(self, entry: SmithsonianResource) -> bool:
        """Add an entry to the asset if it hasn't been seen already."""
        uri = entry.uri
        if uri and uri in self.seen_uris:
            return False
        if uri:
            self.seen_uris.add(uri)
        usage = (entry.usage or "").lower()
        if usage.startswith("image"):
            self.image_entries.append(entry)
        else:
            self.model_entries.append(entry)
        return True

    def preferred_model_entry(self) -> Optional[SmithsonianResource]:
        """Return the best candidate to use as the root resource."""
        if not self.model_entries:
            return None

        def sort_key(entry: SmithsonianResource) -> tuple:
            usage_priority = {
                "web3d": 0,
                "app3d": 1,
                "download3d": 2,
            }.get((entry.usage or "").lower(), 3)
            quality_priority_map = {
                "high": 0,
                "medium": 1,
                "ar": 2,
                "low": 3,
                "full_resolution": 4,
                "thumb": 5,
            }
            quality_priority = quality_priority_map.get((entry.quality or "").lower(), 6)
            # When priorities match, prefer longer urls (heuristic for higher fidelity variants).
            return (usage_priority, quality_priority, -(len(entry.uri) if entry.uri else 0))

        return sorted(self.model_entries, key=sort_key)[0]

    def preferred_image_entry(self) -> Optional[SmithsonianResource]:
        """Return the best candidate thumbnail image."""
        if not self.image_entries:
            return None

        def sort_key(entry: SmithsonianResource) -> tuple:
            usage_priority = {
                "image_thumb": 0,
                "image_thumbnail": 0,
                "image_small": 1,
                "image_medium": 2,
                "image_large": 3,
                "image_master": 4,
            }.get((entry.usage or "").lower(), 5)
            quality_priority = {
                "thumb": 0,
                "low": 1,
                "medium": 2,
                "high": 3,
                "full_resolution": 4,
            }.get((entry.quality or "").lower(), 5)
            return (usage_priority, quality_priority, -(len(entry.uri) if entry.uri else 0))

        return sorted(self.image_entries, key=sort_key)[0]
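

def _demo_preferred_model_entry():
    # Hypothetical example, not part of the original command: "web3d"/"high"
    # entries outrank "download3d"/"low" ones when picking the root resource.
    asset = SmithsonianAsset(title="Demo", model_url="https://example.com/demo")
    asset.add_entry(SmithsonianResource(
        uri="https://example.com/low.glb", usage="download3d",
        quality="low", model_type="glb", file_type="glb",
    ))
    asset.add_entry(SmithsonianResource(
        uri="https://example.com/high.glb", usage="web3d",
        quality="high", model_type="glb", file_type="glb",
    ))
    best = asset.preferred_model_entry()
    assert best is not None and best.usage == "web3d"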


class SmithsonianAPIClient:
    def __init__(
        self,
        file_types: Iterable[str],
        rate_limit: float = 0.5,
        rows_per_page: int = 100,
        api_key: str = DEFAULT_API_KEY,
    ):
        self.file_types = list(dict.fromkeys(file_type.lower() for file_type in file_types))
        self.rate_limit = rate_limit
        self.rows_per_page = rows_per_page
        self.api_key = api_key
        self.session = requests.Session()

    def fetch(self) -> Iterable[List[Dict[str, object]]]:
        for file_type in self.file_types:
            start = 0
            total = None
            while True:
                params = {
                    "file_type": file_type,
                    "start": start,
                    "rows": self.rows_per_page,
                }
                response = self.session.get(API_URL, params=params, timeout=60)
                try:
                    response.raise_for_status()
                except requests.HTTPError as exc:  # pragma: no cover - defensive.
                    raise CommandError(
                        f"Failed to fetch Smithsonian data for file_type={file_type}: {exc}"
                    ) from exc
                payload = response.json()
                rows = payload.get("rows", [])
                total = payload.get("rowCount", total)
                logger.info(
                    "Fetched %s rows for file_type=%s at offset %s", len(rows), file_type, start
                )
                yield rows
                start += self.rows_per_page
                if total is not None and start >= total:
                    break
                if not rows:
                    break
                time.sleep(self.rate_limit)

    def fetch_by_model_url(self, model_url: str) -> List[Dict[str, object]]:
        start = 0
        collected: List[Dict[str, object]] = []
        while True:
            params = {
                "model_url": model_url,
                "start": start,
                "rows": self.rows_per_page,
            }
            response = self.session.get(API_URL, params=params, timeout=60)
            try:
                response.raise_for_status()
            except requests.HTTPError as exc:  # pragma: no cover - defensive.
                raise CommandError(
                    f"Failed to fetch additional Smithsonian data for {model_url}: {exc}"
                ) from exc
            payload = response.json()
            rows = payload.get("rows", [])
            collected.extend(rows)
            if len(rows) < self.rows_per_page or not rows:
                break
            start += self.rows_per_page
            time.sleep(self.rate_limit)
        return collected

    def fetch_open_access_metadata(self, model_url: str) -> Optional[Dict[str, object]]:
        """Fetch rich metadata from the Smithsonian Open Access API for a 3D package."""
        try:
            params = {
                "q": model_url,
                "api_key": self.api_key,
                "rows": 1,
            }
            response = self.session.get(OPEN_ACCESS_API_URL, params=params, timeout=60)
            response.raise_for_status()
            payload = response.json()
            rows = payload.get("response", {}).get("rows", [])
            if rows:
                return rows[0]
            return None
        except requests.RequestException as exc:
            logger.warning("Failed to fetch Open Access metadata for %s: %s", model_url, exc)
            return None


class Command(BaseCommand):
    help = "Import Smithsonian 3D models into Icosa"

    # Mapping of Smithsonian unit codes to our categories
    UNIT_CODE_CATEGORY_MAP = {
        "nasm": "TRANSPORT",  # National Air and Space Museum
        "nmah": "HISTORY",  # National Museum of American History
        "nmnh": "NATURE",  # National Museum of Natural History
        "nmnhmammals": "ANIMALS",  # NMNH - Mammals
        "nmnhbirds": "ANIMALS",  # NMNH - Birds
        "nmnhfishes": "ANIMALS",  # NMNH - Fishes
        "nmnhreptiles": "ANIMALS",  # NMNH - Reptiles
        "nmnhamphibians": "ANIMALS",  # NMNH - Amphibians
        "nmnhinvertebratezoo": "ANIMALS",  # NMNH - Invertebrate Zoology
        "nmnhanthro": "CULTURE",  # NMNH - Anthropology
        "nmnhbotany": "NATURE",  # NMNH - Botany
        "nmnhentomology": "ANIMALS",  # NMNH - Entomology
        "nmnhiz": "ANIMALS",  # NMNH - Invertebrate Zoology
        "nmnhminsci": "SCIENCE",  # NMNH - Mineral Sciences
        "nmnhpaleo": "SCIENCE",  # NMNH - Paleobiology
        "npg": "PEOPLE",  # National Portrait Gallery
        "saam": "ART",  # Smithsonian American Art Museum
        "acm": "CULTURE",  # Anacostia Community Museum
        "fsg": "ART",  # Freer Gallery of Art and Arthur M. Sackler Gallery
        "hmsg": "ART",  # Hirshhorn Museum and Sculpture Garden
        "npm": "HISTORY",  # National Postal Museum
        "chndm": "ART",  # Cooper Hewitt, Smithsonian Design Museum
        "nzp": "ANIMALS",  # National Zoological Park
        "si": "MISCELLANEOUS",  # Smithsonian Institution (general)
        "cfch": "CULTURE",  # Center for Folklife and Cultural Heritage
    }

    def add_arguments(self, parser):
        parser.add_argument(
            "--rows",
            type=int,
            default=100,
            help="Number of rows to fetch per API call",
        )
        parser.add_argument(
            "--rate-limit",
            type=float,
            default=0.5,
            help="Seconds to wait between API requests",
        )
        parser.add_argument(
            "--max-assets",
            type=int,
            default=None,
            help="Optional limit on the number of assets to import",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Fetch data but do not write to the database",
        )
        parser.add_argument(
            "--fix-thumbs",
            action="store_true",
            help="Only download missing thumbnails for already-imported assets",
        )
        parser.add_argument(
            "--update-existing",
            action="store_true",
            help="Update existing assets with fresh metadata (default: skip existing)",
        )
        parser.add_argument(
            "--api-key",
            type=str,
            default=DEFAULT_API_KEY,
            help=f"Smithsonian Open Access API key (default: {DEFAULT_API_KEY})",
        )

    @staticmethod
    def normalise_metadata(rows: Iterable[Dict[str, object]]) -> Dict[str, SmithsonianAsset]:
        """Extract basic file information from 3D API rows. Rich metadata comes from the Open Access API."""
        assets: Dict[str, SmithsonianAsset] = {}
        for row in rows:
            content = row.get("content", {})
            if not isinstance(content, dict):
                continue
            entry = Command.resource_from_content(content)
            if entry is None:
                continue
            model_url = content.get("model_url")
            if not model_url:
                continue
            title = row.get("title") or "Untitled Smithsonian Model"
            asset = assets.get(model_url)
            if asset is None:
                asset = SmithsonianAsset(
                    title=title,
                    model_url=model_url,
                )
                assets[model_url] = asset
            if entry.uri and Command.should_include_entry(entry):
                asset.add_entry(entry)
        return assets

    @staticmethod
    def resource_from_content(content: Dict[str, object]) -> Optional[SmithsonianResource]:
        uri = content.get("uri")
        if not uri:
            return None
        return SmithsonianResource(
            uri=uri,
            usage=content.get("usage"),
            quality=content.get("quality"),
            model_type=content.get("model_type"),
            file_type=content.get("file_type"),
            extra={
                key: value
                for key, value in content.items()
                if key
                not in {"uri", "usage", "quality", "model_type", "file_type", "model_url"}
            },
        )

    @staticmethod
    def is_image_usage(usage: Optional[str]) -> bool:
        return (usage or "").lower().startswith("image")

    @classmethod
    def infer_file_type(cls, entry: SmithsonianResource) -> Optional[str]:
        detected = entry.extra.get("detected_file_type")
        if isinstance(detected, str) and detected:
            return detected.lower()
        for candidate in (entry.model_type, entry.file_type):
            if candidate:
                detected_type = candidate.lower()
                entry.extra.setdefault("detected_file_type", detected_type)
                return detected_type
        path = urlparse(entry.uri).path
        extension = os.path.splitext(path)[1].lstrip(".").lower()
        if extension:
            entry.extra.setdefault("detected_file_type", extension)
            return extension
        return None

    @classmethod
    def should_include_entry(cls, entry: SmithsonianResource) -> bool:
        if cls.is_image_usage(entry.usage):
            return True
        detected_type = cls.infer_file_type(entry)
        return bool(detected_type and detected_type in SUPPORTED_FILE_TYPE_SET)

    @staticmethod
    def guess_content_type(uri: str, default: Optional[str] = None) -> Optional[str]:
        content_type, _ = mimetypes.guess_type(uri)
        if content_type:
            return content_type
        return default

    @classmethod
    def extract_unit_code(cls, record_id: Optional[str]) -> Optional[str]:
        """Extract the unit code from a Smithsonian record ID like 'nasm_A20120325000'."""
        if not record_id:
            return None
        # split() on a non-empty string always yields at least one part
        return record_id.split("_")[0].lower()

    @classmethod
    def determine_category(cls, unit_code: Optional[str]) -> Optional[str]:
        """Map a Smithsonian unit code to our category."""
        if not unit_code:
            return None
        unit_lower = unit_code.lower()
        # Try exact match first
        category = cls.UNIT_CODE_CATEGORY_MAP.get(unit_lower)
        if category:
            return category
        # Fallback: try prefix matching (e.g., "nmnhsomething" -> "nmnh")
        # Sort by length descending to match longest prefix first
        for prefix in sorted(cls.UNIT_CODE_CATEGORY_MAP.keys(), key=len, reverse=True):
            if unit_lower.startswith(prefix):
                return cls.UNIT_CODE_CATEGORY_MAP[prefix]
        return None
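
    @staticmethod
    def _demo_unit_code_and_category():
        # Hypothetical self-check, not part of the original command: record ids
        # look like "<unit>_<id>", and unmapped NMNH sub-units fall back to the
        # longest matching prefix in UNIT_CODE_CATEGORY_MAP.
        assert Command.extract_unit_code("nasm_A20120325000") == "nasm"
        assert Command.determine_category("nasm") == "TRANSPORT"
        assert Command.determine_category("nmnhbotanyarchive") == "NATURE"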

    @classmethod
    def parse_license(cls, license_text: Optional[str]) -> Optional[str]:
        """Convert Smithsonian license text to our license constant."""
        if not license_text:
            return None
        license_lower = license_text.lower()
        if "cc0" in license_lower or "public domain" in license_lower:
            return "CREATIVE_COMMONS_0"
        # Default to None if we can't determine
        return None

    @staticmethod
    def ensure_owner() -> AssetOwner:
        owner, _ = AssetOwner.objects.get_or_create(
            url=DEFAULT_OWNER["url"],
            defaults={
                "displayname": DEFAULT_OWNER["displayname"],
                "imported": True,
                "is_claimed": False,
            },
        )
        return owner

    @staticmethod
    def asset_identifier(model_url: str) -> str:
        safe_url = model_url.replace(":", "-")
        return f"smithsonian-{safe_url}"

    @classmethod
    def determine_format_type(cls, entry: SmithsonianResource) -> Optional[str]:
        detected_type = cls.infer_file_type(entry)
        if not detected_type:
            return None
        if detected_type in {"glb", "gltf"}:
            return "GLTF2"
        if detected_type == "obj":
            return "OBJ"
        if detected_type == "stl":
            return "STL"
        return None

    @classmethod
    def determine_content_type(cls, uri: str, format_type: Optional[str]) -> Optional[str]:
        guessed = cls.guess_content_type(uri)
        if guessed:
            return guessed
        extension = os.path.splitext(urlparse(uri).path)[1].lower()
        if extension == ".glb":
            return "model/gltf-binary"
        if extension == ".gltf":
            return "model/gltf+json"
        if extension == ".obj":
            return "text/plain"
        if extension == ".stl":
            return "model/stl"
        if format_type == "GLTF2":
            return "model/gltf-binary"
        if format_type == "OBJ":
            return "text/plain"
        if format_type == "STL":
            return "model/stl"
        return "application/octet-stream"

    @staticmethod
    def build_format_role(format_type: str, entry: SmithsonianResource, index: int) -> str:
        parts = [format_type]
        if entry.usage:
            parts.append(entry.usage.upper().replace("-", "_").replace(" ", "_"))
        if entry.quality:
            parts.append(entry.quality.upper().replace("-", "_").replace(" ", "_"))
        parts.append(str(index))
        role = "SMITHSONIAN_" + "_".join(filter(None, parts))
        return role[:255]
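
    @staticmethod
    def _demo_build_format_role():
        # Hypothetical example, not part of the original command: role strings
        # encode format, usage, quality, and index.
        entry = SmithsonianResource(
            uri="https://example.com/model.glb",
            usage="web3d",
            quality="high",
            model_type="glb",
            file_type="glb",
        )
        assert Command.build_format_role("GLTF2", entry, 0) == "SMITHSONIAN_GLTF2_WEB3D_HIGH_0"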

    def download_thumbnail(
        self, entry: SmithsonianResource
    ) -> Tuple[Optional[ContentFile], Optional[str], int, str]:
        if not entry.uri:
            return None, None, 0, "no URI provided"
        try:
            response = requests.get(entry.uri, timeout=60)
            response.raise_for_status()
        except requests.RequestException as exc:  # pragma: no cover - network failure handling
            logger.warning("Failed to download thumbnail %s: %s", entry.uri, exc)
            return None, None, 0, f"request error: {exc}"
        raw_size = len(response.content)
        try:
            # Process the image to normalize format and aspect ratio
            with Image.open(io.BytesIO(response.content)) as im:
                # Sample top-left pixel color for background
                bg_color = (255, 255, 255)  # default white
                try:
                    if im.mode in ("RGB", "RGBA", "L", "LA", "P"):
                        pixel = im.getpixel((0, 0))
                        if isinstance(pixel, int):
                            # Grayscale
                            bg_color = (pixel, pixel, pixel)
                        elif len(pixel) >= 3:
                            # RGB or RGBA
                            bg_color = tuple(pixel[:3])
                        elif len(pixel) == 2:
                            # LA (luminance + alpha)
                            bg_color = (pixel[0], pixel[0], pixel[0])
                except Exception:
                    # If sampling fails, stick with white
                    pass
                # Ensure RGB (discard alpha on background color if present)
                if im.mode in ("RGBA", "LA", "P"):
                    bg = Image.new("RGB", im.size, bg_color)
                    if im.mode == "P" and "transparency" in im.info:
                        im = im.convert("RGBA")
                    if im.mode in ("RGBA", "LA"):
                        alpha = im.split()[-1]
                        bg.paste(im.convert("RGB"), mask=alpha)
                        im = bg
                    else:
                        bg.paste(im.convert("RGB"))
                        im = bg
                else:
                    im = im.convert("RGB")
                # Fit image into an 8:5 box without upscaling image content
                target_ar = 8 / 5
                max_w, max_h = 1600, 1000  # upper bound for large sources
                w, h = im.size
                # Scale down if larger than max box; never scale up
                scale = min(1.0, min(max_w / w, max_h / h))
                new_w = int(w * scale)
                new_h = int(h * scale)
                if scale < 1.0:
                    im = im.resize((new_w, new_h), Image.LANCZOS)
                else:
                    new_w, new_h = w, h
                # Compute minimal padding to achieve 8:5 aspect ratio canvas
                if new_w / new_h < target_ar:
                    canvas_w = int(round(new_h * target_ar))
                    canvas_h = new_h
                else:
                    canvas_w = new_w
                    canvas_h = int(round(new_w / target_ar))
                # Add 10% padding around the image using sampled background color
                pad = int(0.1 * max(canvas_w, canvas_h))
                padded_w = canvas_w + 2 * pad
                padded_h = canvas_h + 2 * pad
                canvas = Image.new("RGB", (padded_w, padded_h), bg_color)
                # Center the image on the padded canvas
                paste_x = (padded_w - canvas_w) // 2
                paste_y = (padded_h - canvas_h) // 2
                inner_canvas = Image.new("RGB", (canvas_w, canvas_h), bg_color)
                img_x = (canvas_w - new_w) // 2
                img_y = (canvas_h - new_h) // 2
                inner_canvas.paste(im, (img_x, img_y))
                canvas.paste(inner_canvas, (paste_x, paste_y))
                # Save as JPEG
                buf = io.BytesIO()
                canvas.save(buf, format="JPEG", quality=90)
                buf.seek(0)
                processed_content = buf.read()
            filename = f"thumbnail-{hashlib.sha256(entry.uri.encode('utf-8')).hexdigest()[:12]}.jpg"
            content_type = "image/jpeg"
            size = len(processed_content)
            diagnostics = (
                f"status={response.status_code}, raw_bytes={raw_size}, "
                f"processed_bytes={size}, content_type={content_type}, "
                f"original_size={w}x{h}, final_size={padded_w}x{padded_h}"
            )
            logger.debug("Processed thumbnail %s: %s", entry.uri, diagnostics)
            return ContentFile(processed_content, name=filename), content_type, size, diagnostics
        except Exception as exc:
            logger.warning("Failed to process thumbnail image %s: %s", entry.uri, exc)
            # Fall back to returning raw content if image processing fails
            raw_content_type = response.headers.get("Content-Type")
            if raw_content_type:
                raw_content_type = raw_content_type.split(";")[0].strip()
            extension = None
            if raw_content_type:
                extension = mimetypes.guess_extension(raw_content_type)
            if not extension:
                extension = os.path.splitext(urlparse(entry.uri).path)[1]
            if not extension:
                extension = ".jpg"
            if extension == ".jpe":
                extension = ".jpg"
            content_type = raw_content_type or mimetypes.guess_type(f"thumbnail{extension}")[0]
            filename = f"thumbnail-{hashlib.sha256(entry.uri.encode('utf-8')).hexdigest()[:12]}{extension}"
            diagnostics = (
                f"status={response.status_code}, bytes={raw_size}, "
                f"content_type={content_type or 'unknown'}, extension={extension}, "
                f"processing_error={exc}"
            )
            return ContentFile(response.content, name=filename), content_type, raw_size, diagnostics
def find_existing_asset(self, asset_data: SmithsonianAsset) -> Optional[Asset]:
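"""Look up an existing asset by derived URL, then by the model_url stored
in polydata, then by any Resource whose external_url matches one of this
record's model URIs."""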
asset_url = self.asset_identifier(asset_data.model_url)
asset = Asset.objects.filter(url=asset_url).first()
if asset:
return asset
asset = Asset.objects.filter(polydata__model_url=asset_data.model_url).first()
if asset:
return asset
model_uris = [entry.uri for entry in asset_data.model_entries if entry.uri]
if model_uris:
resource = (
Resource.objects.filter(external_url__in=model_uris)
.select_related("asset")
.first()
)
if resource:
return resource.asset
return None
def create_or_update_asset(
self,
asset_data: SmithsonianAsset,
owner: AssetOwner,
*,
verbosity: int = 1,
update_existing: bool = False,
) -> Optional[Asset]:
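"""Create or refresh an Asset from Smithsonian metadata.
Returns None when the asset already exists and update_existing is False;
otherwise saves the asset along with its tags, thumbnail and rebuilt formats.
"""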
root_entry = asset_data.preferred_model_entry()
if root_entry is None:
raise CommandError(f"No usable model files found for {asset_data.model_url}")
asset_url = self.asset_identifier(asset_data.model_url)
asset = self.find_existing_asset(asset_data)
created = False
if asset is None:
created = True
asset = Asset(url=asset_url)
else:
# Asset already exists - skip if update_existing is False
if not update_existing:
if verbosity >= 2:
self.stdout.write(f"Skipping existing asset {asset_data.model_url}")
return None
now = timezone.now()
if created and not asset.create_time:
asset.create_time = now
asset.url = asset_url
asset.name = asset_data.title
asset.update_time = now
asset.visibility = PUBLIC
asset.state = ASSET_STATE_COMPLETE
asset.owner = owner
asset.imported_from = IMPORT_SOURCE
asset.polydata = asset_data.to_metadata()
# Set license
if asset_data.license:
parsed_license = self.parse_license(asset_data.license)
if parsed_license:
asset.license = parsed_license
# Build description from available metadata
description_parts = []
if asset_data.description:
description_parts.append(asset_data.description)
if asset_data.credit:
description_parts.append(f"Credit: {asset_data.credit}")
if description_parts:
asset.description = "\n\n".join(description_parts)
# Determine category from unit code
if asset_data.unit_code:
category = self.determine_category(asset_data.unit_code)
if category:
asset.category = category
if verbosity >= 1:
self.stdout.write(f" → Category: {category} (from unit_code: {asset_data.unit_code})")
else:
if verbosity >= 1:
self.stdout.write(f" → No category mapping for unit_code: {asset_data.unit_code}")
else:
if verbosity >= 1:
self.stdout.write(f" → No unit_code found")
if verbosity >= 1:
action = "Creating" if created else "Updating"
self.stdout.write(f"{action} asset for Smithsonian model {asset_data.model_url}")
if asset_data.license:
self.stdout.write(f" → License: {asset_data.license}")
if asset_data.description:
desc_preview = asset_data.description[:100] + "..." if len(asset_data.description) > 100 else asset_data.description
self.stdout.write(f" → Description: {desc_preview}")
asset.save()
# Add tags from Smithsonian metadata
if asset_data.tags:
if verbosity >= 2:
self.stdout.write(f" → Tags: {', '.join(asset_data.tags)}")
for tag_name in asset_data.tags:
tag, _ = Tag.objects.get_or_create(name=tag_name)
asset.tags.add(tag)
else:
if verbosity >= 2:
self.stdout.write(f" → No tags from metadata")
# Download thumbnail if asset doesn't have one or if updating existing assets
if not asset.thumbnail or update_existing:
thumbnail_entry = asset_data.preferred_image_entry()
if thumbnail_entry:
if verbosity >= 1:
self.stdout.write(f"Attempting thumbnail download from {thumbnail_entry.uri}")
file_obj, content_type, size, diagnostics = self.download_thumbnail(thumbnail_entry)
if file_obj:
asset.thumbnail.save(file_obj.name, file_obj, save=False)
asset.thumbnail_contenttype = content_type
if verbosity >= 1:
self.stdout.write(
f"Saved thumbnail {file_obj.name} ({size} bytes, content_type={content_type or 'unknown'}); {diagnostics}"
)
else:
if verbosity >= 1:
self.stdout.write(
f"Failed to download thumbnail from {thumbnail_entry.uri}; {diagnostics}"
)
else:
image_usages = sorted({(entry.usage or "unknown") for entry in asset_data.image_entries})
model_usages = sorted({(entry.usage or "unknown") for entry in asset_data.model_entries})
if verbosity >= 1:
self.stdout.write(
"No thumbnail entries available for "
f"{asset_data.model_url}; image_usages={image_usages or ['none']}, "
f"model_usages={model_usages or ['none']}"
)
elif verbosity >= 2:
self.stdout.write(f"Thumbnail already exists and --update-existing not set, skipping download")
with transaction.atomic():
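# Rebuild this command's formats from scratch on every run so re-imports
# stay idempotent; only roles prefixed SMITHSONIAN_ are touched.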
asset.format_set.filter(role__startswith="SMITHSONIAN_").delete()
created_formats: List[Tuple[SmithsonianResource, Format]] = []
for index, entry in enumerate(asset_data.model_entries, start=1):
entry_format_type = self.determine_format_type(entry)
if entry_format_type is None:
if verbosity >= 2:
self.stdout.write(
"Skipping unsupported Smithsonian resource "
f"{entry.uri} for asset {asset_data.model_url}"
)
continue
format_role = self.build_format_role(entry_format_type, entry, index)
format_obj = Format.objects.create(
asset=asset,
format_type=entry_format_type,
role=format_role,
)
resource = Resource.objects.create(
asset=asset,
format=format_obj,
external_url=entry.uri,
contenttype=self.determine_content_type(entry.uri, entry_format_type),
)
format_obj.add_root_resource(resource)
created_formats.append((entry, format_obj))
if not created_formats:
raise CommandError(
f"No supported Smithsonian formats could be created for {asset_data.model_url}"
)
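# Mark the format built from the preferred model entry as the gallery
# viewer default; without one, the asset is flagged as not viewer compatible.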
preferred_format = next((fmt for entry, fmt in created_formats if entry is root_entry), None)
if preferred_format:
preferred_format.is_preferred_for_gallery_viewer = True
preferred_format.save(update_fields=["is_preferred_for_gallery_viewer"])
asset.preferred_viewer_format_override = preferred_format
asset.is_viewer_compatible = True
else:
asset.preferred_viewer_format_override = None
asset.is_viewer_compatible = False
asset.update_time = timezone.now()
asset.save()
return asset
def handle(self, *args, **options):
rows = options["rows"]
rate_limit = options["rate_limit"]
max_assets = options["max_assets"]
dry_run = options["dry_run"]
fix_thumbs = options["fix_thumbs"]
update_existing = options["update_existing"]
api_key = options["api_key"]
verbosity = options.get("verbosity", 1)
client = SmithsonianAPIClient(
file_types=SUPPORTED_FILE_TYPES,
rate_limit=rate_limit,
rows_per_page=rows,
api_key=api_key,
)
owner = self.ensure_owner()
if fix_thumbs:
self.fix_missing_thumbnails(client, verbosity, dry_run)
return
imported = 0
skipped = 0
aggregated_assets: Dict[str, SmithsonianAsset] = {}
usable_asset_count = 0
stop_fetching = False
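# Rows for a single model can be spread across API pages, so entries are
# merged into aggregated_assets; usable_asset_count only increments the
# first time a model gains a usable model entry.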
for page_rows in client.fetch():
page_assets: Dict[str, SmithsonianAsset] = {}
for model_url, asset_data in self.normalise_metadata(page_rows).items():
existing = aggregated_assets.get(model_url)
if existing:
had_models = bool(existing.model_entries)
if asset_data.title and asset_data.title != existing.title:
existing.title = asset_data.title
for entry in asset_data.model_entries:
existing.add_entry(entry)
for entry in asset_data.image_entries:
existing.add_entry(entry)
if not had_models and existing.model_entries:
usable_asset_count += 1
else:
aggregated_assets[model_url] = asset_data
page_assets[model_url] = asset_data
if asset_data.model_entries:
usable_asset_count += 1
# Process this page's assets immediately
if page_assets:
self.populate_missing_image_entries(client, page_assets, verbosity)
# Filter which assets to enrich and import
for model_url, asset_data in page_assets.items():
if not asset_data.model_entries:
if verbosity >= 2:
self.stdout.write(
f"Skipping {asset_data.model_url} because it has no usable model entries"
)
continue
# Check if we should process this asset
should_process = update_existing or self.find_existing_asset(asset_data) is None
if not should_process:
skipped += 1
if verbosity >= 2:
self.stdout.write(f"Skipping existing asset {model_url}")
continue
# Enrich with Open Access metadata
if verbosity >= 2:
self.stdout.write(f"Enriching {model_url} with Open Access metadata...")
oa_record = client.fetch_open_access_metadata(model_url)
if oa_record:
self.apply_open_access_metadata(asset_data, oa_record, verbosity)
else:
if verbosity >= 1:
self.stdout.write(f" → No Open Access metadata found for {model_url}")
# Write to database immediately
if dry_run:
self.stdout.write(f"Would import {asset_data.model_url}")
else:
result = self.create_or_update_asset(
asset_data,
owner,
verbosity=verbosity,
update_existing=update_existing,
)
if result is not None:
imported += 1
if verbosity >= 1:
self.stdout.write(f"Imported {asset_data.model_url}")
if max_assets is not None and imported >= max_assets:
self.stdout.write("Reached asset import limit")
stop_fetching = True
break
if stop_fetching:
break
if not dry_run:
if imported == 0 and skipped == 0:
self.stdout.write("No assets imported")
else:
self.stdout.write(f"Import complete: {imported} imported, {skipped} skipped")
def fix_missing_thumbnails(
self,
client: SmithsonianAPIClient,
verbosity: int,
dry_run: bool,
) -> None:
"""Download missing thumbnails for already-imported Smithsonian assets."""
from django.conf import settings
all_smithsonian_assets = Asset.objects.filter(
imported_from=IMPORT_SOURCE,
).select_related("owner")
# Filter assets that either have no thumbnail path OR the file doesn't exist
assets_without_thumbs = []
for asset in all_smithsonian_assets:
if not asset.thumbnail:
assets_without_thumbs.append(asset)
elif settings.LOCAL_MEDIA_STORAGE and hasattr(asset.thumbnail, "path"):
# Check whether the local file actually exists on disk
try:
if not os.path.exists(asset.thumbnail.path):
assets_without_thumbs.append(asset)
except (ValueError, AttributeError):
# .path raises ValueError when no file is associated with the field
assets_without_thumbs.append(asset)
total = len(assets_without_thumbs)
if total == 0:
self.stdout.write("All Smithsonian assets already have thumbnails")
return
self.stdout.write(f"Found {total} Smithsonian assets without thumbnails (or missing files)")
fixed = 0
failed = 0
for asset in assets_without_thumbs:
model_url = asset.polydata.get("model_url") if asset.polydata else None
if not model_url:
if verbosity >= 2:
self.stdout.write(f"Skipping {asset.url}: no model_url in polydata")
failed += 1
continue
if verbosity >= 1:
self.stdout.write(f"Fetching thumbnail data for {model_url}")
try:
rows = client.fetch_by_model_url(model_url)
except Exception as exc:
self.stdout.write(f"API fetch failed for {model_url}: {exc}")
failed += 1
continue
asset_data = self.normalise_metadata(rows).get(model_url)
if not asset_data:
if verbosity >= 2:
self.stdout.write(f"No metadata found for {model_url}")
failed += 1
continue
thumbnail_entry = asset_data.preferred_image_entry()
if not thumbnail_entry:
if verbosity >= 1:
self.stdout.write(f"No thumbnail entry found for {model_url}")
failed += 1
continue
if dry_run:
self.stdout.write(f"Would download thumbnail for {asset.url} from {thumbnail_entry.uri}")
fixed += 1
continue
if verbosity >= 1:
self.stdout.write(f"Downloading thumbnail from {thumbnail_entry.uri}")
file_obj, content_type, size, diagnostics = self.download_thumbnail(thumbnail_entry)
if file_obj:
asset.thumbnail.save(file_obj.name, file_obj, save=True)
asset.thumbnail_contenttype = content_type
asset.save(update_fields=["thumbnail_contenttype"])
if verbosity >= 1:
self.stdout.write(
f"Saved thumbnail for {asset.url}: {file_obj.name} ({size} bytes); {diagnostics}"
)
fixed += 1
else:
if verbosity >= 1:
self.stdout.write(f"Failed to download thumbnail for {asset.url}; {diagnostics}")
failed += 1
self.stdout.write(f"Thumbnail fix complete: {fixed} fixed, {failed} failed")
def populate_missing_image_entries(
self,
client: SmithsonianAPIClient,
assets: Dict[str, SmithsonianAsset],
verbosity: int,
) -> None:
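"""For assets with no image entries, fetch the model's rows again and
merge any qualifying entries so a thumbnail candidate can be found."""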
for asset in assets.values():
if asset.image_entries:
continue
supplementary_rows = client.fetch_by_model_url(asset.model_url)
added = 0
supplementary_usages = set()
for row in supplementary_rows:
content = row.get("content", {})
if not isinstance(content, dict):
continue
usage = content.get("usage")
if usage:
supplementary_usages.add(usage)
entry = self.resource_from_content(content)
if entry is None or not entry.uri:
continue
if not self.should_include_entry(entry):
continue
if asset.add_entry(entry):
added += 1
if verbosity >= 2:
supplementary_usage_list = sorted(supplementary_usages or {"none"})
self.stdout.write(
f"Supplementary fetch for {asset.model_url} returned "
f"{len(supplementary_rows)} rows; added {added} new entries; "
f"usages={supplementary_usage_list}"
)
if not asset.image_entries and supplementary_rows:
model_usages = sorted({(entry.usage or "unknown") for entry in asset.model_entries})
supplementary_usage_list = sorted(supplementary_usages or {"none"})
self.stdout.write(
"No image entries found for "
f"{asset.model_url} after supplementary fetch; "
f"supplementary_usages={supplementary_usage_list}, "
f"model_usages={model_usages or ['none']}"
)
def apply_open_access_metadata(
self,
asset: SmithsonianAsset,
oa_record: Dict[str, object],
verbosity: int,
) -> None:
"""Apply Open Access metadata to a single asset."""
# Extract metadata from the Open Access record
content = oa_record.get("content", {})
if not isinstance(content, dict):
content = {}
unit_code = oa_record.get("unitCode")
# Extract descriptiveNonRepeating fields
desc_non_rep = content.get("descriptiveNonRepeating", {})
if isinstance(desc_non_rep, dict):
if not asset.record_id:
asset.record_id = desc_non_rep.get("record_ID")
if not asset.record_link:
asset.record_link = desc_non_rep.get("record_link")
if not asset.unit_code and desc_non_rep.get("unit_code"):
asset.unit_code = desc_non_rep.get("unit_code")
# Extract title/object name
title_data = desc_non_rep.get("title", {})
if isinstance(title_data, dict):
object_name = title_data.get("content") or title_data.get("label")
if object_name and not asset.object_name:
asset.object_name = object_name
# Use top-level unitCode if not set
if not asset.unit_code and unit_code:
asset.unit_code = unit_code
# Extract freetext fields
freetext = content.get("freetext", {})
if isinstance(freetext, dict):
# Get description from notes
if not asset.description:
notes = freetext.get("notes", [])
if isinstance(notes, list):
# Combine summary and brief description
descriptions = []
for note in notes:
if isinstance(note, dict):
label = note.get("label", "").lower()
note_content = note.get("content", "")
if label in ["summary", "brief description"] and note_content:
descriptions.append(note_content)
if descriptions:
asset.description = "\n\n".join(descriptions)
# Get license/rights
if not asset.license:
rights = freetext.get("objectRights", [])
if isinstance(rights, list) and rights:
for right in rights:
if isinstance(right, dict):
rights_content = right.get("content", "")
if rights_content:
asset.license = rights_content
break
# Get credit line
if not asset.credit:
credit_line = freetext.get("creditLine", [])
if isinstance(credit_line, list) and credit_line:
for credit in credit_line:
if isinstance(credit, dict):
credit_content = credit.get("content", "")
if credit_content:
asset.credit = credit_content
break
# Extract tags from indexedStructured
indexed = content.get("indexedStructured", {})
if isinstance(indexed, dict):
tags_set = set()
# Get topic tags
topics = indexed.get("topic", [])
if isinstance(topics, list):
for topic in topics:
if isinstance(topic, str) and topic.strip():
tags_set.add(topic.strip())
# Get usage_flag tags
usage_flags = indexed.get("usage_flag", [])
if isinstance(usage_flags, list):
for flag in usage_flags:
if isinstance(flag, str) and flag.strip():
tags_set.add(flag.strip())
# Get object_type tags
object_types = indexed.get("object_type", [])
if isinstance(object_types, list):
for obj_type in object_types:
if isinstance(obj_type, str) and obj_type.strip():
tags_set.add(obj_type.strip())
# Store as sorted list
if tags_set:
asset.tags = sorted(tags_set)
if verbosity >= 1:
self.stdout.write(
f" → Open Access: unit_code={asset.unit_code}, "
f"record_id={asset.record_id}, license={asset.license}, "
f"has_description={bool(asset.description)}, tags={len(asset.tags)}"
)
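# Illustrative shape of an Open Access record as read above (field names
# taken from the .get() calls in this method; values are made-up placeholders):
# {
#     "unitCode": "NMAH",
#     "content": {
#         "descriptiveNonRepeating": {
#             "record_ID": "...",
#             "record_link": "...",
#             "unit_code": "NMAH",
#             "title": {"content": "Object name"},
#         },
#         "freetext": {
#             "notes": [{"label": "Summary", "content": "..."}],
#             "objectRights": [{"content": "CC0"}],
#             "creditLine": [{"content": "..."}],
#         },
#         "indexedStructured": {
#             "topic": ["..."],
#             "usage_flag": ["..."],
#             "object_type": ["..."],
#         },
#     },
# }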