| from django.core.management.base import BaseCommand | |
| from django.db.models import Count | |
| from icosa.models import Asset | |
| class Command(BaseCommand): | |
| help = ( | |
| "Delete assets that have no viewable formats. " | |
| "This cleans up orphaned assets that were created but never successfully imported." | |
| ) | |
| def add_arguments(self, parser): | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Show what would be deleted without actually deleting anything", | |
| ) | |
| parser.add_argument( | |
| "--yes", | |
| action="store_true", | |
| help="Skip confirmation prompt", | |
| ) | |
| parser.add_argument( | |
| "--source", | |
| dest="source", | |
| default=None, | |
| help="Only delete assets from a specific import source (e.g., 'sketchfab')", | |
| ) | |
| def handle(self, *args, **options): | |
| dry_run = options.get("dry_run", False) | |
| skip_confirm = options.get("yes", False) | |
| source = options.get("source") | |
| # Find assets with no formats | |
| assets_query = Asset.objects.annotate( | |
| # The reverse query name for Format is "format"; the "_set" suffix only applies to the manager attribute | |
| format_count=Count("format") | |
| ).filter(format_count=0) | |
| # Filter by source if specified | |
| if source: | |
| assets_query = assets_query.filter(imported_from=source) | |
| assets = list(assets_query) | |
| count = len(assets) | |
| if count == 0: | |
| self.stdout.write(self.style.SUCCESS("No assets found without formats.")) | |
| return | |
| # Show what will be deleted | |
| self.stdout.write(f"\nFound {count} asset(s) without formats:") | |
| if options.get("verbosity", 1) >= 2: | |
| for asset in assets[:10]: # Show first 10 | |
| self.stdout.write(f" - {asset.url}: {asset.name} (source: {asset.imported_from})") | |
| if count > 10: | |
| self.stdout.write(f" ... and {count - 10} more") | |
| # Source breakdown | |
| if options.get("verbosity", 1) >= 1: | |
| sources = {} | |
| for asset in assets: | |
| source_name = asset.imported_from or "(no source)" | |
| sources[source_name] = sources.get(source_name, 0) + 1 | |
| self.stdout.write("\nBreakdown by source:") | |
| for source_name, source_count in sorted(sources.items()): | |
| self.stdout.write(f" {source_name}: {source_count}") | |
| if dry_run: | |
| self.stdout.write( | |
| self.style.WARNING(f"\n[DRY RUN] Would delete {count} asset(s). Run without --dry-run to actually delete.") | |
| ) | |
| return | |
| # Confirmation | |
| if not skip_confirm: | |
| self.stdout.write( | |
| self.style.WARNING(f"\nThis will permanently delete {count} asset(s) from the database.") | |
| ) | |
| confirm = input("Are you sure you want to continue? [y/N]: ") | |
| if confirm.lower() not in ["y", "yes"]: | |
| self.stdout.write("Cancelled.") | |
| return | |
| # Delete assets | |
| deleted_count = 0 | |
| for asset in assets: | |
| try: | |
| asset_url = asset.url | |
| asset.delete() | |
| deleted_count += 1 | |
| if options.get("verbosity", 1) >= 2: | |
| self.stdout.write(f"Deleted: {asset_url}") | |
| except Exception as exc: | |
| self.stderr.write(f"Error deleting {asset.url}: {exc}") | |
| self.stdout.write( | |
| self.style.SUCCESS(f"Successfully deleted {deleted_count} out of {count} asset(s).") | |
| ) |
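A minimal sketch of driving the command above from code, assuming the file is saved as management/commands/delete_formatless_assets.py (the gist does not show the file name, so the command name is hypothetical):

from django.core.management import call_command

# Preview what would be removed, with per-asset detail:
call_command("delete_formatless_assets", "--dry-run", verbosity=2)

# Delete only orphaned Sketchfab imports, skipping the confirmation prompt:
call_command("delete_formatless_assets", "--source", "sketchfab", "--yes")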
| import math | |
| import json | |
| from string import Template | |
| from typing import Dict, List, Optional | |
| from django.core.management.base import BaseCommand, CommandError | |
| from icosa.models import Asset | |
| def _quat_from_lookat(position, target, up): | |
| try: | |
| px, py, pz = position | |
| tx, ty, tz = target | |
| ux, uy, uz = up | |
| fx, fy, fz = tx - px, ty - py, tz - pz | |
| fl = math.sqrt(fx * fx + fy * fy + fz * fz) or 1.0 | |
| fx, fy, fz = fx / fl, fy / fl, fz / fl | |
| rx, ry, rz = (fy * uz - fz * uy, fz * ux - fx * uz, fx * uy - fy * ux) | |
| rl = math.sqrt(rx * rx + ry * ry + rz * rz) or 1.0 | |
| rx, ry, rz = rx / rl, ry / rl, rz / rl | |
| ux2, uy2, uz2 = (ry * fz - rz * fy, rz * fx - rx * fz, rx * fy - ry * fx) | |
| m00, m01, m02 = rx, ux2, -fx | |
| m10, m11, m12 = ry, uy2, -fy | |
| m20, m21, m22 = rz, uz2, -fz | |
| trace = m00 + m11 + m22 | |
| if trace > 0: | |
| s = math.sqrt(trace + 1.0) * 2.0 | |
| w = 0.25 * s | |
| x = (m21 - m12) / s | |
| y = (m02 - m20) / s | |
| z = (m10 - m01) / s | |
| elif (m00 > m11) and (m00 > m22): | |
| s = math.sqrt(1.0 + m00 - m11 - m22) * 2.0 | |
| w = (m21 - m12) / s | |
| x = 0.25 * s | |
| y = (m01 + m10) / s | |
| z = (m02 + m20) / s | |
| elif m11 > m22: | |
| s = math.sqrt(1.0 + m11 - m00 - m22) * 2.0 | |
| w = (m02 - m20) / s | |
| x = (m01 + m10) / s | |
| y = 0.25 * s | |
| z = (m12 + m21) / s | |
| else: | |
| s = math.sqrt(1.0 + m22 - m00 - m11) * 2.0 | |
| w = (m10 - m01) / s | |
| x = (m02 + m20) / s | |
| y = (m12 + m21) / s | |
| z = 0.25 * s | |
| return [x, y, z, w] | |
| except Exception: | |
| return [0, 0, 0, 1] | |
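| # Sanity check (illustrative): a camera at the origin looking down -Z with +Y up | |
| # is the conventional default orientation, so the result should be the identity quaternion: | |
| # >>> _quat_from_lookat([0, 0, 0], [0, 0, -1], [0, 1, 0]) | |
| # [0.0, 0.0, 0.0, 1.0] | |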
| def map_viewer_snapshot_to_presentation(snapshot: Dict) -> Optional[Dict]: | |
| if not snapshot: | |
| return None | |
| camera = snapshot.get("cameraLookAt") or {} | |
| position = camera.get("position") | |
| target = camera.get("target") | |
| up = camera.get("up") or [0, 1, 0] | |
| fov_deg = snapshot.get("fov") | |
| bg = snapshot.get("background") or {} | |
| env = snapshot.get("currentEnvironment") | |
| pres: Dict = {"camera": {"type": "perspective", "perspective": {"znear": 0.1}}} | |
| if position: | |
| pres["camera"]["translation"] = position | |
| if position and target: | |
| pres["camera"]["rotation"] = _quat_from_lookat(position, target, up) | |
| pres["camera"].setdefault("GOOGLE_camera_settings", {})["pivot"] = target | |
| pres["camera"].setdefault("GOOGLE_camera_settings", {})["mode"] = "movableOrbit" | |
| if isinstance(fov_deg, (int, float)): | |
| pres["camera"].setdefault("perspective", {})["yfov"] = math.radians(fov_deg) | |
| if isinstance(bg.get("color"), list) and len(bg.get("color")) >= 3: | |
| r, g, b = bg["color"][:3] | |
| def clamp01(x): | |
| try: | |
| return max(0, min(1, float(x))) | |
| except Exception: | |
| return 0 | |
| r8 = int(round(clamp01(r) * 255)) | |
| g8 = int(round(clamp01(g) * 255)) | |
| b8 = int(round(clamp01(b) * 255)) | |
| pres["backgroundColor"] = f"#{r8:02x}{g8:02x}{b8:02x}" | |
| pres["GOOGLE_backgrounds"] = {"color": [r, g, b]} | |
| if env: | |
| pres["GOOGLE_lighting_rig"] = env | |
| pres["GOOGLE_lights_image_based"] = env | |
| pres["orientingRotation"] = {"w": 1} | |
| pres["GOOGLE_scene_rotation"] = {"rotation": [0, 0, 0, 1]} | |
| pres["GOOGLE_real_world_transform"] = {"scaling_factor": 1} | |
| return pres | |
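| # Illustrative input/output (values made up; keys follow the mapping above): | |
| # >>> pres = map_viewer_snapshot_to_presentation({ | |
| # ...     "cameraLookAt": {"position": [0, 1, 5], "target": [0, 0, 0]}, | |
| # ...     "fov": 45, | |
| # ...     "background": {"color": [1, 1, 1]}, | |
| # ... }) | |
| # >>> pres["backgroundColor"], round(pres["camera"]["perspective"]["yfov"], 4) | |
| # ('#ffffff', 0.7854) | |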
| def fetch_sketchfab_viewer_snapshot(uid: str, timeout_ms: int = 20000) -> Optional[Dict]: | |
| try: | |
| from playwright.sync_api import sync_playwright | |
| except Exception as exc: | |
| raise CommandError("Playwright is not installed in this environment.") from exc | |
| viewer_js = "https://static.sketchfab.com/api/sketchfab-viewer-1.12.1.js" | |
| html_template = Template( | |
| """ | |
| <!doctype html><html><head><meta charset=\"utf-8\"><script src=\"$viewer_js\"></script></head> | |
| <body style=\"margin:0\"><iframe id=\"api-frame\" allow=\"autoplay; fullscreen; vr\" style=\"width:10px;height:10px;border:0\"></iframe> | |
| <script> | |
| const iframe=document.getElementById('api-frame'); | |
| const client=new window.Sketchfab(iframe); | |
| function call(api, name){return new Promise((resolve)=>{if(typeof api[name]!== 'function'){return resolve(undefined);}try{api[name]((v)=>resolve(v));}catch(e){resolve(undefined);}})} | |
| client.init('$uid', {autostart:1,ui_controls:0,ui_stop:0,success: function(api){api.addEventListener('viewerready', async function(){ | |
| const cameraLookAt=await call(api,'getCameraLookAt'); | |
| const fov=await call(api,'getFov'); | |
| const background=await call(api,'getBackground'); | |
| const currentEnvironment=await call(api,'getCurrentEnvironment'); | |
| const postProcessing=await call(api,'getPostProcessing'); | |
| const shading=await call(api,'getShading'); | |
| const viewerSettings=await call(api,'getViewerSettings'); | |
| window._snapshot={cameraLookAt,fov,background,currentEnvironment,postProcessing,shading,viewerSettings}; | |
| console.log('SNAPSHOT:'+JSON.stringify(window._snapshot)); | |
| });},error:function(){console.error('init error')}}); | |
| </script></body></html> | |
| """ | |
| ) | |
| html = html_template.substitute(uid=uid, viewer_js=viewer_js) | |
| with sync_playwright() as p: | |
| browser = p.chromium.launch(headless=True) | |
| page = browser.new_page() | |
| page.set_default_timeout(timeout_ms) | |
| snapshot = {} | |
| def on_console(msg): | |
| text = msg.text if isinstance(msg.text, str) else msg.text() | |
| if isinstance(text, str) and text.startswith("SNAPSHOT:"): | |
| try: | |
| snapshot.update(json.loads(text[len("SNAPSHOT:"):])) | |
| except Exception: | |
| pass | |
| page.on("console", on_console) | |
| from tempfile import NamedTemporaryFile | |
| import os | |
| with NamedTemporaryFile("w", delete=False, suffix=".html", encoding="utf-8") as f: | |
| f.write(html) | |
| html_path = f.name | |
| page.goto("file://" + os.path.abspath(html_path)) | |
| page.wait_for_timeout(12000) | |
| browser.close() | |
| return snapshot or None | |
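| # Example (requires Playwright plus a downloaded Chromium; the uid is a placeholder): | |
| # snapshot = fetch_sketchfab_viewer_snapshot("<sketchfab-model-uid>") | |
| # pres = map_viewer_snapshot_to_presentation(snapshot) if snapshot else None | |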
| class Command(BaseCommand): | |
| help = "Enrich assets imported from Sketchfab with viewer presentation parameters (camera, background, environment, post-fx)." | |
| def add_arguments(self, parser): | |
| parser.add_argument("--asset", dest="assets", nargs="*", help="Asset.url values to process") | |
| parser.add_argument("--uid", dest="uids", nargs="*", help="Sketchfab model UIDs to process") | |
| parser.add_argument("--all", action="store_true", help="Process all assets imported from Sketchfab") | |
| parser.add_argument("--limit", type=int, default=None, help="Limit number of assets to process") | |
| parser.add_argument("--dry-run", action="store_true", help="Do not save; just print") | |
| def handle(self, *args, **opts): | |
| assets_arg = opts.get("assets") or [] | |
| uids_arg = opts.get("uids") or [] | |
| do_all = opts.get("all") | |
| limit = opts.get("limit") | |
| dry_run = opts.get("dry_run") | |
| targets: List[Asset] = [] | |
| if assets_arg: | |
| for aurl in assets_arg: | |
| asset = Asset.objects.filter(url=aurl).first() | |
| if asset: | |
| targets.append(asset) | |
| else: | |
| self.stderr.write(f"No asset with url={aurl}") | |
| if uids_arg: | |
| for uid in uids_arg: | |
| a = Asset.objects.filter(polydata__uid=uid).first() | |
| if a: | |
| targets.append(a) | |
| else: | |
| # Try by url convention | |
| a = Asset.objects.filter(url=f"sketchfab-{uid}").first() | |
| if a: | |
| targets.append(a) | |
| else: | |
| self.stderr.write(f"No asset found for uid={uid}") | |
| if do_all or (not targets and not uids_arg and not assets_arg): | |
| qs = Asset.objects.filter(imported_from="sketchfab").order_by("-create_time") | |
| if limit: | |
| qs = qs[:limit] | |
| targets.extend(list(qs)) | |
| if limit and len(targets) > limit: | |
| targets = targets[:limit] | |
| if not targets: | |
| self.stdout.write("Nothing to process") | |
| return | |
| processed = 0 | |
| for asset in targets: | |
| uid = None | |
| if asset.polydata and isinstance(asset.polydata, dict): | |
| uid = asset.polydata.get("uid") | |
| if not uid and asset.url and asset.url.startswith("sketchfab-"): | |
| uid = asset.url[len("sketchfab-") :] | |
| if not uid: | |
| self.stderr.write(f"Skipping {asset.url}: no Sketchfab uid found") | |
| continue | |
| self.stdout.write(f"Probing viewer for {asset.url} (uid={uid})...") | |
| snapshot = fetch_sketchfab_viewer_snapshot(uid) | |
| if not snapshot: | |
| self.stderr.write(f" → No snapshot captured") | |
| continue | |
| pres = map_viewer_snapshot_to_presentation(snapshot) | |
| if not pres: | |
| self.stderr.write(f" → No mappable presentation data") | |
| continue | |
| if dry_run: | |
| self.stdout.write(json.dumps(pres)) | |
| else: | |
| asset.presentation_params = pres | |
| asset.save(update_fields=["presentation_params"]) | |
| self.stdout.write(" → Saved presentation_params") | |
| processed += 1 | |
| self.stdout.write(self.style.SUCCESS(f"Done. Processed {processed} assets.")) | |
| import json | |
| import io | |
| import mimetypes | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Iterable, List, Optional | |
| from django.core.files.base import ContentFile | |
| from django.core.management.base import BaseCommand, CommandError | |
| from django.utils.text import slugify | |
| from django.utils import timezone | |
| from PIL import Image | |
| from icosa.helpers.file import get_content_type | |
| from icosa.helpers.snowflake import generate_snowflake | |
| from icosa.models import ( | |
| ASSET_STATE_COMPLETE, | |
| PUBLIC, | |
| Asset, | |
| AssetOwner, | |
| Format, | |
| Resource, | |
| Tag, | |
| ) | |
| from icosa.models.common import CATEGORY_LABEL_MAP | |
| IMPORT_SOURCE = "Poly Haven" | |
| def first_json_file(path: Path) -> Optional[Path]: | |
| for p in sorted(path.glob("*.json")): | |
| return p | |
| return None | |
| def pick_thumbnail_file(path: Path) -> Optional[Path]: | |
| """Only use an exact "thumbnail.webp" if present; otherwise no thumbnail.""" | |
| thumb_webp = path / "thumbnail.webp" | |
| if thumb_webp.exists() and thumb_webp.is_file(): | |
| return thumb_webp | |
| return None | |
| def pick_glb_file(path: Path) -> Optional[Path]: | |
| glbs = sorted(path.glob("*.glb")) | |
| if glbs: | |
| # If multiple, prefer one that does not look like LOD or low-res | |
| preferred = [ | |
| p | |
| for p in glbs | |
| if not any(k in p.name.lower() for k in ("lod", "low", "preview", "thumb")) | |
| ] | |
| return preferred[0] if preferred else glbs[0] | |
| return None | |
| def parse_datetime(value: Optional[str]) -> Optional[datetime]: | |
| if not value: | |
| return None | |
| try: | |
| # Try ISO 8601 first; normalise a trailing "Z" to an explicit UTC offset | |
| dt = datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| # Make naive values timezone-aware, as Django expects when USE_TZ is enabled | |
| if dt.tzinfo is None: | |
| dt = timezone.make_aware(dt, timezone.utc) | |
| return dt | |
| except Exception: | |
| return None | |
| def derive_license(meta: dict) -> Optional[str]: | |
| raw = None | |
| for key in ("license", "licence", "license_id", "licenseName", "license_slug"): | |
| v = meta.get(key) | |
| if v: | |
| raw = str(v) | |
| break | |
| if raw: | |
| low = raw.lower() | |
| if "cc0" in low or "public domain" in low or "creative commons 0" in low: | |
| return "CREATIVE_COMMONS_0" | |
| if "by-sa" in low: | |
| return "CREATIVE_COMMONS_BY_SA_4_0" | |
| if low in ("by", "cc-by", "creative commons by", "cc by"): | |
| return "CREATIVE_COMMONS_BY_4_0" | |
| return None | |
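| # e.g. (illustrative): | |
| # derive_license({"license": "CC0 1.0 Universal"}) -> "CREATIVE_COMMONS_0" | |
| # derive_license({"licence": "CC BY-SA 4.0"}) -> "CREATIVE_COMMONS_BY_SA_4_0" | |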
| class Command(BaseCommand): | |
| help = ( | |
| "Import local Poly Haven-style assets from a directory. " | |
| "Each subdirectory is treated as an asset folder; directories without a .glb are ignored." | |
| ) | |
| def add_arguments(self, parser): | |
| parser.add_argument( | |
| "--base-dir", | |
| dest="base_dir", | |
| default=os.environ.get("POLYHAVEN_DIR") or r"C:\\Users\\andyb\\3D Objects\\Poly Haven", | |
| help="Base directory containing Poly Haven asset folders", | |
| ) | |
| parser.add_argument( | |
| "--max", | |
| dest="max_items", | |
| type=int, | |
| default=None, | |
| help="Maximum number of items to import", | |
| ) | |
| parser.add_argument( | |
| "--update-existing", | |
| dest="update_existing", | |
| action="store_true", | |
| help="Update assets if they already exist", | |
| ) | |
| parser.add_argument( | |
| "--owner", | |
| dest="owner_slug", | |
| default="polyhaven", | |
| help="Owner slug to assign when author is not derivable", | |
| ) | |
| def handle(self, *args, **options): | |
| base_dir = Path(options["base_dir"]).expanduser() | |
| if not base_dir.exists() or not base_dir.is_dir(): | |
| raise CommandError(f"Base directory does not exist: {base_dir}") | |
| update_existing: bool = options.get("update_existing", False) | |
| max_items: Optional[int] = options.get("max_items") | |
| owner_slug_default: str = options.get("owner_slug") | |
| count = 0 | |
| scanned = 0 | |
| imported_dirs: List[Path] = [] | |
| for root, _dirs, _files in os.walk(base_dir): | |
| dirpath = Path(root) | |
| scanned += 1 | |
| glb = pick_glb_file(dirpath) | |
| if not glb: | |
| continue | |
| try: | |
| asset = self.create_or_update_from_dir(dirpath, glb, owner_slug_default, update_existing) | |
| if asset is not None: | |
| count += 1 | |
| imported_dirs.append(dirpath) | |
| self.stdout.write(f"Imported {asset.url} from {dirpath.name}") | |
| except CommandError as exc: | |
| self.stderr.write(f"Skipping {dirpath.name}: {exc}") | |
| if max_items is not None and count >= max_items: | |
| break | |
| self.stdout.write(self.style.SUCCESS(f"Finished. Scanned={scanned} imported={count}")) | |
| def create_or_update_from_dir( | |
| self, | |
| dirpath: Path, | |
| glb_path: Path, | |
| owner_slug_default: str, | |
| update_existing: bool, | |
| ) -> Optional[Asset]: | |
| meta_path = first_json_file(dirpath) | |
| meta: dict = {} | |
| meta_present = False | |
| if meta_path and meta_path.exists(): | |
| meta_present = True | |
| try: | |
| meta = json.loads(meta_path.read_text(encoding="utf-8")) | |
| except Exception: | |
| meta = {} | |
| # Derive basic fields | |
| name = meta.get("name") or meta.get("title") or dirpath.name | |
| desc = meta.get("description") or meta.get("desc") | |
| # Prefer an explicit id/slug; else folder name | |
| ident = ( | |
| str(meta.get("id") or meta.get("slug") or slugify(name) or dirpath.name) | |
| .strip() | |
| .replace(" ", "-") | |
| ) | |
| asset_url = f"polyhaven-{ident}" | |
| # Owner: try author info; else default | |
| author_name = None | |
| for key in ("author", "artist", "creator"): | |
| v = meta.get(key) | |
| if isinstance(v, str) and v.strip(): | |
| author_name = v.strip() | |
| break | |
| if isinstance(v, dict): | |
| author_name = (v.get("name") or v.get("username") or v.get("id") or "").strip() or None | |
| if author_name: | |
| break | |
| if not author_name and isinstance(meta.get("authors"), list) and meta.get("authors"): | |
| first = meta["authors"][0] | |
| if isinstance(first, dict): | |
| author_name = (first.get("name") or first.get("username") or first.get("id") or "").strip() or None | |
| elif isinstance(first, str): | |
| author_name = first.strip() | |
| owner_slug = slugify(author_name) if author_name else owner_slug_default | |
| owner_display = author_name or owner_slug_default | |
| owner, _ = AssetOwner.objects.get_or_create( | |
| url=owner_slug, | |
| defaults={ | |
| "displayname": owner_display, | |
| "imported": True, | |
| "is_claimed": False, | |
| }, | |
| ) | |
| # Locate or create asset | |
| asset = Asset.objects.filter(url=asset_url).first() | |
| created = False | |
| if not asset: | |
| created = True | |
| asset = Asset(url=asset_url) | |
| else: | |
| if not update_existing: | |
| return None | |
| # Core fields | |
| created_at = parse_datetime(meta.get("created") or meta.get("created_at") or meta.get("date")) or timezone.now() | |
| updated_at = parse_datetime(meta.get("updated") or meta.get("modified") or meta.get("updated_at")) or created_at | |
| asset.name = name | |
| asset.description = desc | |
| if created and not asset.create_time: | |
| asset.create_time = created_at | |
| asset.update_time = updated_at | |
| asset.visibility = PUBLIC | |
| asset.curated = True | |
| asset.state = ASSET_STATE_COMPLETE | |
| asset.owner = owner | |
| asset.imported_from = IMPORT_SOURCE | |
| if meta_present: | |
| asset.polydata = meta | |
| # All Poly Haven assets are CC0 | |
| asset.license = "CREATIVE_COMMONS_0" | |
| # Category | |
| cat_name = None | |
| cats = meta.get("categories") or meta.get("category") | |
| if isinstance(cats, list) and cats: | |
| c0 = cats[0] | |
| cat_name = c0.get("name") if isinstance(c0, dict) else str(c0) | |
| elif isinstance(cats, str): | |
| cat_name = cats | |
| if cat_name: | |
| key = str(cat_name).strip().lower() | |
| asset.category = CATEGORY_LABEL_MAP.get(key) | |
| # Assign id for new assets | |
| if created: | |
| asset.id = generate_snowflake() | |
| asset.save() | |
| # Tags | |
| tags_raw: Iterable = meta.get("tags") or meta.get("keywords") or [] | |
| tag_names: List[str] = [] | |
| for t in tags_raw: | |
| if isinstance(t, dict): | |
| tag_names.append(t.get("name") or t.get("slug")) | |
| elif isinstance(t, str): | |
| tag_names.append(t) | |
| tag_objs = [] | |
| for name in filter(None, set(tag_names)): | |
| tag, _ = Tag.objects.get_or_create(name=name) | |
| tag_objs.append(tag) | |
| if tag_objs: | |
| asset.tags.set(tag_objs) | |
| # Thumbnail | |
| thumb_path = pick_thumbnail_file(dirpath) | |
| if thumb_path and ((not asset.thumbnail) or update_existing): | |
| # Convert webp to jpeg to satisfy thumbnail validators | |
| if thumb_path.suffix.lower() == ".webp": | |
| with Image.open(thumb_path) as im: | |
| # Ensure RGB (discard alpha on white background if present) | |
| if im.mode in ("RGBA", "LA"): | |
| bg = Image.new("RGB", im.size, (255, 255, 255)) | |
| alpha = im.split()[-1] if im.mode in ("RGBA", "LA") else None | |
| if alpha is not None: | |
| bg.paste(im.convert("RGB"), mask=alpha) | |
| else: | |
| bg.paste(im.convert("RGB")) | |
| im = bg | |
| else: | |
| im = im.convert("RGB") | |
| # Fit image into an 8:5 box without upscaling image content. | |
| target_ar = 8 / 5 | |
| max_w, max_h = 1600, 1000 # upper bound for large sources | |
| w, h = im.size | |
| # Scale down if larger than max box; never scale up | |
| scale = min(1.0, min(max_w / w, max_h / h)) | |
| new_w = int(w * scale) | |
| new_h = int(h * scale) | |
| if scale < 1.0: | |
| im = im.resize((new_w, new_h), Image.LANCZOS) | |
| else: | |
| new_w, new_h = w, h | |
| # Compute minimal padding to achieve 8:5 aspect ratio canvas | |
| if new_w / new_h < target_ar: | |
| canvas_w = int(round(new_h * target_ar)) | |
| canvas_h = new_h | |
| else: | |
| canvas_w = new_w | |
| canvas_h = int(round(new_w / target_ar)) | |
| # Add 10% white padding around the image | |
| pad = int(0.1 * max(canvas_w, canvas_h)) | |
| padded_w = canvas_w + 2 * pad | |
| padded_h = canvas_h + 2 * pad | |
| canvas = Image.new("RGB", (padded_w, padded_h), (255, 255, 255)) | |
| # Center the image on the padded canvas | |
| paste_x = (padded_w - canvas_w) // 2 | |
| paste_y = (padded_h - canvas_h) // 2 | |
| inner_canvas = Image.new("RGB", (canvas_w, canvas_h), (255, 255, 255)) | |
| img_x = (canvas_w - new_w) // 2 | |
| img_y = (canvas_h - new_h) // 2 | |
| inner_canvas.paste(im, (img_x, img_y)) | |
| canvas.paste(inner_canvas, (paste_x, paste_y)) | |
| buf = io.BytesIO() | |
| canvas.save(buf, format="JPEG", quality=90) | |
| buf.seek(0) | |
| jpg_name = thumb_path.with_suffix(".jpg").name | |
| asset.thumbnail.save(jpg_name, ContentFile(buf.read()), save=False) | |
| asset.thumbnail_contenttype = "image/jpeg" | |
| else: | |
| # Guess content type and save | |
| content_type = get_content_type(thumb_path.name) or mimetypes.guess_type(thumb_path.name)[0] or "image/jpeg" | |
| asset.thumbnail.save(thumb_path.name, ContentFile(thumb_path.read_bytes()), save=False) | |
| asset.thumbnail_contenttype = content_type | |
| asset.save() | |
| # Formats/resources: attach GLB as primary format (avoid duplicates) | |
| existing_glb = asset.format_set.filter(format_type="GLB").last() | |
| if not existing_glb: | |
| fmt = Format.objects.create(asset=asset, format_type="GLB", role="POLYHAVEN_GLB") | |
| glb_bytes = glb_path.read_bytes() | |
| content_type = get_content_type(glb_path.name) or mimetypes.guess_type(glb_path.name)[0] or "application/octet-stream" | |
| res = Resource(asset=asset, format=fmt, contenttype=content_type) | |
| res.file.save(glb_path.name, ContentFile(glb_bytes), save=True) | |
| fmt.add_root_resource(res) | |
| # Assign preferred viewer format and save | |
| asset.assign_preferred_viewer_format() | |
| asset.save() | |
| return asset |
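A usage sketch, assuming the file above is saved as management/commands/import_polyhaven.py (hypothetical name):

from django.core.management import call_command

# Import up to 50 asset folders from a custom directory:
call_command("import_polyhaven", "--base-dir", "/data/polyhaven", "--max", "50")

# Re-run over the same tree, refreshing assets that already exist:
call_command("import_polyhaven", "--update-existing")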
| import os | |
| import time | |
| import mimetypes | |
| import zipfile | |
| import io | |
| from os.path import basename | |
| from datetime import datetime | |
| from typing import Dict, Generator, Iterable, List, Optional | |
| import requests | |
| from django.core.files.base import ContentFile | |
| from django.core.management.base import BaseCommand, CommandError | |
| from django.utils import timezone | |
| from icosa.helpers.file import ( | |
| get_content_type, | |
| validate_file, | |
| process_main_file, | |
| UploadedFormat, | |
| ) | |
| from django.core.files.uploadedfile import SimpleUploadedFile | |
| from icosa.helpers.snowflake import generate_snowflake | |
| from icosa.models import ( | |
| ASSET_STATE_COMPLETE, | |
| PUBLIC, | |
| Asset, | |
| AssetOwner, | |
| Format, | |
| Resource, | |
| Tag, | |
| ) | |
| from icosa.models.common import CATEGORY_LABEL_MAP | |
| IMPORT_SOURCE = "sketchfab" | |
| def parse_iso8601(ts: Optional[str]) -> Optional[datetime]: | |
| if not ts: | |
| return None | |
| try: | |
| # Sketchfab returns ISO8601 strings; parse and ensure timezone-aware | |
| dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) | |
| # Ensure the datetime is timezone-aware (Django requires this) | |
| if dt.tzinfo is None: | |
| dt = timezone.make_aware(dt, timezone.utc) | |
| return dt | |
| except Exception: | |
| return None | |
| def sketchfab_license_to_internal(slug: Optional[str]) -> Optional[str]: | |
| """Map Sketchfab license slugs to internal icosa license codes. | |
| Supported defaults: | |
| - cc0 -> CREATIVE_COMMONS_0 | |
| - by -> CREATIVE_COMMONS_BY_4_0 | |
| Other Sketchfab licenses are currently not mapped to icosa choices by default. | |
| """ | |
| if not slug: | |
| return None | |
| slug = slug.lower().strip() | |
| if slug == "cc0": | |
| return "CREATIVE_COMMONS_0" | |
| if slug == "by": | |
| # Sketchfab uses CC BY 4.0 today for the BY family. | |
| return "CREATIVE_COMMONS_BY_4_0" | |
| if slug == "by-sa": | |
| return "CREATIVE_COMMONS_BY_SA_4_0" | |
| if slug == "by-nd": | |
| return "CREATIVE_COMMONS_BY_ND_4_0" | |
| if slug == "by-nc": | |
| return "CREATIVE_COMMONS_NC_4_0" | |
| if slug == "by-nc-sa": | |
| return "CREATIVE_COMMONS_NC_SA_4_0" | |
| if slug == "by-nc-nd": | |
| return "CREATIVE_COMMONS_NC_ND_4_0" | |
| # Anything else (e.g. Standard or Editorial licenses) is not mapped | |
| return None | |
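| # e.g. sketchfab_license_to_internal("by-sa") -> "CREATIVE_COMMONS_BY_SA_4_0" | |
| # sketchfab_license_to_internal("editorial") -> None | |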
| def pick_thumbnail_url(model: Dict) -> Optional[str]: | |
| thumbs = (model or {}).get("thumbnails", {}).get("images", []) | |
| if not thumbs: | |
| return None | |
| # Choose the largest width image available | |
| thumbs_sorted = sorted(thumbs, key=lambda x: x.get("width", 0), reverse=True) | |
| return thumbs_sorted[0].get("url") | |
| class SketchfabClient: | |
| BASE = "https://api.sketchfab.com/v3" | |
| def __init__(self, token: Optional[str] = None, timeout: int = 30): | |
| self.token = token | |
| self.timeout = timeout | |
| self.session = requests.Session() | |
| if token: | |
| self.session.headers.update({"Authorization": f"Token {token}"}) | |
| def paged(self, url: str, params: Dict) -> Generator[Dict, None, None]: | |
| next_url = url | |
| next_params = params.copy() | |
| while next_url: | |
| resp = self.session.get(next_url, params=next_params, timeout=self.timeout) | |
| if resp.status_code != 200: | |
| raise CommandError(f"Sketchfab request failed: {resp.status_code} {resp.text}") | |
| data = resp.json() | |
| for item in data.get("results", []): | |
| yield item | |
| next_url = data.get("next") | |
| next_params = {} | |
| # Be nice to the API | |
| time.sleep(0.1) | |
| def search_models( | |
| self, | |
| *, | |
| licenses: Iterable[str], | |
| user: Optional[str] = None, | |
| downloadable: bool = True, | |
| per_page: int = 24, | |
| sort_by: str = "-publishedAt", | |
| ) -> Generator[Dict, None, None]: | |
| params = { | |
| "type": "models", | |
| "licenses": ",".join(licenses), | |
| "per_page": per_page, | |
| "downloadable": str(downloadable).lower(), | |
| "sort_by": sort_by, | |
| } | |
| # The search API accepts a 'user' filter by username. | |
| if user: | |
| params["user"] = user | |
| url = f"{self.BASE}/search" | |
| yield from self.paged(url, params) | |
| def list_user_models( | |
| self, | |
| *, | |
| user: str, | |
| licenses: Optional[Iterable[str]] = None, | |
| downloadable: bool = True, | |
| per_page: int = 24, | |
| sort_by: str = "-publishedAt", | |
| ) -> Generator[Dict, None, None]: | |
| """List models for a user via the search endpoint. | |
| Sketchfab's /models endpoint does not accept a user filter reliably; the documented | |
| approach is the /search API with `type=models` and `user=<username>`. | |
| """ | |
| params = { | |
| "type": "models", | |
| "user": user, | |
| "per_page": per_page, | |
| "sort_by": sort_by, | |
| } | |
| if licenses: | |
| params["licenses"] = ",".join(licenses) | |
| if downloadable is not None: | |
| params["downloadable"] = str(downloadable).lower() | |
| url = f"{self.BASE}/search" | |
| yield from self.paged(url, params) | |
| def download_info(self, uid: str, *, max_retries: int = 5) -> Optional[Dict]: | |
| """Return download info for a model, if accessible. | |
| Response typically contains keys like 'gltf', 'glb', 'usdz', 'source', each with a 'url'. | |
| Requires a valid token for most models even if downloadable is true. | |
| """ | |
| for attempt in range(max_retries): | |
| resp = self.session.get(f"{self.BASE}/models/{uid}/download", timeout=self.timeout) | |
| if resp.status_code == 401: | |
| # Unauthorized; token required | |
| return None | |
| if resp.status_code == 429: | |
| # Rate limited - check for Retry-After header | |
| retry_after = resp.headers.get("Retry-After") | |
| if retry_after and retry_after.isdigit(): | |
| wait_time = int(retry_after) | |
| else: | |
| # Exponential backoff with longer waits: 5s, 10s, 20s, 40s, 80s | |
| wait_time = 5 * (2 ** attempt) | |
| print(f"Rate limited on {uid}, waiting {wait_time}s before retry {attempt + 1}/{max_retries}") | |
| time.sleep(wait_time) | |
| continue | |
| if resp.status_code == 200: | |
| return resp.json() | |
| # Other error - log and return None | |
| print(f"DEBUG: download_info({uid}) failed with status {resp.status_code}") | |
| print(f"DEBUG: Response body: {resp.text[:500]}") # First 500 chars | |
| return None | |
| # All retries exhausted | |
| print(f"DEBUG: download_info({uid}) failed after {max_retries} retries due to rate limiting") | |
| return None | |
| class Command(BaseCommand): | |
| help = ( | |
| "Import assets from Sketchfab using their API. " | |
| "Allows filtering by user and license. Defaults to CC0, CC-BY, and CC-BY-SA." | |
| ) | |
| def add_arguments(self, parser): | |
| parser.add_argument( | |
| "--user", | |
| dest="users", | |
| metavar="USERNAME", | |
| action="append", | |
| default=[], | |
| help="Sketchfab username to filter by (can be provided multiple times)", | |
| ) | |
| parser.add_argument( | |
| "--license", | |
| dest="licenses", | |
| default="cc0,by,by-sa", | |
| help=( | |
| "Comma-separated Sketchfab license slugs to include. " | |
| "Defaults to 'cc0,by,by-sa' (CC0 Public Domain, CC BY 4.0, CC BY-SA 4.0)." | |
| ), | |
| ) | |
| parser.add_argument( | |
| "--max", | |
| dest="max_items", | |
| type=int, | |
| default=None, | |
| help="Maximum number of models to import", | |
| ) | |
| parser.add_argument( | |
| "--token", | |
| dest="token", | |
| default=os.environ.get("SKETCHFAB_TOKEN") or os.environ.get("DJANGO_SKETCHFAB_TOKEN"), | |
| help="Sketchfab API token (or set SKETCHFAB_TOKEN env)", | |
| ) | |
| parser.add_argument( | |
| "--update-existing", | |
| dest="update_existing", | |
| action="store_true", | |
| help="Update models if they already exist", | |
| ) | |
| parser.add_argument( | |
| "--delay", | |
| dest="delay", | |
| type=float, | |
| default=1.0, | |
| help="Delay in seconds between model imports to avoid rate limiting (default: 1.0)", | |
| ) | |
| def handle(self, *args, **options): | |
| users: List[str] = options["users"] or [] | |
| # Normalize user-provided license slugs (accept cc-by-sa -> by-sa) | |
| raw_licenses = options["licenses"] or "cc0,by,by-sa" | |
| licenses_in = [x.strip().lower() for x in raw_licenses.split(",") if x.strip()] | |
| licenses = [] | |
| for slug in licenses_in: | |
| if slug in ("cc-by", "cc_by", "by-4.0", "by4.0"): | |
| licenses.append("by") | |
| elif slug in ("cc-by-sa", "cc_by_sa", "by-sa", "bysa", "by-sa-4.0"): | |
| licenses.append("by-sa") | |
| else: | |
| licenses.append(slug) | |
| max_items = options.get("max_items") | |
| token = options.get("token") | |
| update_existing = options.get("update_existing", False) | |
| delay = options.get("delay", 1.0) | |
| client = SketchfabClient(token=token) | |
| count = 0 | |
| seen = 0 | |
| eligible = 0 | |
| targets: Iterable[Dict] | |
| if users: | |
| # Iterate per-user, filtering by license locally if needed | |
| def iter_all(): | |
| for user in users: | |
| if options.get("verbosity", 1) >= 2: | |
| self.stdout.write(f"Querying user='{user}' licenses={licenses} downloadable=true") | |
| for model in client.list_user_models(user=user, licenses=licenses, downloadable=True): | |
| yield model | |
| targets = iter_all() | |
| else: | |
| # Global search with license filter | |
| targets = client.search_models(licenses=licenses) | |
| for model in targets: | |
| seen += 1 | |
| # Enforce license filter if the endpoint didn't do it for us | |
| lic = (model.get("license") or {}).get("label") | |
| lic_slug = None | |
| if lic: | |
| # Derive a slug-like form from the label when no slug is present | |
| low = lic.lower() | |
| if "cc0" in low or "public domain" in low: | |
| lic_slug = "cc0" | |
| elif "sharealike" in low or "share alike" in low: | |
| lic_slug = "by-sa" | |
| elif "attribution" in low and "no" not in low and "non" not in low: | |
| # Heuristic for CC BY | |
| lic_slug = "by" | |
| if users and licenses and lic_slug and lic_slug not in licenses: | |
| if options.get("verbosity", 1) >= 3: | |
| self.stdout.write(f"Skipping by license: {model.get('uid')} label={lic}") | |
| continue | |
| uid = model.get("uid") | |
| if not uid: | |
| continue | |
| # If max reached, stop early | |
| if max_items is not None and count >= max_items: | |
| break | |
| # Skip non-downloadable models when we cannot fetch direct file URLs | |
| if not model.get("isDownloadable", False): | |
| if options.get("verbosity", 1) >= 2: | |
| self.stdout.write(f"Skipping not-downloadable: {model.get('uid')} {model.get('name')}") | |
| continue | |
| eligible += 1 | |
| try: | |
| asset = self.create_or_update_asset_from_model(client, model, update_existing=update_existing) | |
| if asset is not None: | |
| count += 1 | |
| self.stdout.write(f"Imported {asset.url} ({asset.name})") | |
| # Rate limit: wait between models to avoid overwhelming the API | |
| time.sleep(delay) | |
| except CommandError as exc: | |
| self.stderr.write(f"Skipping {uid}: {exc}") | |
| # Brief delay even on errors to respect rate limits | |
| time.sleep(delay * 0.5) | |
| if options.get("verbosity", 1) >= 1: | |
| self.stdout.write(f"Seen={seen}, eligible(downloadable+license)={eligible}, imported={count}") | |
| self.stdout.write(self.style.SUCCESS(f"Finished. Imported {count} models.")) | |
| def create_or_update_asset_from_model( | |
| self, | |
| client: SketchfabClient, | |
| model: Dict, | |
| *, | |
| update_existing: bool = False, | |
| ) -> Optional[Asset]: | |
| uid = model.get("uid") | |
| if not uid: | |
| raise CommandError("Missing uid in model data") | |
| asset_url = f"sketchfab-{uid}" | |
| # Lookup existing | |
| asset = Asset.objects.filter(url=asset_url).first() | |
| created = False | |
| if not asset: | |
| created = True | |
| asset = Asset(url=asset_url) | |
| else: | |
| if not update_existing: | |
| # Nothing to do | |
| return None | |
| # Check download availability BEFORE creating the asset to avoid orphaned records | |
| download = client.download_info(uid) | |
| if not download: | |
| raise CommandError( | |
| "Could not fetch download URLs. Ensure the model is downloadable and a valid token is provided via --token or SKETCHFAB_TOKEN." | |
| ) | |
| # Prepare owner | |
| user = model.get("user") or {} | |
| username = (user.get("username") or "").strip() or f"user-{user.get('uid','unknown')}" | |
| displayname = user.get("displayName") or username | |
| # Find a unique owner URL, trying username first, then username-1, username-2, etc. | |
| owner_url = username | |
| suffix = 1 | |
| while True: | |
| owner = AssetOwner.objects.filter(url=owner_url).first() | |
| if owner is None: | |
| # URL is available, create new owner | |
| owner = AssetOwner.objects.create( | |
| url=owner_url, | |
| displayname=displayname, | |
| imported=True, | |
| is_claimed=False, | |
| ) | |
| if suffix > 1: | |
| print(f"WARNING: Username '{username}' already exists, created owner with URL '{owner_url}'") | |
| break | |
| elif owner.imported and owner.displayname == displayname: | |
| # Same owner already exists (from previous import), reuse it | |
| break | |
| else: | |
| # Conflict with different owner, try next suffix | |
| owner_url = f"{username}-{suffix}" | |
| suffix += 1 | |
| if suffix > 100: | |
| # Safety valve | |
| raise CommandError(f"Could not find unique owner URL for username '{username}' after 100 attempts") | |
| # Timestamps | |
| created_at = parse_iso8601(model.get("createdAt")) or timezone.now() | |
| updated_at = parse_iso8601(model.get("publishedAt")) or created_at | |
| # Map license | |
| license_label = (model.get("license") or {}).get("label") | |
| license_slug = None | |
| if license_label: | |
| low = license_label.lower() | |
| if "cc0" in low or "public domain" in low: | |
| license_slug = "cc0" | |
| elif "sharealike" in low or "share alike" in low: | |
| license_slug = "by-sa" | |
| elif "attribution" in low and "no" not in low and "non" not in low: | |
| license_slug = "by" | |
| internal_license = sketchfab_license_to_internal(license_slug) | |
| # Core fields | |
| if created and not asset.create_time: | |
| asset.create_time = created_at | |
| asset.update_time = updated_at | |
| asset.name = model.get("name") | |
| asset.description = model.get("description") | |
| asset.visibility = PUBLIC | |
| asset.state = ASSET_STATE_COMPLETE | |
| asset.owner = owner | |
| asset.imported_from = IMPORT_SOURCE | |
| asset.polydata = model # Store raw sketchfab metadata | |
| asset.historical_likes = int(model.get("likeCount") or 0) | |
| asset.historical_views = int(model.get("viewCount") or 0) | |
| if internal_license: | |
| asset.license = internal_license | |
| # Category mapping (first category name if provided) | |
| cat_name = None | |
| cats = model.get("categories") or [] | |
| if cats: | |
| # categories sometimes carry only name strings | |
| c0 = cats[0] | |
| if isinstance(c0, dict): | |
| cat_name = c0.get("name") | |
| elif isinstance(c0, str): | |
| cat_name = c0 | |
| if cat_name: | |
| key = str(cat_name).strip().lower() | |
| asset.category = CATEGORY_LABEL_MAP.get(key) | |
| # Assign an id for new assets | |
| if created: | |
| asset.id = generate_snowflake() | |
| asset.save() | |
| # Tags | |
| tags = model.get("tags") or [] | |
| tag_names = [] | |
| for t in tags: | |
| if isinstance(t, dict): | |
| tag_names.append(t.get("name") or t.get("slug")) | |
| elif isinstance(t, str): | |
| tag_names.append(t) | |
| tag_objs = [] | |
| for name in filter(None, set(tag_names)): | |
| tag, _ = Tag.objects.get_or_create(name=name) | |
| tag_objs.append(tag) | |
| if tag_objs: | |
| asset.tags.set(tag_objs) | |
| # Thumbnail: download and store locally if possible | |
| if not asset.thumbnail: | |
| thumb_url = pick_thumbnail_url(model) | |
| if thumb_url: | |
| try: | |
| resp = requests.get(thumb_url, timeout=20) | |
| if resp.status_code == 200: | |
| content_type = resp.headers.get("Content-Type") | |
| ext = mimetypes.guess_extension(content_type or "") or ".jpg" | |
| if ext == ".jpe": | |
| ext = ".jpg" | |
| filename = f"thumbnail-{uid}{ext}" | |
| asset.thumbnail.save(filename, ContentFile(resp.content), save=False) | |
| asset.thumbnail_contenttype = content_type or "image/jpeg" | |
| asset.save() | |
| except Exception: | |
| # Non-fatal | |
| pass | |
| # Formats/resources: prefer GLB if available, and download into storage | |
| # (download info already fetched and validated earlier) | |
| created_any_format = False | |
| def download_to_contentfile(url: str, *, timeout: int = 60) -> Optional[ContentFile]: | |
| try: | |
| resp = requests.get(url, timeout=timeout) | |
| if resp.status_code != 200: | |
| return None | |
| return ContentFile(resp.content) | |
| except Exception: | |
| return None | |
| def add_format_from_url(url: str, fmt_type: str, *, role: Optional[str] = None, filename: Optional[str] = None): | |
| nonlocal created_any_format | |
| data = download_to_contentfile(url) | |
| if not data: | |
| return | |
| # Infer filename and content type | |
| content_type = None | |
| try: | |
| # attempt to fetch content type via HEAD for better accuracy | |
| head = requests.head(url, timeout=15, allow_redirects=True) | |
| content_type = head.headers.get("Content-Type") | |
| except Exception: | |
| pass | |
| guessed_ext = mimetypes.guess_extension(content_type or "") or os.path.splitext(url.split("?")[0])[1] or ".bin" | |
| if guessed_ext == ".jpe": | |
| guessed_ext = ".jpg" | |
| name = filename or f"{fmt_type.lower()}-{uid}{guessed_ext}" | |
| fmt = Format.objects.create(asset=asset, format_type=fmt_type, role=role) | |
| # Saving file to storage via FileField | |
| res = Resource(asset=asset, format=fmt, contenttype=content_type or get_content_type(name) or "application/octet-stream") | |
| res.file.save(name, data, save=True) | |
| fmt.add_root_resource(res) | |
| created_any_format = True | |
| def add_formats_from_zip(url: str, *, preferred_ext_order: Optional[List[str]] = None): | |
| nonlocal created_any_format | |
| if preferred_ext_order is None: | |
| preferred_ext_order = [ | |
| "glb", | |
| "gltf", | |
| "fbx", | |
| "obj", | |
| "usdz", | |
| "ply", | |
| "stl", | |
| "vox", | |
| "tilt", | |
| "blocks", | |
| ] | |
| try: | |
| resp = requests.get(url, timeout=90) | |
| if resp.status_code != 200: | |
| return | |
| zf = zipfile.ZipFile(io.BytesIO(resp.content)) | |
| except Exception: | |
| return | |
| # Build UploadedFormats from zip members | |
| uploaded: List[UploadedFormat] = [] | |
| for info in zf.infolist(): | |
| if info.is_dir(): | |
| continue | |
| fname = info.filename | |
| # Ignore hidden or MACOSX metadata | |
| base = basename(fname) | |
| if not base or base.startswith(".__") or "/." in fname or base.startswith("."): | |
| continue | |
| try: | |
| with zf.open(info) as fp: | |
| data = fp.read() | |
| except Exception: | |
| continue | |
| # Construct an in-memory uploaded file | |
| su = SimpleUploadedFile(base, data, content_type=get_content_type(base) or "application/octet-stream") | |
| ext = base.split(".")[-1].lower() if "." in base else "" | |
| details = validate_file(su, ext) | |
| if details is not None: | |
| uploaded.append(details) | |
| if not uploaded: | |
| return | |
| # Choose mainfile by extension preference first, then by mainfile flag | |
| def pref_index(ext: str) -> int: | |
| try: | |
| return preferred_ext_order.index(ext) | |
| except ValueError: | |
| return len(preferred_ext_order) + 100 | |
| # Filter potential mains | |
| mains = [u for u in uploaded if u.mainfile] | |
| if not mains: | |
| mains = uploaded | |
| # Choose by extension order on the original filename | |
| mains_sorted = sorted(mains, key=lambda u: pref_index(u.file.name.split(".")[-1].lower())) | |
| main = mains_sorted[0] | |
| subs = [u for u in uploaded if u is not main] | |
| # Hand off to existing helper to build Format + Resources in storage | |
| process_main_file(main, subs, asset, gltf_to_convert=None) | |
| created_any_format = True | |
| # The download payload usually has entries like {'glb': {'url': ...}, 'gltf': {'url': ...}, 'usdz': {'url': ...}} | |
| glb_url = (download.get("glb") or {}).get("url") | |
| if glb_url: | |
| add_format_from_url(glb_url, "GLB", role="SKETCHFAB_GLB") | |
| # Provide USDZ if present (not viewer-preferred, but useful to store) | |
| usdz_url = (download.get("usdz") or {}).get("url") | |
| if usdz_url: | |
| add_format_from_url(usdz_url, "USDZ", role="SKETCHFAB_USDZ") | |
| # GLTF archive (zip): unpack to root + resources | |
| gltf_url = (download.get("gltf") or {}).get("url") | |
| if gltf_url: | |
| add_formats_from_zip(gltf_url, preferred_ext_order=["gltf", "glb", "fbx", "obj"]) # prefer GLTF as main | |
| # Source archive (zip): prefer FBX, then OBJ, then others | |
| source_url = (download.get("source") or {}).get("url") | |
| if source_url: | |
| add_formats_from_zip(source_url, preferred_ext_order=["fbx", "obj", "gltf", "glb", "ply", "stl"]) # prefer authoring formats | |
| # Assign preferred viewer format if possible | |
| asset.assign_preferred_viewer_format() | |
| # Final save in case any denorms/validations occur | |
| asset.save() | |
| return asset |
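An invocation sketch for the importer above (the command name is hypothetical; most downloads require a valid API token):

from django.core.management import call_command

# Pull up to 10 CC0 models from one user, pausing 2s between imports:
call_command(
    "import_sketchfab",
    "--user", "someusername",
    "--license", "cc0",
    "--max", "10",
    "--delay", "2.0",
    "--token", "<sketchfab-api-token>",
)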
| import hashlib | |
| import io | |
| import logging | |
| import mimetypes | |
| import os | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Iterable, List, Optional, Set, Tuple | |
| from urllib.parse import urlparse | |
| import requests | |
| from django.core.management.base import BaseCommand, CommandError | |
| from django.db import transaction | |
| from django.utils import timezone | |
| from django.core.files.base import ContentFile | |
| from PIL import Image | |
| from icosa.models import ( | |
| ASSET_STATE_COMPLETE, | |
| PUBLIC, | |
| Asset, | |
| AssetOwner, | |
| Format, | |
| Resource, | |
| Tag, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| SUPPORTED_FILE_TYPES: Tuple[str, ...] = ("glb", "gltf", "obj", "stl") | |
| SUPPORTED_FILE_TYPE_SET = set(SUPPORTED_FILE_TYPES) | |
| IMPORT_SOURCE = "smithsonian" | |
| API_URL = "https://3d-api.si.edu/api/v1.0/content/file/search" | |
| OPEN_ACCESS_API_URL = "https://api.si.edu/openaccess/api/v1.0/search" | |
| DEFAULT_API_KEY = "DEMO_KEY" # Can be overridden with --api-key | |
| DEFAULT_OWNER = { | |
| "url": "smithsonian", | |
| "displayname": "Smithsonian 3D" | |
| } | |
| @dataclass | |
| class SmithsonianResource: | |
| uri: str | |
| usage: Optional[str] | |
| quality: Optional[str] | |
| model_type: Optional[str] | |
| file_type: Optional[str] | |
| extra: Dict[str, object] = field(default_factory=dict) | |
| @dataclass | |
| class SmithsonianAsset: | |
| title: str | |
| model_url: str | |
| model_entries: List[SmithsonianResource] = field(default_factory=list) | |
| image_entries: List[SmithsonianResource] = field(default_factory=list) | |
| seen_uris: Set[str] = field(default_factory=set, repr=False) | |
| record_id: Optional[str] = None | |
| record_link: Optional[str] = None | |
| unit_code: Optional[str] = None | |
| object_name: Optional[str] = None | |
| description: Optional[str] = None | |
| license: Optional[str] = None | |
| credit: Optional[str] = None | |
| tags: List[str] = field(default_factory=list) | |
| additional_metadata: Dict[str, object] = field(default_factory=dict) | |
| def to_metadata(self) -> Dict[str, object]: | |
| """Return serialisable metadata for storage on the Asset.""" | |
| metadata = { | |
| "title": self.title, | |
| "model_url": self.model_url, | |
| "models": [entry.__dict__ for entry in self.model_entries], | |
| "images": [entry.__dict__ for entry in self.image_entries], | |
| } | |
| # Add rich metadata fields if present | |
| if self.record_id: | |
| metadata["record_id"] = self.record_id | |
| if self.record_link: | |
| metadata["record_link"] = self.record_link | |
| if self.unit_code: | |
| metadata["unit_code"] = self.unit_code | |
| if self.object_name: | |
| metadata["object_name"] = self.object_name | |
| if self.description: | |
| metadata["description"] = self.description | |
| if self.license: | |
| metadata["license"] = self.license | |
| if self.credit: | |
| metadata["credit"] = self.credit | |
| if self.additional_metadata: | |
| metadata["additional_metadata"] = self.additional_metadata | |
| return metadata | |
| def add_entry(self, entry: SmithsonianResource) -> bool: | |
| """Add an entry to the asset if it hasn't been seen already.""" | |
| uri = entry.uri | |
| if uri and uri in self.seen_uris: | |
| return False | |
| if uri: | |
| self.seen_uris.add(uri) | |
| usage = (entry.usage or "").lower() | |
| if usage.startswith("image"): | |
| self.image_entries.append(entry) | |
| else: | |
| self.model_entries.append(entry) | |
| return True | |
| def preferred_model_entry(self) -> Optional[SmithsonianResource]: | |
| """Return the best candidate to use as the root resource.""" | |
| if not self.model_entries: | |
| return None | |
| def sort_key(entry: SmithsonianResource) -> tuple: | |
| usage_priority = { | |
| "web3d": 0, | |
| "app3d": 1, | |
| "download3d": 2, | |
| }.get((entry.usage or "").lower(), 3) | |
| quality_priority_map = { | |
| "high": 0, | |
| "medium": 1, | |
| "ar": 2, | |
| "low": 3, | |
| "full_resolution": 4, | |
| "thumb": 5, | |
| } | |
| quality_priority = quality_priority_map.get((entry.quality or "").lower(), 6) | |
| # When priorities match, prefer longer urls (heuristic for higher fidelity variants). | |
| return (usage_priority, quality_priority, -(len(entry.uri) if entry.uri else 0)) | |
| return sorted(self.model_entries, key=sort_key)[0] | |
| def preferred_image_entry(self) -> Optional[SmithsonianResource]: | |
| """Return the best candidate thumbnail image.""" | |
| if not self.image_entries: | |
| return None | |
| def sort_key(entry: SmithsonianResource) -> tuple: | |
| usage_priority = { | |
| "image_thumb": 0, | |
| "image_thumbnail": 0, | |
| "image_small": 1, | |
| "image_medium": 2, | |
| "image_large": 3, | |
| "image_master": 4, | |
| }.get((entry.usage or "").lower(), 5) | |
| quality_priority = { | |
| "thumb": 0, | |
| "low": 1, | |
| "medium": 2, | |
| "high": 3, | |
| "full_resolution": 4, | |
| }.get((entry.quality or "").lower(), 5) | |
| return (usage_priority, quality_priority, -(len(entry.uri) if entry.uri else 0)) | |
| return sorted(self.image_entries, key=sort_key)[0] | |
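| # Illustrative use of the dataclasses above (values made up): | |
| # asset = SmithsonianAsset(title="Example", model_url="https://example.org/pkg") | |
| # asset.add_entry(SmithsonianResource(uri="https://example.org/m.glb", usage="web3d", quality="high", model_type="glb", file_type="glb")) | |
| # asset.preferred_model_entry().uri  # -> "https://example.org/m.glb" | |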
| class SmithsonianAPIClient: | |
| def __init__( | |
| self, | |
| file_types: Iterable[str], | |
| rate_limit: float = 0.5, | |
| rows_per_page: int = 100, | |
| api_key: str = DEFAULT_API_KEY, | |
| ): | |
| self.file_types = list(dict.fromkeys(file_type.lower() for file_type in file_types)) | |
| self.rate_limit = rate_limit | |
| self.rows_per_page = rows_per_page | |
| self.api_key = api_key | |
| self.session = requests.Session() | |
| def fetch(self) -> Iterable[List[Dict[str, object]]]: | |
| for file_type in self.file_types: | |
| start = 0 | |
| total = None | |
| while True: | |
| params = { | |
| "file_type": file_type, | |
| "start": start, | |
| "rows": self.rows_per_page, | |
| } | |
| response = self.session.get(API_URL, params=params, timeout=60) | |
| try: | |
| response.raise_for_status() | |
| except requests.HTTPError as exc: # pragma: no cover - defensive. | |
| raise CommandError( | |
| f"Failed to fetch Smithsonian data for file_type={file_type}: {exc}" | |
| ) from exc | |
| payload = response.json() | |
| rows = payload.get("rows", []) | |
| total = payload.get("rowCount", total) | |
| logger.info( | |
| "Fetched %s rows for file_type=%s at offset %s", len(rows), file_type, start | |
| ) | |
| yield rows | |
| start += self.rows_per_page | |
| if total is not None and start >= total: | |
| break | |
| if not rows: | |
| break | |
| time.sleep(self.rate_limit) | |
| def fetch_by_model_url(self, model_url: str) -> List[Dict[str, object]]: | |
| start = 0 | |
| collected: List[Dict[str, object]] = [] | |
| while True: | |
| params = { | |
| "model_url": model_url, | |
| "start": start, | |
| "rows": self.rows_per_page, | |
| } | |
| response = self.session.get(API_URL, params=params, timeout=60) | |
| try: | |
| response.raise_for_status() | |
| except requests.HTTPError as exc: # pragma: no cover - defensive. | |
| raise CommandError( | |
| f"Failed to fetch additional Smithsonian data for {model_url}: {exc}" | |
| ) from exc | |
| payload = response.json() | |
| rows = payload.get("rows", []) | |
| collected.extend(rows) | |
| if len(rows) < self.rows_per_page or not rows: | |
| break | |
| start += self.rows_per_page | |
| time.sleep(self.rate_limit) | |
| return collected | |
| def fetch_open_access_metadata(self, model_url: str) -> Optional[Dict[str, object]]: | |
| """Fetch rich metadata from the Smithsonian Open Access API for a 3D package.""" | |
| try: | |
| params = { | |
| "q": model_url, | |
| "api_key": self.api_key, | |
| "rows": 1, | |
| } | |
| response = self.session.get(OPEN_ACCESS_API_URL, params=params, timeout=60) | |
| response.raise_for_status() | |
| payload = response.json() | |
| rows = payload.get("response", {}).get("rows", []) | |
| if rows: | |
| return rows[0] | |
| return None | |
| except requests.RequestException as exc: | |
| logger.warning("Failed to fetch Open Access metadata for %s: %s", model_url, exc) | |
| return None | |
| class Command(BaseCommand): | |
| help = "Import Smithsonian 3D models into Icosa" | |
| # Mapping of Smithsonian unit codes to our categories | |
| UNIT_CODE_CATEGORY_MAP = { | |
| "nasm": "TRANSPORT", # National Air and Space Museum | |
| "nmah": "HISTORY", # National Museum of American History | |
| "nmnh": "NATURE", # National Museum of Natural History | |
| "nmnhmammals": "ANIMALS", # NMNH - Mammals | |
| "nmnhbirds": "ANIMALS", # NMNH - Birds | |
| "nmnhfishes": "ANIMALS", # NMNH - Fishes | |
| "nmnhreptiles": "ANIMALS", # NMNH - Reptiles | |
| "nmnhamphibians": "ANIMALS", # NMNH - Amphibians | |
| "nmnhinvertebratezoo": "ANIMALS", # NMNH - Invertebrate Zoology | |
| "nmnhanthro": "CULTURE", # NMNH - Anthropology | |
| "nmnhbotany": "NATURE", # NMNH - Botany | |
| "nmnhentomology": "ANIMALS", # NMNH - Entomology | |
| "nmnhiz": "ANIMALS", # NMNH - Invertebrate Zoology | |
| "nmnhminsci": "SCIENCE", # NMNH - Mineral Sciences | |
| "nmnhpaleo": "SCIENCE", # NMNH - Paleobiology | |
| "npg": "PEOPLE", # National Portrait Gallery | |
| "saam": "ART", # Smithsonian American Art Museum | |
| "acm": "CULTURE", # Anacostia Community Museum | |
| "fsg": "ART", # Freer Gallery of Art and Arthur M. Sackler Gallery | |
| "hmsg": "ART", # Hirshhorn Museum and Sculpture Garden | |
| "npm": "HISTORY", # National Postal Museum | |
| "chndm": "ART", # Cooper Hewitt, Smithsonian Design Museum | |
| "nzp": "ANIMALS", # National Zoological Park | |
| "si": "MISCELLANEOUS", # Smithsonian Institution (general) | |
| "cfch": "CULTURE", # Center for Folklife and Cultural Heritage | |
| } | |
| def add_arguments(self, parser): | |
| parser.add_argument( | |
| "--rows", | |
| type=int, | |
| default=100, | |
| help="Number of rows to fetch per API call", | |
| ) | |
| parser.add_argument( | |
| "--rate-limit", | |
| type=float, | |
| default=0.5, | |
| help="Seconds to wait between API requests", | |
| ) | |
| parser.add_argument( | |
| "--max-assets", | |
| type=int, | |
| default=None, | |
| help="Optional limit on the number of assets to import", | |
| ) | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Fetch data but do not write to the database", | |
| ) | |
| parser.add_argument( | |
| "--fix-thumbs", | |
| action="store_true", | |
| help="Only download missing thumbnails for already-imported assets", | |
| ) | |
| parser.add_argument( | |
| "--update-existing", | |
| action="store_true", | |
| help="Update existing assets with fresh metadata (default: skip existing)", | |
| ) | |
| parser.add_argument( | |
| "--api-key", | |
| type=str, | |
| default=DEFAULT_API_KEY, | |
| help=f"Smithsonian Open Access API key (default: {DEFAULT_API_KEY})", | |
| ) | |
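| # Illustrative invocation (hypothetical command name; the real one depends on this file's name under management/commands/): | |
| #   python manage.py import_smithsonian --rows 50 --max-assets 10 --dry-run | |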
| @staticmethod | |
| def normalise_metadata(rows: Iterable[Dict[str, object]]) -> Dict[str, SmithsonianAsset]: | |
| """Extract basic file information from 3D API rows. Rich metadata comes from Open Access API.""" | |
| assets: Dict[str, SmithsonianAsset] = {} | |
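| # The 3D API can return several rows for one model_url; merge their resources into a single asset. | |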
| for row in rows: | |
| content = row.get("content", {}) | |
| if not isinstance(content, dict): | |
| continue | |
| entry = Command.resource_from_content(content) | |
| if entry is None: | |
| continue | |
| model_url = content.get("model_url") | |
| if not model_url: | |
| continue | |
| title = row.get("title") or "Untitled Smithsonian Model" | |
| asset = assets.get(model_url) | |
| if asset is None: | |
| asset = SmithsonianAsset( | |
| title=title, | |
| model_url=model_url, | |
| ) | |
| assets[model_url] = asset | |
| if entry.uri and Command.should_include_entry(entry): | |
| asset.add_entry(entry) | |
| return assets | |
| @staticmethod | |
| def resource_from_content(content: Dict[str, object]) -> Optional[SmithsonianResource]: | |
| uri = content.get("uri") | |
| if not uri: | |
| return None | |
| return SmithsonianResource( | |
| uri=uri, | |
| usage=content.get("usage"), | |
| quality=content.get("quality"), | |
| model_type=content.get("model_type"), | |
| file_type=content.get("file_type"), | |
| extra={ | |
| key: value | |
| for key, value in content.items() | |
| if key | |
| not in {"uri", "usage", "quality", "model_type", "file_type", "model_url"} | |
| }, | |
| ) | |
| @staticmethod | |
| def is_image_usage(usage: Optional[str]) -> bool: | |
| return (usage or "").lower().startswith("image") | |
| @classmethod | |
| def infer_file_type(cls, entry: SmithsonianResource) -> Optional[str]: | |
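| # Detection precedence: a cached "detected_file_type", then explicit model_type/file_type, then the URI's extension. | |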
| detected = entry.extra.get("detected_file_type") | |
| if isinstance(detected, str) and detected: | |
| return detected.lower() | |
| for candidate in (entry.model_type, entry.file_type): | |
| if candidate: | |
| detected_type = candidate.lower() | |
| entry.extra.setdefault("detected_file_type", detected_type) | |
| return detected_type | |
| path = urlparse(entry.uri).path | |
| extension = os.path.splitext(path)[1].lstrip(".").lower() | |
| if extension: | |
| entry.extra.setdefault("detected_file_type", extension) | |
| return extension | |
| return None | |
| @classmethod | |
| def should_include_entry(cls, entry: SmithsonianResource) -> bool: | |
| if cls.is_image_usage(entry.usage): | |
| return True | |
| detected_type = cls.infer_file_type(entry) | |
| return bool(detected_type and detected_type in SUPPORTED_FILE_TYPE_SET) | |
| @staticmethod | |
| def guess_content_type(uri: str, default: Optional[str] = None) -> Optional[str]: | |
| content_type, _ = mimetypes.guess_type(uri) | |
| if content_type: | |
| return content_type | |
| return default | |
| @classmethod | |
| def extract_unit_code(cls, record_id: Optional[str]) -> Optional[str]: | |
| """Extract unit code from Smithsonian record ID like 'nasm_A20120325000'.""" | |
| if not record_id: | |
| return None | |
| # str.split always yields at least one element for a non-empty string, so no length check is needed. | |
| return record_id.split("_")[0].lower() | |
| @classmethod | |
| def determine_category(cls, unit_code: Optional[str]) -> Optional[str]: | |
| """Map Smithsonian unit code to our category.""" | |
| if not unit_code: | |
| return None | |
| unit_lower = unit_code.lower() | |
| # Try exact match first | |
| category = cls.UNIT_CODE_CATEGORY_MAP.get(unit_lower) | |
| if category: | |
| return category | |
| # Fallback: try prefix matching (e.g., "nmnhsomething" -> "nmnh") | |
| # Sort by length descending to match longest prefix first | |
| for prefix in sorted(cls.UNIT_CODE_CATEGORY_MAP.keys(), key=len, reverse=True): | |
| if unit_lower.startswith(prefix): | |
| return cls.UNIT_CODE_CATEGORY_MAP[prefix] | |
| return None | |
| @classmethod | |
| def parse_license(cls, license_text: Optional[str]) -> Optional[str]: | |
| """Convert Smithsonian license text to our license constant.""" | |
| if not license_text: | |
| return None | |
| license_lower = license_text.lower() | |
| if "cc0" in license_lower or "public domain" in license_lower: | |
| return "CREATIVE_COMMONS_0" | |
| # Default to None if we can't determine | |
| return None | |
| @staticmethod | |
| def ensure_owner() -> AssetOwner: | |
| owner, _ = AssetOwner.objects.get_or_create( | |
| url=DEFAULT_OWNER["url"], | |
| defaults={ | |
| "displayname": DEFAULT_OWNER["displayname"], | |
| "imported": True, | |
| "is_claimed": False, | |
| }, | |
| ) | |
| return owner | |
| @staticmethod | |
| def asset_identifier(model_url: str) -> str: | |
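| # Colons in the model URL are replaced so the derived identifier stays URL- and path-friendly. | |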
| safe_url = model_url.replace(":", "-") | |
| return f"smithsonian-{safe_url}" | |
| @classmethod | |
| def determine_format_type(cls, entry: SmithsonianResource) -> Optional[str]: | |
| detected_type = cls.infer_file_type(entry) | |
| if not detected_type: | |
| return None | |
| if detected_type in {"glb", "gltf"}: | |
| return "GLTF2" | |
| if detected_type == "obj": | |
| return "OBJ" | |
| if detected_type == "stl": | |
| return "STL" | |
| return None | |
| @classmethod | |
| def determine_content_type(cls, uri: str, format_type: Optional[str]) -> Optional[str]: | |
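| # Fallback chain: mimetypes guess, known 3D extensions, the format type, then a generic binary content type. | |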
| guessed = cls.guess_content_type(uri) | |
| if guessed: | |
| return guessed | |
| extension = os.path.splitext(urlparse(uri).path)[1].lower() | |
| if extension == ".glb": | |
| return "model/gltf-binary" | |
| if extension == ".gltf": | |
| return "model/gltf+json" | |
| if extension == ".obj": | |
| return "text/plain" | |
| if extension == ".stl": | |
| return "model/stl" | |
| if format_type == "GLTF2": | |
| return "model/gltf-binary" | |
| if format_type == "OBJ": | |
| return "text/plain" | |
| if format_type == "STL": | |
| return "model/stl" | |
| return "application/octet-stream" | |
| @staticmethod | |
| def build_format_role(format_type: str, entry: SmithsonianResource, index: int) -> str: | |
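| # Produces roles such as "SMITHSONIAN_GLTF2_<USAGE>_<QUALITY>_1", truncated to the 255-character field limit. | |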
| parts = [format_type] | |
| if entry.usage: | |
| parts.append(entry.usage.upper().replace("-", "_").replace(" ", "_")) | |
| if entry.quality: | |
| parts.append(entry.quality.upper().replace("-", "_").replace(" ", "_")) | |
| parts.append(str(index)) | |
| role = "SMITHSONIAN_" + "_".join(filter(None, parts)) | |
| return role[:255] | |
| def download_thumbnail( | |
| self, entry: SmithsonianResource | |
| ) -> Tuple[Optional[ContentFile], Optional[str], int, str]: | |
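| """Download a thumbnail and normalise it; returns (file, content_type, size, diagnostics).""" | |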
| if not entry.uri: | |
| return None, None, 0, "no URI provided" | |
| try: | |
| response = requests.get(entry.uri, timeout=60) | |
| response.raise_for_status() | |
| except requests.RequestException as exc: # pragma: no cover - network failure handling | |
| logger.warning("Failed to download thumbnail %s: %s", entry.uri, exc) | |
| return None, None, 0, f"request error: {exc}" | |
| raw_size = len(response.content) | |
| try: | |
| # Process the image to normalize format and aspect ratio | |
| with Image.open(io.BytesIO(response.content)) as im: | |
| # Sample top-left pixel color for background | |
| bg_color = (255, 255, 255) # default white | |
| try: | |
| if im.mode in ("RGB", "RGBA", "L", "LA", "P"): | |
| pixel = im.getpixel((0, 0)) | |
| if isinstance(pixel, int): | |
| # Grayscale | |
| bg_color = (pixel, pixel, pixel) | |
| elif len(pixel) >= 3: | |
| # RGB or RGBA | |
| bg_color = tuple(pixel[:3]) | |
| elif len(pixel) == 2: | |
| # LA (luminance + alpha) | |
| bg_color = (pixel[0], pixel[0], pixel[0]) | |
| except Exception: | |
| # If sampling fails, stick with white | |
| pass | |
| # Ensure RGB (discard alpha on background color if present) | |
| if im.mode in ("RGBA", "LA", "P"): | |
| bg = Image.new("RGB", im.size, bg_color) | |
| if im.mode == "P" and "transparency" in im.info: | |
| im = im.convert("RGBA") | |
| if im.mode in ("RGBA", "LA"): | |
| alpha = im.split()[-1] | |
| bg.paste(im.convert("RGB"), mask=alpha) | |
| im = bg | |
| else: | |
| bg.paste(im.convert("RGB")) | |
| im = bg | |
| else: | |
| im = im.convert("RGB") | |
| # Fit image into an 8:5 box without upscaling image content | |
| target_ar = 8 / 5 | |
| max_w, max_h = 1600, 1000 # upper bound for large sources | |
| w, h = im.size | |
| # Scale down if larger than max box; never scale up | |
| scale = min(1.0, min(max_w / w, max_h / h)) | |
| new_w = int(w * scale) | |
| new_h = int(h * scale) | |
| if scale < 1.0: | |
| im = im.resize((new_w, new_h), Image.LANCZOS) | |
| else: | |
| new_w, new_h = w, h | |
| # Compute minimal padding to achieve 8:5 aspect ratio canvas | |
| if new_w / new_h < target_ar: | |
| canvas_w = int(round(new_h * target_ar)) | |
| canvas_h = new_h | |
| else: | |
| canvas_w = new_w | |
| canvas_h = int(round(new_w / target_ar)) | |
| # Add 10% padding around the image using sampled background color | |
| pad = int(0.1 * max(canvas_w, canvas_h)) | |
| padded_w = canvas_w + 2 * pad | |
| padded_h = canvas_h + 2 * pad | |
| canvas = Image.new("RGB", (padded_w, padded_h), bg_color) | |
| # Center the image on the padded canvas | |
| paste_x = (padded_w - canvas_w) // 2 | |
| paste_y = (padded_h - canvas_h) // 2 | |
| inner_canvas = Image.new("RGB", (canvas_w, canvas_h), bg_color) | |
| img_x = (canvas_w - new_w) // 2 | |
| img_y = (canvas_h - new_h) // 2 | |
| inner_canvas.paste(im, (img_x, img_y)) | |
| canvas.paste(inner_canvas, (paste_x, paste_y)) | |
| # Save as JPEG | |
| buf = io.BytesIO() | |
| canvas.save(buf, format="JPEG", quality=90) | |
| buf.seek(0) | |
| processed_content = buf.read() | |
| filename = f"thumbnail-{hashlib.sha256(entry.uri.encode('utf-8')).hexdigest()[:12]}.jpg" | |
| content_type = "image/jpeg" | |
| size = len(processed_content) | |
| diagnostics = ( | |
| f"status={response.status_code}, raw_bytes={raw_size}, " | |
| f"processed_bytes={size}, content_type={content_type}, " | |
| f"original_size={w}x{h}, final_size={padded_w}x{padded_h}" | |
| ) | |
| logger.debug("Processed thumbnail %s: %s", entry.uri, diagnostics) | |
| return ContentFile(processed_content, name=filename), content_type, size, diagnostics | |
| except Exception as exc: | |
| logger.warning("Failed to process thumbnail image %s: %s", entry.uri, exc) | |
| # Fall back to returning raw content if image processing fails | |
| raw_content_type = response.headers.get("Content-Type") | |
| if raw_content_type: | |
| raw_content_type = raw_content_type.split(";")[0].strip() | |
| extension = None | |
| if raw_content_type: | |
| extension = mimetypes.guess_extension(raw_content_type) | |
| if not extension: | |
| extension = os.path.splitext(urlparse(entry.uri).path)[1] | |
| if not extension: | |
| extension = ".jpg" | |
| if extension == ".jpe": | |
| extension = ".jpg" | |
| content_type = raw_content_type or mimetypes.guess_type(f"thumbnail{extension}")[0] | |
| filename = f"thumbnail-{hashlib.sha256(entry.uri.encode('utf-8')).hexdigest()[:12]}{extension}" | |
| diagnostics = ( | |
| f"status={response.status_code}, bytes={raw_size}, " | |
| f"content_type={content_type or 'unknown'}, extension={extension}, " | |
| f"processing_error={exc}" | |
| ) | |
| return ContentFile(response.content, name=filename), content_type, raw_size, diagnostics | |
| def find_existing_asset(self, asset_data: SmithsonianAsset) -> Optional[Asset]: | |
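| # Try the cheapest lookups first: canonical asset URL, then polydata model_url, then resource external URLs. | |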
| asset_url = self.asset_identifier(asset_data.model_url) | |
| asset = Asset.objects.filter(url=asset_url).first() | |
| if asset: | |
| return asset | |
| asset = Asset.objects.filter(polydata__model_url=asset_data.model_url).first() | |
| if asset: | |
| return asset | |
| model_uris = [entry.uri for entry in asset_data.model_entries if entry.uri] | |
| if model_uris: | |
| resource = ( | |
| Resource.objects.filter(external_url__in=model_uris) | |
| .select_related("asset") | |
| .first() | |
| ) | |
| if resource: | |
| return resource.asset | |
| return None | |
| def create_or_update_asset( | |
| self, | |
| asset_data: SmithsonianAsset, | |
| owner: AssetOwner, | |
| *, | |
| verbosity: int = 1, | |
| update_existing: bool = False, | |
| ) -> Optional[Asset]: | |
| root_entry = asset_data.preferred_model_entry() | |
| if root_entry is None: | |
| raise CommandError(f"No usable model files found for {asset_data.model_url}") | |
| asset_url = self.asset_identifier(asset_data.model_url) | |
| asset = self.find_existing_asset(asset_data) | |
| created = False | |
| if asset is None: | |
| created = True | |
| asset = Asset(url=asset_url) | |
| else: | |
| # Asset already exists - skip if update_existing is False | |
| if not update_existing: | |
| if verbosity >= 2: | |
| self.stdout.write(f"Skipping existing asset {asset_data.model_url}") | |
| return None | |
| now = timezone.now() | |
| if created and not asset.create_time: | |
| asset.create_time = now | |
| asset.url = asset_url | |
| asset.name = asset_data.title | |
| asset.update_time = now | |
| asset.visibility = PUBLIC | |
| asset.state = ASSET_STATE_COMPLETE | |
| asset.owner = owner | |
| asset.imported_from = IMPORT_SOURCE | |
| asset.polydata = asset_data.to_metadata() | |
| # Set license | |
| if asset_data.license: | |
| parsed_license = self.parse_license(asset_data.license) | |
| if parsed_license: | |
| asset.license = parsed_license | |
| # Build description from available metadata | |
| description_parts = [] | |
| if asset_data.description: | |
| description_parts.append(asset_data.description) | |
| if asset_data.credit: | |
| description_parts.append(f"Credit: {asset_data.credit}") | |
| if description_parts: | |
| asset.description = "\n\n".join(description_parts) | |
| # Determine category from unit code | |
| if asset_data.unit_code: | |
| category = self.determine_category(asset_data.unit_code) | |
| if category: | |
| asset.category = category | |
| if verbosity >= 1: | |
| self.stdout.write(f" → Category: {category} (from unit_code: {asset_data.unit_code})") | |
| else: | |
| if verbosity >= 1: | |
| self.stdout.write(f" → No category mapping for unit_code: {asset_data.unit_code}") | |
| else: | |
| if verbosity >= 1: | |
| self.stdout.write(f" → No unit_code found") | |
| if verbosity >= 1: | |
| action = "Creating" if created else "Updating" | |
| self.stdout.write(f"{action} asset for Smithsonian model {asset_data.model_url}") | |
| if asset_data.license: | |
| self.stdout.write(f" → License: {asset_data.license}") | |
| if asset_data.description: | |
| desc_preview = asset_data.description[:100] + "..." if len(asset_data.description) > 100 else asset_data.description | |
| self.stdout.write(f" → Description: {desc_preview}") | |
| asset.save() | |
| # Add tags from Smithsonian metadata | |
| if asset_data.tags: | |
| if verbosity >= 2: | |
| self.stdout.write(f" → Tags: {', '.join(asset_data.tags)}") | |
| for tag_name in asset_data.tags: | |
| tag, _ = Tag.objects.get_or_create(name=tag_name) | |
| asset.tags.add(tag) | |
| else: | |
| if verbosity >= 2: | |
| self.stdout.write(f" → No tags from metadata") | |
| # Download thumbnail if asset doesn't have one or if updating existing assets | |
| if not asset.thumbnail or update_existing: | |
| thumbnail_entry = asset_data.preferred_image_entry() | |
| if thumbnail_entry: | |
| if verbosity >= 1: | |
| self.stdout.write(f"Attempting thumbnail download from {thumbnail_entry.uri}") | |
| file_obj, content_type, size, diagnostics = self.download_thumbnail(thumbnail_entry) | |
| if file_obj: | |
| asset.thumbnail.save(file_obj.name, file_obj, save=False) | |
| asset.thumbnail_contenttype = content_type | |
| if verbosity >= 1: | |
| self.stdout.write( | |
| f"Saved thumbnail {file_obj.name} ({size} bytes, content_type={content_type or 'unknown'}); {diagnostics}" | |
| ) | |
| else: | |
| if verbosity >= 1: | |
| self.stdout.write( | |
| f"Failed to download thumbnail from {thumbnail_entry.uri}; {diagnostics}" | |
| ) | |
| else: | |
| image_usages = sorted({(entry.usage or "unknown") for entry in asset_data.image_entries}) | |
| model_usages = sorted({(entry.usage or "unknown") for entry in asset_data.model_entries}) | |
| if verbosity >= 1: | |
| self.stdout.write( | |
| "No thumbnail entries available for " | |
| f"{asset_data.model_url}; image_usages={image_usages or ['none']}, " | |
| f"model_usages={model_usages or ['none']}" | |
| ) | |
| elif verbosity >= 2: | |
| self.stdout.write(f"Thumbnail already exists and --update-existing not set, skipping download") | |
| with transaction.atomic(): | |
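| # Rebuild all Smithsonian-managed formats from scratch so repeated imports stay idempotent. | |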
| asset.format_set.filter(role__startswith="SMITHSONIAN_").delete() | |
| created_formats: List[Tuple[SmithsonianResource, Format]] = [] | |
| for index, entry in enumerate(asset_data.model_entries, start=1): | |
| entry_format_type = self.determine_format_type(entry) | |
| if entry_format_type is None: | |
| if verbosity >= 2: | |
| self.stdout.write( | |
| "Skipping unsupported Smithsonian resource " | |
| f"{entry.uri} for asset {asset_data.model_url}" | |
| ) | |
| continue | |
| format_role = self.build_format_role(entry_format_type, entry, index) | |
| format_obj = Format.objects.create( | |
| asset=asset, | |
| format_type=entry_format_type, | |
| role=format_role, | |
| ) | |
| resource = Resource.objects.create( | |
| asset=asset, | |
| format=format_obj, | |
| external_url=entry.uri, | |
| contenttype=self.determine_content_type(entry.uri, entry_format_type), | |
| ) | |
| format_obj.add_root_resource(resource) | |
| created_formats.append((entry, format_obj)) | |
| if not created_formats: | |
| raise CommandError( | |
| f"No supported Smithsonian formats could be created for {asset_data.model_url}" | |
| ) | |
| preferred_format = next((fmt for entry, fmt in created_formats if entry is root_entry), None) | |
| if preferred_format: | |
| preferred_format.is_preferred_for_gallery_viewer = True | |
| preferred_format.save(update_fields=["is_preferred_for_gallery_viewer"]) | |
| asset.preferred_viewer_format_override = preferred_format | |
| asset.is_viewer_compatible = True | |
| else: | |
| asset.preferred_viewer_format_override = None | |
| asset.is_viewer_compatible = False | |
| asset.update_time = timezone.now() | |
| asset.save() | |
| return asset | |
| def handle(self, *args, **options): | |
| rows = options["rows"] | |
| rate_limit = options["rate_limit"] | |
| max_assets = options["max_assets"] | |
| dry_run = options["dry_run"] | |
| fix_thumbs = options["fix_thumbs"] | |
| update_existing = options["update_existing"] | |
| api_key = options["api_key"] | |
| verbosity = options.get("verbosity", 1) | |
| client = SmithsonianAPIClient( | |
| file_types=SUPPORTED_FILE_TYPES, | |
| rate_limit=rate_limit, | |
| rows_per_page=rows, | |
| api_key=api_key, | |
| ) | |
| owner = self.ensure_owner() | |
| if fix_thumbs: | |
| self.fix_missing_thumbnails(client, verbosity, dry_run) | |
| return | |
| imported = 0 | |
| skipped = 0 | |
| aggregated_assets: Dict[str, SmithsonianAsset] = {} | |
| usable_asset_count = 0 | |
| stop_fetching = False | |
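| # Pages are processed as they stream in, so --max-assets can stop the run without fetching the full catalogue. | |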
| for page_rows in client.fetch(): | |
| page_assets: Dict[str, SmithsonianAsset] = {} | |
| for model_url, asset_data in self.normalise_metadata(page_rows).items(): | |
| existing = aggregated_assets.get(model_url) | |
| if existing: | |
| had_models = bool(existing.model_entries) | |
| if asset_data.title and asset_data.title != existing.title: | |
| existing.title = asset_data.title | |
| for entry in asset_data.model_entries: | |
| existing.add_entry(entry) | |
| for entry in asset_data.image_entries: | |
| existing.add_entry(entry) | |
| if not had_models and existing.model_entries: | |
| usable_asset_count += 1 | |
| else: | |
| aggregated_assets[model_url] = asset_data | |
| page_assets[model_url] = asset_data | |
| if asset_data.model_entries: | |
| usable_asset_count += 1 | |
| # Process this page's assets immediately | |
| if page_assets: | |
| self.populate_missing_image_entries(client, page_assets, verbosity) | |
| # Filter which assets to enrich and import | |
| for model_url, asset_data in page_assets.items(): | |
| if not asset_data.model_entries: | |
| if verbosity >= 2: | |
| self.stdout.write( | |
| f"Skipping {asset_data.model_url} because it has no usable model entries" | |
| ) | |
| continue | |
| # Check if we should process this asset | |
| should_process = update_existing or self.find_existing_asset(asset_data) is None | |
| if not should_process: | |
| skipped += 1 | |
| if verbosity >= 2: | |
| self.stdout.write(f"Skipping existing asset {model_url}") | |
| continue | |
| # Enrich with Open Access metadata | |
| if verbosity >= 2: | |
| self.stdout.write(f"Enriching {model_url} with Open Access metadata...") | |
| oa_record = client.fetch_open_access_metadata(model_url) | |
| if oa_record: | |
| self.apply_open_access_metadata(asset_data, oa_record, verbosity) | |
| else: | |
| if verbosity >= 1: | |
| self.stdout.write(f" → No Open Access metadata found for {model_url}") | |
| # Write to database immediately | |
| if dry_run: | |
| self.stdout.write(f"Would import {asset_data.model_url}") | |
| else: | |
| result = self.create_or_update_asset( | |
| asset_data, | |
| owner, | |
| verbosity=verbosity, | |
| update_existing=update_existing, | |
| ) | |
| if result is not None: | |
| imported += 1 | |
| if verbosity >= 1: | |
| self.stdout.write(f"Imported {asset_data.model_url}") | |
| if max_assets is not None and imported >= max_assets: | |
| self.stdout.write("Reached asset import limit") | |
| stop_fetching = True | |
| break | |
| if stop_fetching: | |
| break | |
| if not dry_run: | |
| if imported == 0 and skipped == 0: | |
| self.stdout.write("No assets imported") | |
| else: | |
| self.stdout.write(f"Import complete: {imported} imported, {skipped} skipped") | |
| def fix_missing_thumbnails( | |
| self, | |
| client: SmithsonianAPIClient, | |
| verbosity: int, | |
| dry_run: bool, | |
| ) -> None: | |
| """Download missing thumbnails for already-imported Smithsonian assets.""" | |
| from django.conf import settings | |
| all_smithsonian_assets = Asset.objects.filter( | |
| imported_from=IMPORT_SOURCE, | |
| ).select_related("owner") | |
| # Filter assets that either have no thumbnail path OR the file doesn't exist | |
| assets_without_thumbs = [] | |
| for asset in all_smithsonian_assets: | |
| if not asset.thumbnail: | |
| assets_without_thumbs.append(asset) | |
| elif settings.LOCAL_MEDIA_STORAGE and hasattr(asset.thumbnail, 'path'): | |
| # Check if local file exists | |
| try: | |
| if not os.path.exists(asset.thumbnail.path): | |
| assets_without_thumbs.append(asset) | |
| except (ValueError, AttributeError): | |
| # thumbnail.path raises ValueError when no file is associated with the field | |
| assets_without_thumbs.append(asset) | |
| total = len(assets_without_thumbs) | |
| if total == 0: | |
| self.stdout.write("All Smithsonian assets already have thumbnails") | |
| return | |
| self.stdout.write(f"Found {total} Smithsonian assets without thumbnails (or missing files)") | |
| fixed = 0 | |
| failed = 0 | |
| for asset in assets_without_thumbs: | |
| model_url = asset.polydata.get("model_url") if asset.polydata else None | |
| if not model_url: | |
| if verbosity >= 2: | |
| self.stdout.write(f"Skipping {asset.url}: no model_url in polydata") | |
| failed += 1 | |
| continue | |
| if verbosity >= 1: | |
| self.stdout.write(f"Fetching thumbnail data for {model_url}") | |
| try: | |
| rows = client.fetch_by_model_url(model_url) | |
| except Exception as exc: | |
| self.stdout.write(f"API fetch failed for {model_url}: {exc}") | |
| failed += 1 | |
| continue | |
| asset_data = self.normalise_metadata(rows).get(model_url) | |
| if not asset_data: | |
| if verbosity >= 2: | |
| self.stdout.write(f"No metadata found for {model_url}") | |
| failed += 1 | |
| continue | |
| thumbnail_entry = asset_data.preferred_image_entry() | |
| if not thumbnail_entry: | |
| if verbosity >= 1: | |
| self.stdout.write(f"No thumbnail entry found for {model_url}") | |
| failed += 1 | |
| continue | |
| if dry_run: | |
| self.stdout.write(f"Would download thumbnail for {asset.url} from {thumbnail_entry.uri}") | |
| fixed += 1 | |
| continue | |
| if verbosity >= 1: | |
| self.stdout.write(f"Downloading thumbnail from {thumbnail_entry.uri}") | |
| file_obj, content_type, size, diagnostics = self.download_thumbnail(thumbnail_entry) | |
| if file_obj: | |
| asset.thumbnail.save(file_obj.name, file_obj, save=False) | |
| asset.thumbnail_contenttype = content_type | |
| asset.save(update_fields=["thumbnail", "thumbnail_contenttype"]) | |
| if verbosity >= 1: | |
| self.stdout.write( | |
| f"Saved thumbnail for {asset.url}: {file_obj.name} ({size} bytes); {diagnostics}" | |
| ) | |
| fixed += 1 | |
| else: | |
| if verbosity >= 1: | |
| self.stdout.write(f"Failed to download thumbnail for {asset.url}; {diagnostics}") | |
| failed += 1 | |
| self.stdout.write(f"Thumbnail fix complete: {fixed} fixed, {failed} failed") | |
| def populate_missing_image_entries( | |
| self, | |
| client: SmithsonianAPIClient, | |
| assets: Dict[str, SmithsonianAsset], | |
| verbosity: int, | |
| ) -> None: | |
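| """Fetch supplementary rows for assets lacking image entries and merge in any usable resources.""" | |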
| for asset in assets.values(): | |
| if asset.image_entries: | |
| continue | |
| supplementary_rows = client.fetch_by_model_url(asset.model_url) | |
| added = 0 | |
| supplementary_usages = set() | |
| for row in supplementary_rows: | |
| content = row.get("content", {}) | |
| if not isinstance(content, dict): | |
| continue | |
| usage = content.get("usage") | |
| if usage: | |
| supplementary_usages.add(usage) | |
| entry = self.resource_from_content(content) | |
| if entry is None or not entry.uri: | |
| continue | |
| if not self.should_include_entry(entry): | |
| continue | |
| if asset.add_entry(entry): | |
| added += 1 | |
| if verbosity >= 2: | |
| supplementary_usage_list = sorted(supplementary_usages or {"none"}) | |
| self.stdout.write( | |
| f"Supplementary fetch for {asset.model_url} returned " | |
| f"{len(supplementary_rows)} rows; added {added} new entries; " | |
| f"usages={supplementary_usage_list}" | |
| ) | |
| if not asset.image_entries and supplementary_rows: | |
| model_usages = sorted({(entry.usage or "unknown") for entry in asset.model_entries}) | |
| supplementary_usage_list = sorted(supplementary_usages or {"none"}) | |
| self.stdout.write( | |
| "No image entries found for " | |
| f"{asset.model_url} after supplementary fetch; " | |
| f"supplementary_usages={supplementary_usage_list}, " | |
| f"model_usages={model_usages or ['none']}" | |
| ) | |
| def apply_open_access_metadata( | |
| self, | |
| asset: SmithsonianAsset, | |
| oa_record: Dict[str, object], | |
| verbosity: int, | |
| ) -> None: | |
| """Apply Open Access metadata to a single asset.""" | |
| # Extract metadata from the Open Access record | |
| content = oa_record.get("content", {}) | |
| unit_code = oa_record.get("unitCode") | |
| # Extract descriptiveNonRepeating fields | |
| desc_non_rep = content.get("descriptiveNonRepeating", {}) | |
| if isinstance(desc_non_rep, dict): | |
| if not asset.record_id: | |
| asset.record_id = desc_non_rep.get("record_ID") | |
| if not asset.record_link: | |
| asset.record_link = desc_non_rep.get("record_link") | |
| if not asset.unit_code and desc_non_rep.get("unit_code"): | |
| asset.unit_code = desc_non_rep.get("unit_code") | |
| # Extract title/object name | |
| title_data = desc_non_rep.get("title", {}) | |
| if isinstance(title_data, dict): | |
| object_name = title_data.get("content") or title_data.get("label") | |
| if object_name and not asset.object_name: | |
| asset.object_name = object_name | |
| # Use top-level unitCode if not set | |
| if not asset.unit_code and unit_code: | |
| asset.unit_code = unit_code | |
| # Extract freetext fields | |
| freetext = content.get("freetext", {}) | |
| if isinstance(freetext, dict): | |
| # Get description from notes | |
| if not asset.description: | |
| notes = freetext.get("notes", []) | |
| if isinstance(notes, list): | |
| # Combine summary and brief description | |
| descriptions = [] | |
| for note in notes: | |
| if isinstance(note, dict): | |
| label = note.get("label", "").lower() | |
| note_content = note.get("content", "") | |
| if label in ["summary", "brief description"] and note_content: | |
| descriptions.append(note_content) | |
| if descriptions: | |
| asset.description = "\n\n".join(descriptions) | |
| # Get license/rights | |
| if not asset.license: | |
| rights = freetext.get("objectRights", []) | |
| if isinstance(rights, list) and rights: | |
| for right in rights: | |
| if isinstance(right, dict): | |
| rights_content = right.get("content", "") | |
| if rights_content: | |
| asset.license = rights_content | |
| break | |
| # Get credit line | |
| if not asset.credit: | |
| credit_line = freetext.get("creditLine", []) | |
| if isinstance(credit_line, list) and credit_line: | |
| for credit in credit_line: | |
| if isinstance(credit, dict): | |
| credit_content = credit.get("content", "") | |
| if credit_content: | |
| asset.credit = credit_content | |
| break | |
| # Extract tags from indexedStructured | |
| indexed = content.get("indexedStructured", {}) | |
| if isinstance(indexed, dict): | |
| tags_set = set() | |
| # Get topic tags | |
| topics = indexed.get("topic", []) | |
| if isinstance(topics, list): | |
| for topic in topics: | |
| if isinstance(topic, str) and topic.strip(): | |
| tags_set.add(topic.strip()) | |
| # Get usage_flag tags | |
| usage_flags = indexed.get("usage_flag", []) | |
| if isinstance(usage_flags, list): | |
| for flag in usage_flags: | |
| if isinstance(flag, str) and flag.strip(): | |
| tags_set.add(flag.strip()) | |
| # Get object_type tags | |
| object_types = indexed.get("object_type", []) | |
| if isinstance(object_types, list): | |
| for obj_type in object_types: | |
| if isinstance(obj_type, str) and obj_type.strip(): | |
| tags_set.add(obj_type.strip()) | |
| # Store as sorted list | |
| if tags_set: | |
| asset.tags = sorted(tags_set) | |
| if verbosity >= 1: | |
| self.stdout.write( | |
| f" → Open Access: unit_code={asset.unit_code}, " | |
| f"record_id={asset.record_id}, license={asset.license}, " | |
| f"has_description={bool(asset.description)}, tags={len(asset.tags)}" | |
| ) | |