Created
August 24, 2024 02:01
-
-
Save mythz/b8517969d55026496ab8043985ffa81f to your computer and use it in GitHub Desktop.
ffmpeg-api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
from aiohttp import web | |
import asyncio | |
import os | |
import tempfile | |
import uuid | |
# Ensure ffmpeg is installed and accessible in your system PATH | |
routes = web.RouteTableDef() | |
async def run_ffmpeg_command(command): | |
process = await asyncio.create_subprocess_shell( | |
command, | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE | |
) | |
stdout, stderr = await process.communicate() | |
if process.returncode != 0: | |
raise Exception(f"FFmpeg command failed: {stderr.decode()}") | |
return stdout.decode() | |
@routes.post('/convert') | |
async def convert_image(request): | |
data = await request.post() | |
if 'image' not in data or not data['image'].filename: | |
return web.Response(status=400, text="No image file provided") | |
input_file = data['image'].filename | |
output_format = data.get('format', 'png') | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_input = os.path.join(temp_dir, f"input_{uuid.uuid4()}{os.path.splitext(input_file)[1]}") | |
temp_output = os.path.join(temp_dir, f"output_{uuid.uuid4()}.{output_format}") | |
# Save uploaded file | |
with open(temp_input, 'wb') as f: | |
f.write(data['image'].file.read()) | |
# Run FFmpeg command | |
command = f"ffmpeg -i {temp_input} {temp_output}" | |
await run_ffmpeg_command(command) | |
# Send the converted file | |
return web.FileResponse(temp_output, headers={'Content-Disposition': f'attachment; filename="converted.{output_format}"'}) | |
@routes.post('/resize') | |
async def resize_image(request): | |
data = await request.post() | |
if 'image' not in data or not data['image'].filename: | |
return web.Response(status=400, text="No image file provided") | |
input_file = data['image'].filename | |
width = data.get('width', '300') | |
height = data.get('height', '300') | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_input = os.path.join(temp_dir, f"input_{uuid.uuid4()}{os.path.splitext(input_file)[1]}") | |
temp_output = os.path.join(temp_dir, f"output_{uuid.uuid4()}{os.path.splitext(input_file)[1]}") | |
# Save uploaded file | |
with open(temp_input, 'wb') as f: | |
f.write(data['image'].file.read()) | |
# Run FFmpeg command | |
command = f"ffmpeg -i {temp_input} -vf scale={width}:{height} {temp_output}" | |
await run_ffmpeg_command(command) | |
# Send the resized file | |
return web.FileResponse(temp_output, headers={'Content-Disposition': f'attachment; filename="resized{os.path.splitext(input_file)[1]}"'}) | |
@routes.post('/rotate') | |
async def rotate_image(request): | |
data = await request.post() | |
if 'image' not in data or not data['image'].filename: | |
return web.Response(status=400, text="No image file provided") | |
input_file = data['image'].filename | |
angle = data.get('angle', '90') | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_input = os.path.join(temp_dir, f"input_{uuid.uuid4()}{os.path.splitext(input_file)[1]}") | |
temp_output = os.path.join(temp_dir, f"output_{uuid.uuid4()}{os.path.splitext(input_file)[1]}") | |
# Save uploaded file | |
with open(temp_input, 'wb') as f: | |
f.write(data['image'].file.read()) | |
# Run FFmpeg command | |
command = f"ffmpeg -i {temp_input} -vf rotate={angle}*PI/180 {temp_output}" | |
await run_ffmpeg_command(command) | |
# Send the rotated file | |
return web.FileResponse(temp_output, headers={'Content-Disposition': f'attachment; filename="rotated{os.path.splitext(input_file)[1]}"'}) | |
app = web.Application() | |
app.add_routes(routes) | |
if __name__ == '__main__': | |
web.run_app(app) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment