Last active
July 9, 2019 21:48
-
-
Save sashka/f764e112b12a93eb8a539823298c9b07 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# encoding: utf-8 | |
from __future__ import print_function | |
import argparse | |
import gzip | |
import io | |
import os | |
from time import strftime | |
from urlparse import urljoin | |
from atomicfile import AtomicFile | |
from collections import Counter | |
from vitrina.frontend.settings import DATABASE, MEDIA_PATH | |
from vitrina.storage import Connection | |
from vitrina.storage.product import Product | |
from vitrina.storage.flatpage import Flatpage | |
from vitrina.storage.seo import SitemapLink | |
from vitrina.third_party.elementflow import xml as elementflow_xml | |
class AtomicFileGz(object):
    """Gzip-compressed file written atomically.

    Bytes are routed through ``gzip.GzipFile`` into an ``AtomicFile``, so the
    target path is only replaced once the archive is complete.  Usable as a
    context manager.
    """

    def __init__(self, name, mode="w+b", createmode=None):
        self._f = AtomicFile(name, mode, createmode)
        # NOTE(review): ``name`` (the full target path) ends up in the gzip
        # FNAME header field; presumably harmless here — confirm if archives
        # are ever served for download.
        self._gz = gzip.GzipFile(name, mode, fileobj=self._f)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type:
            # BUG FIX: previously we returned without closing anything,
            # leaking the gzip stream, the file descriptor and AtomicFile's
            # temp file.  Finish the gzip stream (best effort — don't mask
            # the original exception), then delegate to AtomicFile.__exit__
            # so it can dispose of the temp file without committing a
            # partial archive (SOURCE uses AtomicFile as a context manager).
            try:
                self._gz.close()
            except Exception:
                pass
            finally:
                self._f.__exit__(exc_type, exc_value, exc_tb)
            return
        self.close()

    def write(self, string):
        self._gz.write(string)

    def close(self):
        # Order matters: flush the gzip trailer into the temp file first,
        # then let AtomicFile commit it to the final path.
        self._gz.close()
        self._f.close()
class Sitemap(object):
    """
    Sitemap index generator.

    The index file holds no urls itself; those live in separate urlset files
    produced by the ``Urlset`` class.  The index and its urlset files are
    assumed to share one directory, e.g.
    /www/example.com/xml/sitemap.xml and /www/example.com/xml/sitemap_products.xml

    Usage sample:

    with Sitemap('/tmp', 'http://example.com/tmp') as sitemap:
        with sitemap.urlset('products') as urlset:
            for i in range(100000):
                urlset.add_url('http://example.com/article/%d' % i, priority=0.8, changefreq='daily')
        with sitemap.urlset('promo') as urlset:
            urlset.add_url('http://example.com/promo', priority=0.5, changefreq='weekly')
    """

    def __init__(self, path, base_url, compress=False, indent=False):
        self.path = path
        self.urlsets = []
        self.base_url = base_url
        self.compress = compress
        self.indent = indent

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # On error the index is deliberately not written.
        if exc_type is None:
            self.flush()

    def urlset(self, urlset_name):
        """Create and register a new ``Urlset`` generator."""
        child = Urlset(self.path, urlset_name, self.compress, self.indent)
        self.urlsets.append(child)
        return child

    def flush(self):
        """Atomically write sitemap.xml referencing every urlset file produced."""
        lastmod = strftime('%Y-%m-%d')
        index_path = os.path.join(self.path, 'sitemap.xml')
        filenames = [name for generator in self.urlsets for name in generator.files]
        with AtomicFile(index_path) as f:
            with elementflow_xml(f, 'sitemapindex', namespaces={'': 'http://www.sitemaps.org/schemas/sitemap/0.9'}, indent=self.indent) as xml:
                for fname in filenames:
                    with xml.container('sitemap'):
                        xml.element('loc', text=urljoin(self.base_url, fname))
                        xml.element('lastmod', text=lastmod)
class Urlset(object):
    """
    Sitemap urlset generator.

    To be invoked via ``Sitemap``.
    Splits output across one or more urlset files so that no single file
    holds more than ``threshold`` urls.
    """

    def __init__(self, path, urlset_name, compress=False, indent=False, threshold=30000):
        self.name = urlset_name
        self.path = path
        self.compress = compress
        self.indent = indent
        self.files = []  # names of finished (closed and committed) urlset files
        self.threshold = threshold
        self.today = strftime('%Y-%m-%d')
        self._fname = None
        self._f = None
        self._url_count = None
        self._xml = None
        self._new_file()

    def _new_file(self):
        """Open the next urlset file and start its <urlset> document."""
        self._url_count = 0
        n = len(self.files)
        seq = '_%d' % n if n else ''
        trail = '.gz' if self.compress else ''
        self._fname = 'sitemap_%s%s.xml%s' % (self.name, seq, trail)
        if self.compress:
            self._f = AtomicFileGz(os.path.join(self.path, self._fname))
        else:
            self._f = AtomicFile(os.path.join(self.path, self._fname))
        # I'm going to simulate ``with``.
        self._xml = elementflow_xml(self._f, 'urlset', namespaces={'': 'http://www.sitemaps.org/schemas/sitemap/0.9'}, indent=self.indent).__enter__()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # On error the current file is deliberately left uncommitted.
        if exc_type is None:
            self.flush(exit=True)

    def add_url(self, link, lastmod=None, priority=0.5, changefreq='weekly'):
        """Append one <url> entry; rolls over to a new file at the threshold."""
        dt = lastmod.strftime('%Y-%m-%d') if lastmod is not None else self.today
        with self._xml.container('url'):
            self._xml.element('loc', text=link)
            self._xml.element('priority', text=str(priority))
            self._xml.element('changefreq', text=changefreq)
            self._xml.element('lastmod', text=dt)
        self._url_count += 1
        # New urlset file to be created when the threshold is reached.
        # BUG FIX: the original used ``>`` here, letting a file grow to
        # ``threshold + 1`` urls, contradicting the documented cap.
        if self._url_count >= self.threshold:
            self.flush()

    def flush(self, exit=False):
        """Finish the current file; open a fresh one unless ``exit`` is set."""
        if not self._url_count:
            if exit and self._f is not None:
                # BUG FIX: previously an empty trailing file (always present
                # after an exact-threshold rollover) was simply abandoned,
                # leaking the descriptor and AtomicFile's temp file.  Close
                # it properly; it is not appended to ``files``, so the
                # sitemap index never references it.
                self._xml.__exit__(None, None, None)
                self._f.close()
                self._f = None
            return
        # Simulating end of ``with`` clause.
        self._xml.__exit__(None, None, None)
        self._f.close()
        self.files.append(self._fname)
        if not exit:
            self._new_file()
def print_stats(filename, stats):
    """Print ``filename`` followed by space-separated ``key=count`` counters."""
    summary = ' '.join(['%s=%d' % (key, stats[key]) for key in stats])
    print(u'%s: %s' % (filename, summary))
def normalize_url(url, base='http://example.com/'):
    """Resolve ``url`` against ``base``; absolute http(s) urls pass through.

    BUG FIX: the original tested ``'http://' in url``, which matched the
    substring anywhere — a relative path such as ``/r?u=http://a.com`` was
    returned un-normalized.  An absolute url must *start* with a scheme.
    """
    if url.startswith(('http://', 'https://')):
        return url
    return urljoin(base, url)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Sitemap generator')
    parser.add_argument('-D', '--datadir', help='Path to CSV or YAML files',
                        dest='datadir', type=str, action='store', default='csv')
    args = parser.parse_args()

    db = Connection(DATABASE.host, DATABASE.database, user=DATABASE.user, password=DATABASE.password, time_zone=DATABASE.time_zone)
    stats = Counter()

    xml_path = os.path.join(MEDIA_PATH, 'xml')
    xml_url = 'http://example.com/xml/'

    with Sitemap(xml_path, xml_url, compress=True) as sitemap:
        # Product pages: one url per active SKU.
        with sitemap.urlset('products') as urlset:
            for product in Product.list_active_skus(db):
                urlset.add_url(normalize_url('/product/%d' % product.sku), priority=0.9, changefreq='weekly')
                stats.update(product=1, total=1)

        # Everything else: curated links, flatpages, and a static CSV list.
        with sitemap.urlset('additional') as urlset:
            for link in SitemapLink.get_all(db):
                if link.is_public:
                    urlset.add_url(normalize_url(link.url), priority=0.8, changefreq='weekly')
                    stats.update(additional=1, total=1)

            for link in Flatpage.get_all(db, active=True):
                urlset.add_url(normalize_url('/page/' + link.safe_path), priority=0.8, changefreq='weekly')
                stats.update(flatpage=1, additional=1, total=1)

            # BUG FIX: the CSV file was previously opened and never closed;
            # a ``with`` block releases the handle deterministically.
            with io.open(os.path.join(args.datadir, 'sitemap_static.csv'), 'rt', newline='') as static_links:
                for link in static_links:
                    link = link.strip()
                    if link:
                        urlset.add_url(normalize_url(link), priority=0.5, changefreq='monthly')
                        stats.update(static=1, additional=1, total=1)

    print_stats(os.path.join(xml_path, 'sitemap.xml'), stats)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment