Last active
April 27, 2023 21:04
-
-
Save TheJJ/374dc4964d1cf87c8ceb424677d4e771 to your computer and use it in GitHub Desktop.
simple multi-file upload website for local file sharing - it queries a save location on the server interactively
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Simple Multi-File upload. | |
Select files in the web browser - on the server machine a popup asks for the save location. | |
(c) 2023 Jonas Jelten <[email protected]> | |
License: GPLv3 | |
Needs: | |
- PySide6 (Qt6) | |
- FastAPI | |
- uvicorn | |
""" | |
import argparse | |
import asyncio | |
import os | |
from typing import Annotated, Optional | |
from dataclasses import dataclass | |
from fastapi import FastAPI, File, UploadFile | |
from fastapi.responses import HTMLResponse, RedirectResponse | |
import uvicorn | |
from PySide6.QtWidgets import QApplication, QFileDialog | |
app = FastAPI() | |
START_RESPONSE = HTMLResponse(content=""" | |
<head> | |
<title>File Upload</title> | |
<script> | |
function validateForm() { | |
let files = document.forms["uploader"]["files"]; | |
if (files.files.length == 0) { | |
document.getElementById("status").innerHTML = "no file selected"; | |
return false; | |
} | |
document.getElementById("status").innerHTML = "uploading..."; | |
return true; | |
} | |
</script> | |
</head> | |
<body> | |
<h2>File upload</h2> | |
<form name="uploader" action="/" method="post" enctype="multipart/form-data" | |
onSubmit="return validateForm()"> | |
<input name="files" type="file" id="files" onchange="this.form.submit();" multiple> | |
<div id="status"/> | |
</form> | |
</body> | |
""") | |
SUCCESS_RESPONSE = HTMLResponse(content=""" | |
<body> | |
<h2>Success!</h2> | |
<button onClick="location.href='/'">Upload another file</button> | |
</body> | |
""") | |
FAIL_RESPONSE = HTMLResponse(content=""" | |
<body> | |
<h2>Failed!</h2> | |
<button onClick="location.href='/'">Try again</button> | |
</body> | |
""") | |
def pformatsize(size_bytes, commaplaces=1): | |
prefixes = ((1, 'K'), (2, 'M'), (3, 'G'), (4, 'T'), (5, 'P'), (6, 'E'), (7, 'Z')) | |
for exp, name in prefixes: | |
if abs(size_bytes) >= 1024 ** exp and abs(size_bytes) < 1024 ** (exp + 1): | |
new_size = size_bytes / 1024 ** exp | |
fstring = "%%.%df%%s" % commaplaces | |
return fstring % (new_size, name) | |
return "%.1fB" % size_bytes | |
@app.post("/") | |
async def create_upload_files(files: list[UploadFile]): | |
filename = None | |
directory = None | |
if not files: | |
return FAIL_RESPONSE | |
elif len(files) == 1: | |
filename = saveSelect(files[0].filename) | |
if not filename: | |
return FAIL_RESPONSE | |
else: | |
directory = saveSelectDirectory() | |
if not directory: | |
return FAIL_RESPONSE | |
for file in files: | |
if filename: | |
output_path = filename | |
elif directory: | |
output_path = os.path.join(directory, file.filename) | |
else: | |
print("no output filename was selected") | |
return FAIL_RESPONSE | |
filesize = 0 | |
with open(output_path, "wb") as wfile: | |
while True: | |
# 128MiB-blocks | |
filedata = await file.read(128 * 1024 ** 2) | |
if not filedata: | |
break | |
filesize += len(filedata) | |
wfile.write(filedata) | |
print(f"{wfile.name!r}: {pformatsize(filesize)} written") | |
return SUCCESS_RESPONSE | |
@app.get("/") | |
async def start(): | |
return START_RESPONSE | |
def saveSelect(defaultname=None) -> Optional[str]: | |
ret = QFileDialog.getSaveFileName(None, "Save to file", os.path.join(os.getcwd(), defaultname)) | |
return ret[0] | |
def saveSelectDirectory() -> Optional[str]: | |
return QFileDialog.getExistingDirectory(None, "Save to directory", os.getcwd()) | |
async def serve(host, port): | |
QApplication() | |
uvicorn_config = uvicorn.Config( | |
app, | |
host=host, | |
port=port, | |
) | |
webserver = uvicorn.Server(uvicorn_config) | |
await webserver.serve() | |
def main(): | |
cli = argparse.ArgumentParser() | |
cli.add_argument('--host', default="", help='what ip to listen on, default: all') | |
cli.add_argument('--port', '-p', default=8000, type=int, help='what ip to listen on. default: %(default)s') | |
args = cli.parse_args() | |
asyncio.run(serve(args.host, args.port)) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment