Last active
February 21, 2021 09:52
-
-
Save mitsuse/762ba0316153b93d2fc73b26768edb35 to your computer and use it in GitHub Desktop.
Collect image URLs from Pixabay, Unsplash and Pexels.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import Any | |
from typing import Dict | |
from typing import Optional | |
from typing import Sequence | |
from typing import Tuple | |
from typing import IO | |
from typing_extensions import Literal | |
from typing_extensions import Protocol | |
import sys | |
import json | |
from dataclasses import dataclass | |
from dataclasses import asdict | |
from enum import Enum | |
from io import BytesIO | |
from os import path | |
import typer | |
from requests import Session | |
from requests.adapters import HTTPAdapter | |
from requests.adapters import Retry | |
# Process exit codes handed to ``fail``.
#
# Exit status used when an API answers with a non-200 status code. ``main``
# uses it for every service, not only Pixabay; the misspelled name ("REQUET")
# is kept because ``main`` refers to it.
FAILED_TO_REQUET_FOR_PIXABAY_API = 64
# Exit status used when the chosen options cannot be combined with the
# selected service.
INVALID_CONFIGURATION = 65
# Typer application object: ``main`` registers itself on it through
# ``@app.command()`` and the ``__main__`` guard at the end of the file calls it.
app = typer.Typer()
@dataclass(frozen=True)
class Request:
    """Immutable description of a single HTTP call to an image-search API.

    Instances are produced by a coder's ``encode_request`` and unpacked field
    by field into ``Session.request`` inside ``main``.
    """

    # HTTP verb; forwarded as the ``method`` argument of the session call.
    method: Literal["get", "post"]
    # URL the request is sent to.
    endpoint: str
    # HTTP headers sent along with the request (may be empty).
    headers: Dict[str, str]
    # Forwarded as the ``params`` argument of the session call.
    parameters: Any
@dataclass(frozen=True)
class Response:
    """Immutable record describing one image found by a search.

    ``main`` converts each record with ``asdict`` and prints it as a JSON
    object, so the field names below are also the keys of the emitted JSON.
    """

    # Display name of the service the image came from (e.g. "Pixabay").
    service: str
    # Name of the user/photographer as reported by the service.
    user: str
    # URL of the web page presenting the image.
    page: str
    # URL of the image file itself.
    image: str
class ImageType(str, Enum):
    """Image categories selectable on the command line.

    Members are also ``str`` instances, so a member compares equal to its
    plain string value.
    """

    # No filtering by category.
    all = "all"
    # Photographs; the only category ``main`` accepts for Unsplash and Pexels.
    photo = "photo"
    illustration = "illustration"
    vector = "vector"
class Service(str, Enum):
    """Image-search services this tool can query.

    Members are also ``str`` instances; ``main`` relies on that when it calls
    ``str.capitalize`` on a member to build error messages.
    """

    pixabay = "pixabay"
    unsplash = "unsplash"
    pexels = "pexels"
@app.command()
def main(
    access_key: str = typer.Option(..., envvar="PIXABAY_API_KEY"),
    service: Service = typer.Option(Service.pixabay),
    query: str = typer.Option(...),
    page: int = typer.Option(1),
    count_per_page: int = typer.Option(100),
    image_type: ImageType = typer.Option(ImageType.photo),
    sleep: Optional[float] = typer.Option(None),
    try_once: bool = typer.Option(False),
) -> None:
    """Search an image service and print every hit as one JSON object per line.

    The query is either a literal search phrase or the path of a file that
    holds one search phrase per line.
    """
    # NOTE: the docstring above is kept short on purpose -- Typer displays a
    # command's docstring as its help text.
    #
    # Parameters:
    #   access_key      API key of the selected service.
    #   service         Which API to query.
    #   query           Search phrase, or path to a file of phrases.
    #   page            First result page to request for every phrase.
    #   count_per_page  Requested page size (each coder clamps it to its own cap).
    #   image_type      Category filter; only ``photo`` is allowed for
    #                   Unsplash and Pexels.
    #   sleep           Seconds to wait between two page requests, if given.
    #   try_once        Stop after the first page of every phrase.
    #
    # Exits through ``fail`` with INVALID_CONFIGURATION for an unsupported
    # service/image-type combination and with
    # FAILED_TO_REQUET_FOR_PIXABAY_API when an API answers with a non-200 status.
    import time

    if service in (Service.unsplash, Service.pexels) and image_type != ImageType.photo:
        fail(
            f"{str.capitalize(service)} API doesn't support filtering by image-type except photo.",
            INVALID_CONFIGURATION,
        )

    coder: MessageCoder
    if service == Service.pixabay:
        coder = PixabayCoder(access_key=access_key, count_per_page=count_per_page)
    elif service == Service.unsplash:
        coder = UnsplashCoder(access_key=access_key, count_per_page=count_per_page)
    elif service == Service.pexels:
        coder = PexelsCoder(access_key=access_key, count_per_page=count_per_page)
    else:
        # ``NotImplemented`` is a comparison sentinel, not an exception;
        # raising it would produce a TypeError instead of the intended error.
        raise NotImplementedError(f"Unsupported service: {service!r}")

    # ``isfile`` already implies that the path exists.
    if path.isfile(query):
        with open(query) as f:
            # One phrase per line; blank lines are skipped so they are not
            # sent to the API as empty searches.
            stripped = (line.rstrip() for line in f)
            queries = [phrase for phrase in stripped if phrase]
    else:
        queries = [query]

    adapter = HTTPAdapter(max_retries=Retry(total=20, backoff_factor=0.5))
    # The context manager closes the session (and its connections) on every
    # exit path, including the ``sys.exit`` performed by ``fail``.
    with Session() as session:
        session.mount("https://", adapter)
        for q in queries:
            page_ = page
            while True:
                request = coder.encode_request(
                    q,
                    image_type=image_type,
                    page=page_,
                )
                response = session.request(
                    method=request.method,
                    url=request.endpoint,
                    params=request.parameters,
                    headers=request.headers,
                )
                if response.status_code != 200:
                    # Report the page that actually failed (``page_``), not
                    # the page the run started from.
                    fail(
                        f"Failed to request for {str.capitalize(service)} API (status code: {response.status_code}, page: {page_}).",
                        FAILED_TO_REQUET_FOR_PIXABAY_API,
                    )
                seq_item, total, last = coder.decode_response(BytesIO(response.content))
                for item in seq_item:
                    print(json.dumps(asdict(item)))
                # Stop when the coder flags the last page, when the pages
                # fetched so far are estimated to cover ``total``, or when
                # only one page per phrase was requested.
                if last or len(seq_item) * page_ >= total or try_once:
                    break
                if sleep is not None:
                    time.sleep(sleep)
                page_ += 1
class MessageCoder(Protocol):
    """Structural interface shared by the per-service request/response coders."""

    def encode_request(self, query: str, image_type: ImageType, page: int) -> Request:
        """Describe the HTTP request that fetches result page ``page`` for ``query``."""

    def decode_response(self, content: IO[bytes]) -> Tuple[Sequence[Response], int, bool]:
        """Parse a response body read from ``content``.

        Returns a ``(items, total, last)`` triple: the decoded records, the
        total number of hits reported by the service, and a flag that tells
        the caller to stop requesting further pages.
        """
class PixabayCoder:
    """Builds Pixabay search requests and decodes Pixabay search responses."""

    def __init__(self, access_key: str, count_per_page: int) -> None:
        """Store the API key and the page size, capping the latter at 100."""
        self.__count_per_page = min(count_per_page, 100)
        self.__access_key = access_key

    def encode_request(self, query: str, image_type: ImageType, page: int) -> Request:
        """Return the GET request for result page ``page`` of ``query``.

        The API key travels as the ``key`` query parameter; no headers are set.
        """
        params = {
            "key": self.__access_key,
            "q": query,
            "image_type": image_type,
            "page": str(page),
            "per_page": str(self.__count_per_page),
        }
        return Request(
            method="get",
            endpoint="https://pixabay.com/api/",
            headers={},
            parameters=params,
        )

    def decode_response(self, content: IO[bytes]) -> Tuple[Sequence[Response], int, bool]:
        """Decode a JSON body into ``(items, total, last)``.

        ``items`` are built from the ``hits`` array, ``total`` is the
        ``totalHits`` value, and ``last`` is true when the page holds fewer
        items than the configured page size.
        """
        payload = json.load(content)
        raw_hits = payload["hits"]
        total_hits = payload["totalHits"]
        records = tuple(
            Response(
                service="Pixabay",
                user=str(hit["user"]),
                page=str(hit["pageURL"]),
                image=str(hit["largeImageURL"]),
            )
            for hit in raw_hits
        )
        # A page shorter than the requested size means nothing follows it.
        is_last = len(records) < self.__count_per_page
        return records, total_hits, is_last
class UnsplashCoder:
    """Builds Unsplash photo-search requests and decodes their responses."""

    def __init__(self, access_key: str, count_per_page: int) -> None:
        """Store the API key and the page size, capping the latter at 30."""
        self.__count_per_page = min(count_per_page, 30)
        self.__access_key = access_key

    def encode_request(self, query: str, image_type: ImageType, page: int) -> Request:
        """Return the GET request for result page ``page`` of ``query``.

        ``image_type`` is not sent. The API key travels in the
        ``Authorization`` header.
        """
        auth_headers = {
            "Authorization": f"Client-ID {self.__access_key}",
            "Accept-Version": "v1",
        }
        params = {
            "query": query,
            "page": page,
            "per_page": self.__count_per_page,
        }
        return Request(
            method="get",
            endpoint="https://api.unsplash.com/search/photos",
            headers=auth_headers,
            parameters=params,
        )

    def decode_response(self, content: IO[bytes]) -> Tuple[Sequence[Response], int, bool]:
        """Decode a JSON body into ``(items, total, last)``.

        ``items`` are built from the ``results`` array, ``total`` is the
        ``total`` value, and ``last`` is true only for an empty page.
        """
        payload = json.load(content)
        raw_hits = payload["results"]
        total_hits = payload["total"]
        records = tuple(
            Response(
                service="Unsplash",
                user=str(hit["user"]["name"]),
                page=str(hit["links"]["html"]),
                image=str(hit["urls"]["raw"]),
            )
            for hit in raw_hits
        )
        is_last = not records
        return records, total_hits, is_last
class PexelsCoder:
    """Builds Pexels search requests and decodes Pexels search responses."""

    def __init__(self, access_key: str, count_per_page: int) -> None:
        """Store the API key and the page size, capping the latter at 80."""
        self.__count_per_page = min(count_per_page, 80)
        self.__access_key = access_key

    def encode_request(self, query: str, image_type: ImageType, page: int) -> Request:
        """Return the GET request for result page ``page`` of ``query``.

        ``image_type`` is not sent. The API key is sent as the value of the
        ``Authorization`` header.
        """
        auth_headers = {
            "Authorization": self.__access_key,
        }
        params = {
            "query": query,
            "page": page,
            "per_page": self.__count_per_page,
        }
        return Request(
            method="get",
            endpoint="https://api.pexels.com/v1/search",
            headers=auth_headers,
            parameters=params,
        )

    def decode_response(self, content: IO[bytes]) -> Tuple[Sequence[Response], int, bool]:
        """Decode a JSON body into ``(items, total, last)``.

        ``items`` are built from the ``photos`` array, ``total`` is the
        ``total_results`` value, and ``last`` is true only for an empty page.
        """
        payload = json.load(content)
        raw_hits = payload["photos"]
        total_hits = payload["total_results"]
        records = tuple(
            Response(
                service="Pexels",
                user=str(hit["photographer"]),
                page=str(hit["url"]),
                image=str(hit["src"]["large"]),
            )
            for hit in raw_hits
        )
        is_last = not records
        return records, total_hits, is_last
def fail(message: str, exit_code: int) -> None:
    """Write ``message`` to stderr with an ``error: `` prefix and exit.

    The process is terminated by raising ``SystemExit`` carrying
    ``exit_code``, so this function never returns normally.
    """
    report = f"error: {message}"
    print(report, file=sys.stderr)
    raise SystemExit(exit_code)
# Run the Typer application only when this file is executed as a script, not
# when it is imported.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment