Python scripts to migrate my blog posts from Hashnode to Astro (AstroPaper). `migrate_hashnode.py` converts the Hashnode backup `.md` files to AstroPaper frontmatter and appends redirects; `download_images.py` is run afterwards to fetch the Hashnode CDN images and rewrite their URLs locally.
scripts/download_images.py:

```python
#!/usr/bin/env python3
"""
Download Hashnode CDN images from migrated posts and rewrite URLs locally.

Run from the esli.blog project root AFTER migrate_hashnode.py:

    python3 scripts/download_images.py
    python3 scripts/download_images.py --dry-run
    python3 scripts/download_images.py --posts-dir src/data/blog

Images are saved to:

    public/images/<slug>/<filename>        (inline images)
    src/assets/blog/covers/<slug>.<ext>    (ogImage / cover)

Markdown and frontmatter URLs are rewritten in-place.
Idempotent: already-downloaded images are not re-fetched.
"""
import argparse
import hashlib
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import urlparse

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
POSTS_DIR = Path("src/data/blog")
IMAGES_DIR = Path("public/images")
COVERS_DIR = Path("src/assets/blog/covers")  # processed by Astro's image pipeline
COVERS_REL = "../../assets/blog/covers"      # relative from src/data/blog/

HASHNODE_CDN_RE = re.compile(r"https://cdn\.hashnode\.com/[^\s\"')>]+")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def url_to_filename(url: str) -> str:
    """Derive a filename from a CDN URL, preserving its extension."""
    path = urlparse(url).path
    name = Path(path).name  # last path segment, e.g. "5KidIZxr9Q.png"
    # No extension, or an implausibly long one: fall back to a hashed .jpg name
    if "." not in name or len(name.split(".")[-1]) > 5:
        h = hashlib.md5(url.encode()).hexdigest()[:8]
        name = f"{h}.jpg"
    return name
def download(url: str, dest: Path) -> bool:
    """Download url → dest. Returns True on success, False on error."""
    if dest.exists():
        return True  # already downloaded
    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            dest.write_bytes(resp.read())
        return True
    except Exception as exc:
        print(f"  WARN download failed [{exc}]: {url}")
        return False


def public_path(dest: Path) -> str:
    """Convert a public/images/... path to its /images/... URL."""
    return "/" + str(dest.relative_to(Path("public")))


# ---------------------------------------------------------------------------
# Per-post processing
# ---------------------------------------------------------------------------
def process_post(post_path: Path, dry_run: bool) -> tuple[int, int]:
    """
    Scan one post, download its CDN images, rewrite URLs.
    Returns (downloaded, failed).
    """
    slug = post_path.stem
    text = post_path.read_text(encoding="utf-8")

    # Collect all unique Hashnode CDN URLs in this file
    urls = list(dict.fromkeys(HASHNODE_CDN_RE.findall(text)))
    if not urls:
        return 0, 0

    downloaded = 0
    failed = 0
    replacements: dict[str, str] = {}

    for url in urls:
        is_cover = bool(re.search(rf'^ogImage:.*{re.escape(url)}', text, re.MULTILINE))
        if is_cover:
            # Cover images go into src/assets/blog/covers/ so Astro's image()
            # schema can process them through the asset pipeline.
            fname = url_to_filename(url)
            ext = Path(fname).suffix or ".jpg"
            dest = COVERS_DIR / f"{slug}{ext}"
            local_url = f"{COVERS_REL}/{slug}{ext}"
        else:
            fname = url_to_filename(url)
            dest = IMAGES_DIR / slug / fname
            local_url = public_path(dest)

        if dry_run:
            label = "COVER" if is_cover else "INLINE"
            print(f"  DRY-RUN [{label}]: {url[:80]}\n    → {local_url}")
            replacements[url] = local_url
            downloaded += 1
            continue

        ok = download(url, dest)
        if ok:
            replacements[url] = local_url
            downloaded += 1
        else:
            failed += 1

    if replacements and not dry_run:
        new_text = text
        for old_url, new_url in replacements.items():
            new_text = new_text.replace(old_url, new_url)
        if new_text != text:
            post_path.write_text(new_text, encoding="utf-8")

    return downloaded, failed


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    parser = argparse.ArgumentParser(
        description="Download Hashnode CDN images and rewrite URLs in migrated posts"
    )
    parser.add_argument(
        "--posts-dir",
        default=str(POSTS_DIR),
        help=f"Directory containing migrated .md posts (default: {POSTS_DIR})",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be downloaded without actually fetching",
    )
    args = parser.parse_args()

    posts_dir = Path(args.posts_dir)
    if not posts_dir.exists():
        print(f"ERROR: posts directory not found: {posts_dir}", file=sys.stderr)
        sys.exit(1)

    posts = sorted(posts_dir.glob("*.md"))
    if not posts:
        print("No .md files found.")
        return

    COVERS_DIR.mkdir(parents=True, exist_ok=True)

    print(f"Posts  : {posts_dir} ({len(posts)} files)")
    print(f"Inline : {IMAGES_DIR}")
    print(f"Covers : {COVERS_DIR} (relative ref: {COVERS_REL}/)")
    print(f"Mode   : {'DRY-RUN' if args.dry_run else 'LIVE'}\n")

    total_dl = total_fail = 0
    for post in posts:
        dl, fail = process_post(post, args.dry_run)
        if dl or fail:
            print(f"  {post.name}: {dl} downloaded, {fail} failed")
        total_dl += dl
        total_fail += fail

    print(f"\nDone: {total_dl} images downloaded/found, {total_fail} failed")
    if total_fail:
        print("  Failed URLs were left unchanged — check them manually.")


if __name__ == "__main__":
    main()
```
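As a quick sanity check of the helpers, a minimal sketch; the CDN URL and the `my-first-post` slug are invented for illustration, and the snippet assumes `url_to_filename` and `public_path` from the script above are in scope (pasted into the same file or a REPL):

```python
# Hypothetical example: the CDN URL and slug below are made up.
from pathlib import Path

url = "https://cdn.hashnode.com/res/hashnode/image/upload/v1700000000/5KidIZxr9Q.png"
name = url_to_filename(url)  # -> "5KidIZxr9Q.png" (extension preserved)
dest = Path("public/images") / "my-first-post" / name
print(public_path(dest))     # -> "/images/my-first-post/5KidIZxr9Q.png"
```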
scripts/migrate_hashnode.py:

```python
#!/usr/bin/env python3
"""
Migrate Hashnode backup .md files to Astro AstroPaper format.

Run from the esli.blog project root:

    python3 scripts/migrate_hashnode.py
    python3 scripts/migrate_hashnode.py --dry-run
    python3 scripts/migrate_hashnode.py --source /custom/path/to/backup

Idempotent: skips posts where the destination file already exists.
Re-run whenever new posts appear in the Hashnode backup repo.

Requires PyYAML (pip install pyyaml).
"""
import argparse
import re
import sys
from pathlib import Path

import yaml
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Path to the Hashnode backup repo (relative to this script's grandparent)
DEFAULT_SOURCE = Path(__file__).resolve().parent.parent.parent / "esli.blog.br"

# Destination inside the Astro project (relative to project root)
DEST_DIR = Path("src/data/blog")
REDIRECTS_FILE = Path("public/_redirects")

AUTHOR = "Esli Silva"

# ---------------------------------------------------------------------------
# Frontmatter parsing
# ---------------------------------------------------------------------------
def _fallback_kv_parse(raw: str) -> dict:
    """Line-by-line key: value parser — used when PyYAML chokes on curly quotes."""
    fm: dict = {}
    for line in raw.splitlines():
        if ":" not in line:
            continue
        key, _, value = line.partition(":")
        key = key.strip()
        value = value.strip()
        # Strip surrounding straight or curly quotes
        if value and value[0] in ('"', "“", "”") and value[-1] in ('"', "“", "”"):
            value = value[1:-1]
        fm[key] = value
    return fm


def parse_frontmatter(text: str) -> tuple[dict, str] | tuple[None, None]:
    """Split YAML frontmatter from body. Returns (dict, body) or (None, None)."""
    m = re.match(r"^---\r?\n(.*?)\r?\n---\r?\n(.*)", text, re.DOTALL)
    if not m:
        return None, None
    raw_fm, body = m.group(1), m.group(2)
    # Normalise curly/smart quotes to straight quotes so PyYAML can parse
    normalised = raw_fm.replace("“", '"').replace("”", '"')
    try:
        fm = yaml.safe_load(normalised) or {}
    except yaml.YAMLError:
        fm = _fallback_kv_parse(raw_fm)
    if not fm:
        return None, None
    return fm, body


def parse_tags(raw) -> list[str]:
    """Accept a YAML string 'a, b, c' or a list ['a', 'b']."""
    if isinstance(raw, list):
        return [str(t).strip() for t in raw if str(t).strip()]
    if isinstance(raw, str):
        return [t.strip() for t in raw.split(",") if t.strip()]
    return []


# ---------------------------------------------------------------------------
# Content transformation
# ---------------------------------------------------------------------------
def extract_description(fm: dict, content: str) -> str:
    """
    Return a short description from:
      1. the Hashnode 'subtitle' field (if present)
      2. the first non-empty prose paragraph of the post body
    """
    subtitle = fm.get("subtitle", "")
    if subtitle and len(subtitle) > 20:
        return subtitle[:250]

    skip_prefixes = ("#", "![", "%[", "<", "---", "```", "|", ">", "-", "*", "1.")
    for line in content.splitlines():
        line = line.strip()
        if not line or any(line.startswith(p) for p in skip_prefixes):
            continue
        # Strip inline markdown
        clean = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", line)
        clean = re.sub(r"\*{1,3}(.+?)\*{1,3}", r"\1", clean)
        clean = re.sub(r"`([^`]+)`", r"\1", clean)
        clean = clean.strip()
        if len(clean) > 30:
            if len(clean) > 250:
                clean = clean[:250].rsplit(" ", 1)[0] + "..."
            return clean

    return fm.get("title", "")[:250]


def fix_image_attrs(content: str) -> str:
    """
    Hashnode adds align="center" inside the markdown image URL field:

        

    Strip those attributes so the URL is valid.
    """
    return re.sub(
        r'(!\[[^\]]*\]\()([^)]+?)(\s+align="[^"]*")(\))',
        r"\1\2\4",
        content,
    )


def convert_embeds(content: str, own_domain: str = "esli.blog.br") -> str:
    """
    Convert Hashnode %[url] embeds:
      - YouTube → <iframe>
      - own-blog internal links → /posts/<slug>
      - everything else → plain markdown link
    """
    def replace(m: re.Match) -> str:
        url = m.group(1).strip().rstrip("]")  # drop an occasional stray bracket
        # YouTube
        yt = re.search(
            r"youtu(?:\.be/|be\.com/(?:watch\?v=|embed/))([a-zA-Z0-9_-]+)",
            url,
        )
        if yt:
            vid = yt.group(1)
            return (
                f'<iframe width="100%" height="400" '
                f'src="https://www.youtube.com/embed/{vid}" '
                f'frameborder="0" allowfullscreen></iframe>'
            )
        # Internal self-link %[https://esli.blog.br/slug]; "#" is allowed in
        # the capture so a fragment survives and can be split off below
        own_re = re.compile(
            r"https?://(?:www\.)?" + re.escape(own_domain) + r"/([^)\s?]+)"
        )
        own_m = own_re.match(url)
        if own_m:
            path = own_m.group(1).rstrip("/")
            fragment = ""
            if "#" in path:
                path, fragment = path.split("#", 1)
                fragment = "#" + fragment
            return f"[{path}](/posts/{path}{fragment})"
        # Generic fallback: plain link
        return f"[{url}]({url})"

    return re.sub(r"%\[([^\]]+)\]", replace, content)


# ---------------------------------------------------------------------------
# YAML frontmatter writer
# ---------------------------------------------------------------------------
def _qs(value: str) -> str:
    """Quote-and-escape a string for YAML frontmatter."""
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_frontmatter(fm: dict, content: str, is_draft: bool) -> str:
    date = fm.get("datePublished", "")
    # yaml.safe_load may parse it as a datetime object
    if hasattr(date, "isoformat"):
        date = date.isoformat().replace("+00:00", "Z")

    lines = [
        "---",
        f"author: {AUTHOR}",
        f"pubDatetime: {date}",
        f"title: {_qs(fm['title'])}",
| f"featured: false", | |
| f"draft: {str(is_draft).lower()}", | |
| ] | |
| tags = parse_tags(fm.get("tags", "")) or ["others"] | |
| lines.append("tags:") | |
| for tag in tags: | |
| lines.append(f" - {tag}") | |
| cover = fm.get("cover", "") | |
| if cover: | |
| lines.append(f"ogImage: {_qs(cover)}") | |
| canonical = fm.get("canonical", "") | |
| if canonical: | |
| lines.append(f"canonicalURL: {_qs(canonical)}") | |
| description = extract_description(fm, content) | |
| lines.append(f"description: {_qs(description)}") | |
| lines.append("---") | |
| return "\n".join(lines) | |
| # --------------------------------------------------------------------------- | |
| # Core migration | |
| # --------------------------------------------------------------------------- | |
| def migrate_file(src: Path, dry_run: bool) -> str | None: | |
| """ | |
| Migrate a single Hashnode .md file. | |
| Returns the slug on success, None if skipped/error. | |
| """ | |
| text = src.read_text(encoding="utf-8") | |
| fm, content = parse_frontmatter(text) | |
| if fm is None: | |
| print(f" SKIP (no frontmatter): {src.name}") | |
| return None | |
| slug = fm.get("slug") | |
| if not slug: | |
| print(f" SKIP (no slug): {src.name}") | |
| return None | |
| if not fm.get("title") or not fm.get("datePublished"): | |
| print(f" SKIP (missing title/date): {src.name}") | |
| return None | |
| dest = DEST_DIR / f"{slug}.md" | |
| if dest.exists(): | |
| return None # idempotent — already migrated, silent skip | |
| is_draft = src.stem.startswith("draft-") | |
| try: | |
| body = fix_image_attrs(content) | |
| body = convert_embeds(body) | |
| header = build_frontmatter(fm, body, is_draft) | |
| output = f"{header}\n{body}" | |
| except Exception as exc: | |
| print(f" ERROR ({src.name}): {exc}") | |
| return None | |
| if dry_run: | |
| print(f" DRY-RUN: {src.name} → {dest}") | |
| else: | |
| dest.write_text(output, encoding="utf-8") | |
| label = "DRAFT " if is_draft else "" | |
| print(f" {label}MIGRATED: {slug}") | |
| return slug | |
| def update_redirects(slugs: list[str], dry_run: bool) -> None: | |
| if not slugs: | |
| return | |
| existing: set[str] = set() | |
| if REDIRECTS_FILE.exists(): | |
| existing = set(REDIRECTS_FILE.read_text(encoding="utf-8").splitlines()) | |
| new_lines = [ | |
| f"/{slug} /posts/{slug} 301" | |
| for slug in slugs | |
| if f"/{slug} /posts/{slug} 301" not in existing | |
| ] | |
| if not new_lines: | |
| print("\nRedirects: nothing new to add.") | |
| return | |
| if dry_run: | |
| print(f"\nDRY-RUN: would add {len(new_lines)} redirect(s) to {REDIRECTS_FILE}:") | |
| for line in new_lines[:5]: | |
| print(f" {line}") | |
| if len(new_lines) > 5: | |
| print(f" ... and {len(new_lines) - 5} more") | |
| return | |
| REDIRECTS_FILE.parent.mkdir(parents=True, exist_ok=True) | |
| with REDIRECTS_FILE.open("a", encoding="utf-8") as f: | |
| for line in new_lines: | |
| f.write(line + "\n") | |
| print(f"\nRedirects: added {len(new_lines)} entries to {REDIRECTS_FILE}") | |
| # --------------------------------------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------------------------------------- | |
| def main() -> None: | |
| parser = argparse.ArgumentParser(description="Migrate Hashnode backup to Astro") | |
| parser.add_argument( | |
| "--source", | |
| default=str(DEFAULT_SOURCE), | |
| help="Path to Hashnode backup repo (default: ../esli.blog.br)", | |
| ) | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Preview actions without writing any files", | |
| ) | |
| args = parser.parse_args() | |
| source_dir = Path(args.source) | |
| if not source_dir.exists(): | |
| print(f"ERROR: source not found: {source_dir}", file=sys.stderr) | |
| sys.exit(1) | |
| DEST_DIR.mkdir(parents=True, exist_ok=True) | |
| print(f"Source : {source_dir}") | |
| print(f"Dest : {DEST_DIR}") | |
| print(f"Mode : {'DRY-RUN' if args.dry_run else 'LIVE'}\n") | |
| source_files = sorted(source_dir.glob("*.md")) | |
    migrated: list[str] = []
    skipped = 0
    for src in source_files:
        result = migrate_file(src, args.dry_run)
        if result is None:
            skipped += 1
        else:
            migrated.append(result)

    update_redirects(migrated, args.dry_run)

    print(
        f"\nDone: {len(migrated)} migrated, "
        f"{skipped} skipped (already done, invalid, or errored)"
    )
if __name__ == "__main__":
    main()
```
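And a minimal end-to-end sketch of the frontmatter transform. The sample post is invented; the expected output in the comments assumes a recent PyYAML that preserves the timezone when parsing `datePublished` (older versions return a naive datetime, so the trailing `Z` would be absent):

```python
# Hypothetical sample post: paste below the functions above, or import them.
sample = """---
title: "Hello, world"
slug: hello-world
datePublished: 2024-01-15T12:00:00Z
tags: linux, devops
cover: https://cdn.hashnode.com/res/cover.png
---
First paragraph of the post, long enough to become the description field.
"""

fm, body = parse_frontmatter(sample)
print(build_frontmatter(fm, body, is_draft=False))
# ---
# author: Esli Silva
# pubDatetime: 2024-01-15T12:00:00Z
# title: "Hello, world"
# featured: false
# draft: false
# tags:
#   - linux
#   - devops
# ogImage: "https://cdn.hashnode.com/res/cover.png"
# description: "First paragraph of the post, long enough to become the description field."
# ---
```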