import os
import requests
import markdownify
import re
from bs4 import BeautifulSoup
from urllib.parse import urljoin, unquote, urlparse
from dotenv import load_dotenv
# === CONFIGURATION ===
load_dotenv()

BASE_URL = os.getenv("BASE_URL")
JSESSIONID = os.getenv("JSESSIONID")
ROOT_PAGE_ID = os.getenv("ROOT_PAGE_ID", "1000386655")

COOKIES = {
    "JSESSIONID": JSESSIONID,
}

HEADERS = {
    "Accept": "application/json"
}

OUTPUT_DIR = "confluence_obsidian"
os.makedirs(OUTPUT_DIR, exist_ok=True)
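
# Example .env file this script expects (placeholder values shown here; the
# variable names mirror the os.getenv() calls above):
#
#   BASE_URL=https://confluence.example.com
#   JSESSIONID=<your Confluence session cookie>
#   ROOT_PAGE_ID=1000386655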

def clean_title(title):
    # Strip characters that are not allowed in file names
    return re.sub(r'[\\/*?:"<>|]', "", title)

def fetch_page_content(page_id):
    # Fetch a page with its storage-format body and version info
    url = f"{BASE_URL}/rest/api/content/{page_id}?expand=body.storage,version"
    res = requests.get(url, cookies=COOKIES, headers=HEADERS)
    res.raise_for_status()
    return res.json()

def fetch_page_children(page_id):
    # List the direct child pages of a page
    url = f"{BASE_URL}/rest/api/content/{page_id}/child/page?limit=1000"
    res = requests.get(url, cookies=COOKIES, headers=HEADERS)
    res.raise_for_status()
    return res.json().get("results", [])

def fetch_attachments(page_id):
    # List the attachments of a page
    url = f"{BASE_URL}/rest/api/content/{page_id}/child/attachment?limit=1000"
    res = requests.get(url, cookies=COOKIES, headers=HEADERS)
    res.raise_for_status()
    return res.json().get("results", [])

def download_file(full_url, dest_folder, filename_hint=None):
    # Stream a file into dest_folder, preferring the hinted filename over the URL basename
    os.makedirs(dest_folder, exist_ok=True)
    response = requests.get(full_url, cookies=COOKIES, stream=True)
    response.raise_for_status()

    parsed_url = urlparse(full_url)
    basename = os.path.basename(parsed_url.path)
    filename = filename_hint or basename
    filename = unquote(filename)

    local_path = os.path.join(dest_folder, filename)
    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(1024):
            f.write(chunk)
    return filename

def convert_html_and_download_assets(html, page_folder, page_id):
    soup = BeautifulSoup(html, 'html.parser')
    attachments = fetch_attachments(page_id)
    downloaded = {}

    # Download all attachments
    for att in attachments:
        name = att["title"]
        link = att["_links"]["download"]
        url = urljoin(BASE_URL, link)
        try:
            print(f"Downloading attachment: {name} from {url}")
            download_file(url, page_folder, name)
            downloaded[name] = name  # Use filename only for Obsidian
        except Exception as e:
            print(f"Failed to download attachment {name}: {e}")

    # Handle drawio macros: prefer an exported image, fall back to the raw .drawio file
    for macro in soup.find_all("ac:structured-macro", {"ac:name": "drawio"}):
        diagram_name_tag = macro.find("ac:parameter", {"ac:name": "diagramName"})
        diagram_name = diagram_name_tag.text.strip() if diagram_name_tag else None
        if diagram_name:
            for ext in [".drawio.svg", ".svg", ".png"]:
                full_name = diagram_name + ext
                if full_name in downloaded:
                    macro.replace_with(f"![[{full_name}]]")
                    break
            else:
                # No exported image was attached; link the raw diagram if present
                raw = diagram_name + ".drawio"
                if raw in downloaded:
                    macro.replace_with(f"[[{raw}]]")
                else:
                    macro.replace_with(f"`[Draw.io diagram not found: {diagram_name}]`")

    # Replace <ac:image>
    for ac_img in soup.find_all("ac:image"):
        attachment_tag = ac_img.find("ri:attachment")
        if attachment_tag:
            file_name = attachment_tag.get("ri:filename")
            if file_name in downloaded:
                ac_img.replace_with(f"![[{file_name}]]")

    # Replace <img>
    for img in soup.find_all("img"):
        src = img.get("src", "")
        if "download/attachments/" in src:
            file_name = os.path.basename(urlparse(src).path)
            if file_name in downloaded:
                img.replace_with(f"![[{file_name}]]")

    # Inline SVG block: write each one out as a file and embed it
    for idx, svg in enumerate(soup.find_all("svg")):
        svg_code = str(svg)
        # Number the files so multiple inline SVGs on one page do not overwrite each other
        svg_filename = f"inline_svg_{idx}.svg"
        svg_path = os.path.join(page_folder, svg_filename)
        try:
            with open(svg_path, "w", encoding="utf-8") as f:
                f.write(svg_code)
            downloaded[svg_filename] = svg_filename
            svg.replace_with(f"![[{svg_filename}]]")
        except Exception as e:
            print(f"Failed to write inline SVG: {e}")

    # <ac:link> attachment links
    for ac_link in soup.find_all("ac:link"):
        attachment_tag = ac_link.find("ri:attachment")
        if attachment_tag:
            file_name = attachment_tag.get("ri:filename")
            if file_name in downloaded:
                ac_link.replace_with(f"[[{file_name}]]")

    return markdownify.markdownify(str(soup), heading_style="ATX")

def save_markdown(title, content, path, child_titles):
    filename = clean_title(title) + ".md"
    full_path = os.path.join(path, filename)
    with open(full_path, "w", encoding="utf-8") as f:
        f.write(f"# {title}\n\n")
        f.write(content)
        if child_titles:
            f.write("\n---\n\n### Child Pages\n")
            for child_title in child_titles:
                f.write(f"- [[{clean_title(child_title)}]]\n")
    return full_path

def export_page(page_id, path):
    page = fetch_page_content(page_id)
    title = page["title"]
    html = page["body"]["storage"]["value"]

    # The page's own folder holds its attachments and its children's exports;
    # the page's markdown file is written next to that folder, inside `path`
    page_folder = os.path.join(path, clean_title(title))
    os.makedirs(page_folder, exist_ok=True)

    markdown = convert_html_and_download_assets(html, page_folder, page_id)

    child_pages = fetch_page_children(page_id)
    child_titles = [child["title"] for child in child_pages]
    save_markdown(title, markdown, path, child_titles)

    for child in child_pages:
        export_page(child["id"], page_folder)

def main():
    export_page(ROOT_PAGE_ID, OUTPUT_DIR)

if __name__ == "__main__":
    main()
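
# Note: running this script walks the page tree starting at ROOT_PAGE_ID and
# writes, under confluence_obsidian/, one Markdown file per page plus a sibling
# folder of the same name containing that page's attachments and child-page exports.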