# Gist by @autrilla, created July 4, 2019.
# Migrates a MySQL database from utf8 (utf8mb3) to utf8mb4.
import sqlalchemy
import time
import logging
import re
import os

logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
DB_USER = os.environ['DB_USER']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_HOST = os.environ['DB_HOST']
DB_DATABASE = os.environ['DB_DATABASE']
CONNECTION_STRING = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_DATABASE}"
FIXED_SIZES = {
    "FLOAT": 4,
    "DOUBLE": 8,
    "TINYINT": 1,
    "SMALLINT": 2,
    "MEDIUMINT": 3,
    "INTEGER": 4,
    "BIGINT": 8,
    "DATETIME": 8,
    "DATE": 3,
}
INDEX_SIZE_LIMIT_BY_ROW_TYPE = {
    "dynamic": 3072,
    "compressed": 3072,
    "redundant": 767,
    "compact": 767,
}
UNICODE_CHARACTER_SIZE = 4
COMPOSITE_INDEX_SIZE_LIMIT = 3072
TARGET_CHARACTER_SET = "utf8mb4"
TARGET_COLLATION = "utf8mb4_general_ci"
# Mapping of source-encoding to target-encoding of equivalent
# collations for the given source-to-target combination.
COLLATION_MAPPINGS = {"utf8": {"utf8mb4": {"utf8_bin": "utf8mb4_bin"}}}
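# For example, a column declared as `VARCHAR(64) CHARACTER SET utf8
# COLLATE utf8_bin` (hypothetical) keeps its binary comparison semantics
# after the migration by being mapped to utf8mb4_bin.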
TEXT_TYPES = {"VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"}

class Timer:
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.interval = self.end - self.start

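# Usage sketch for Timer (the block being timed is hypothetical):
#
#     with Timer() as t:
#         meta.reflect(bind=engine)
#     print(f"Reflection took {t.interval}s.")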
class ValidationError(ValueError):
    def __init__(self, errors):
        self.errors = errors

class Table:
    def __init__(self, table, row_format):
        self._table = table
        self._row_format = row_format

    @property
    def indexes(self):
        max_index_size_per_column = INDEX_SIZE_LIMIT_BY_ROW_TYPE[
            self._row_format.lower()
        ]
        return [
            Index(i, max_index_size_per_column, COMPOSITE_INDEX_SIZE_LIMIT)
            for i in self._table.indexes
        ]

    def get_size(self, engine):
        with engine.begin() as connection:
            query = sqlalchemy.select(
                [sqlalchemy.column("data_length") + sqlalchemy.column("index_length")],
                from_obj=sqlalchemy.text("information_schema.tables"),
            ).where(
                sqlalchemy.sql.and_(
                    sqlalchemy.column("table_schema") == DB_DATABASE,
                    sqlalchemy.column("table_name") == self._table.name,
                )
            )
            result = connection.execute(query)
            return list(result)[0][0]

    def convert_to_dynamic(self, engine, print_duration=True):
        """Converts the table's ROW_FORMAT to DYNAMIC. Refuses to do so if the
        table is larger than 500MB. Takes a read-only lock on the
        table while it's being converted.
        """
        MAX_SIZE = 500 * 1024 * 1024  # 500 MB
        if self.get_size(engine) > MAX_SIZE:
            raise ValueError("Refusing to convert big table to dynamic")
        if self._row_format.lower() == "dynamic":
            return
        with Timer() as t:
            with engine.begin() as connection:
                connection.execute(
                    f"ALTER TABLE {self._table.name} ROW_FORMAT=DYNAMIC, LOCK=SHARED"
                )
            self._row_format = "Dynamic"
        if print_duration:
            print(f"Converting table {self} took {t.interval}s.")

    def can_reencode_whole_table(self):
        for col in self._table.columns:
            collation = getattr(col.type, "collation", None)
            encoding = getattr(col.type, "encoding", None)
            if collation is not None or encoding is not None:
                return False
        return True

    def compile_reencode_for_column(self, engine, col, character_set, collation):
        column_name = col.compile(dialect=engine.dialect)
        column_type = col.type.compile(engine.dialect)
        column_type = re.sub(r"(COLLATE|CHARACTER SET) \w+", "", column_type).strip()
        column_sql = (
            f"MODIFY {column_name} {column_type} "
            f"CHARACTER SET {character_set} "
            f"COLLATE {collation}"
        )
        if not col.nullable:
            column_sql += " NOT NULL"
        if col.server_default:
            column_sql += " DEFAULT " + str(col.server_default.arg)
        return column_sql

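    # For a hypothetical non-nullable column `name VARCHAR(255)`, this
    # produces a clause like:
    #
    #     MODIFY name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL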
    def reencode_column_by_column(
        self, engine, character_set=TARGET_CHARACTER_SET, collation=TARGET_COLLATION
    ):
        with engine.begin() as connection:
            connection.execute(
                f"ALTER TABLE {str(self)} DEFAULT CHARACTER SET {character_set}"
            )
            sql = f"ALTER TABLE {str(self)}\n"
            modify_columns = []
            for col in self._table.columns:
                if col.type.__class__.__name__ not in TEXT_TYPES:
                    continue
                current_collation = getattr(col.type, "collation", None)
                current_character_set = getattr(
                    col.type,
                    "encoding",
                    self._table.dialect_options["mysql"]["default charset"],
                )
                if current_character_set == character_set:
                    # No need to convert
                    continue
                if current_collation:
                    mapping = COLLATION_MAPPINGS[current_character_set][character_set]
                    new_collation = mapping[current_collation]
                else:
                    new_collation = collation
                column_sql = self.compile_reencode_for_column(
                    engine, col, character_set, new_collation
                )
                modify_columns.append(column_sql)
            if modify_columns:
                sql += ",\n".join(modify_columns)
                connection.execute(sql)

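    # For a hypothetical table `comments` with two text columns, the
    # assembled statement looks like:
    #
    #     ALTER TABLE comments
    #     MODIFY author VARCHAR(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    #     MODIFY body TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci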
    def reencode(
        self,
        engine,
        print_duration=True,
        character_set=TARGET_CHARACTER_SET,
        collation=TARGET_COLLATION,
    ):
        with Timer() as t:
            if self.can_reencode_whole_table():
                with engine.begin() as connection:
                    connection.execute(
                        f"ALTER TABLE {self._table.name} CONVERT TO "
                        f"CHARACTER SET {character_set} COLLATE "
                        f"{collation}, LOCK=SHARED"
                    )
            else:
                # If columns in the table explicitly specify an
                # encoding or collation different than the
                # default, we need to reencode column by column.
                self.reencode_column_by_column(engine)
        if print_duration:
            print(f"Reencoding table {self} took {t.interval}s.")

    def __str__(self):
        return str(self._table)

class IndexColumnTooBig(ValueError):
    def __init__(self, column, column_size, maximum_allowed_size):
        self.column = column
        self.column_size = column_size
        self.maximum_allowed_size = maximum_allowed_size
        super().__init__(
            f"Column {column} has a size of {column_size} bytes, which "
            f"is larger than the allowed maximum of "
            f"{maximum_allowed_size} bytes."
        )

class IndexTotalSizeTooBig(ValueError):
    def __init__(self, total_size, maximum_allowed_total_size):
        self.total_size = total_size
        self.maximum_allowed_total_size = maximum_allowed_total_size
        super().__init__(
            f"Index has a total size of {total_size} bytes, "
            f"which is larger than the allowed maximum of "
            f"{maximum_allowed_total_size} bytes."
        )

class IndexTooBig(ValueError):
    def __init__(self, index_name, errors):
        self.errors = errors
        super().__init__(f"Index '{index_name}' too big: {errors}")

class Index:
    def __init__(self, index, max_size_per_column, max_total_size):
        self._index = index
        self._max_size_per_column = max_size_per_column
        self._max_total_size = max_total_size

    @property
    def columns(self):
        return [Column(c) for c in self._index.columns]

    def validate_size(self):
        sizes = []
        errors = []
        for column in self.columns:
            sizes.append(column.size)
            if column.size > self._max_size_per_column:
                errors.append(
                    IndexColumnTooBig(column, column.size, self._max_size_per_column)
                )
        if sum(sizes) > self._max_total_size:
            errors.insert(0, IndexTotalSizeTooBig(sum(sizes), self._max_total_size))
        if errors:
            raise IndexTooBig(f"{self._index.table}.{self}", errors)

    def __str__(self):
        return str(self._index.name)

class Column:
    def __init__(self, column):
        self._column = column

    @property
    def size(self):
        type_name = self._column.type.__class__.__name__
        if hasattr(self._column.type, "length"):
            return self._column.type.length * UNICODE_CHARACTER_SIZE
        elif type_name in FIXED_SIZES:
            return FIXED_SIZES[type_name]
        else:
            raise ValueError(f"Unknown type {type_name} for column {self._column}")

    def has_entries_longer_than(self, engine, length):
        query = (
            sqlalchemy.select([1])
            .select_from(self._column.table)
            .where(sqlalchemy.func.length(self._column) > length)
        )
        with engine.begin() as connection:
            result = connection.execute(query)
            return len(list(result)) > 0

    def __str__(self):
        return str(self._column)

def all_tables(meta, engine):
    tables = []
    with engine.begin() as connection:
        query = sqlalchemy.select(
            columns=[sqlalchemy.column("table_name"), sqlalchemy.column("row_format")],
            from_obj=sqlalchemy.text("information_schema.tables"),
        ).where(sqlalchemy.column("table_schema") == DB_DATABASE)
        result = connection.execute(query)
        for table_name, row_format in result:
            tables.append(Table(meta.tables[table_name], row_format))
    return tables

class TextFieldTooLong(ValueError):
    def __init__(
        self, column, character_size, would_need_bytes, max_byte_size, type_name
    ):
        self.column = column
        self.character_size = character_size
        self.would_need_bytes = would_need_bytes
        self.max_byte_size = max_byte_size
        self.type_name = type_name
        super().__init__(
            f"Column {column} of type {type_name} is {character_size} "
            f"characters long, which would need {would_need_bytes} bytes for "
            f"storage. However, the maximum byte size for {type_name} is "
            f"{max_byte_size}."
        )

def check_text_field_lengths(meta, engine):
    """In MySQL, data types have a maximum size limit defined in
    bytes. However, column sizes are defined in characters. Because
    changing to utf8mb4 increases per-character size from 3 to 4
    bytes, making the change means some columns might now have a bigger
    size than the limit for their data type. Therefore, they'd need to
    be bumped to a larger data type."""
    # Data types that are affected by the change from utf8mb3 to
    # utf8mb4
    DATA_TYPE_MAX_SIZE = {
        "VARCHAR": 65_535,
        "TINYTEXT": 255,
        "TEXT": 65_535,
        "MEDIUMTEXT": 16_777_215,
        "LONGTEXT": 4_294_967_295,
    }
    for table in meta.tables.values():
        for column in table.columns:
            type_name = column.type.__class__.__name__
            if type_name not in DATA_TYPE_MAX_SIZE:
                continue
            current_character_size = column.type.length
            if current_character_size is None:
                # When the length is unspecified, the fields take the
                # longest possible size allowed by the underlying type. As
                # such, changing the encoding has no effect.
                continue
            max_byte_size = DATA_TYPE_MAX_SIZE[type_name]
            mb4_byte_size = current_character_size * UNICODE_CHARACTER_SIZE
            if mb4_byte_size > max_byte_size:
                raise TextFieldTooLong(
                    column=column,
                    character_size=current_character_size,
                    would_need_bytes=mb4_byte_size,
                    max_byte_size=max_byte_size,
                    type_name=type_name,
                )

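# Worked example: a hypothetical VARCHAR(20000) column needs at most
# 20000 * 3 = 60000 bytes under utf8mb3, which fits VARCHAR's
# 65535-byte ceiling, but 20000 * 4 = 80000 bytes under utf8mb4,
# which does not. Such a column would have to be bumped to
# MEDIUMTEXT before the migration.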
def check_index_sizes(meta, engine):
    """Similarly to text field lengths, index sizes are limited in byte
    size. Switching to utf8mb4 would increase the byte size of the
    indexes, to the point that they might surpass the limit.
    """
    tables = all_tables(meta, engine)

    def _check_table(table):
        for index in table.indexes:
            try:
                index.validate_size()
            except IndexTooBig:
                if table._row_format.lower() == "dynamic":
                    # Nothing we can do to fix the index
                    raise
                # Converting the table ROW_FORMAT to DYNAMIC increases
                # the per-column index prefix size limit
                # substantially.
                table.convert_to_dynamic(engine)
                # Check the table again to see if converting to
                # dynamic fixed the issue
                return _check_table(table)

    for table in tables:
        _check_table(table)

def update_database_charset(engine):
    """Updates the database charset to utf8mb4. This is a cheap operation,
    as updating the database charset basically only changes the
    default charset for new tables and doesn't reencode any data."""
    with engine.begin() as connection:
        with Timer() as t:
            connection.execute(
                f"ALTER DATABASE {DB_DATABASE} CHARACTER SET "
                f"{TARGET_CHARACTER_SET} COLLATE {TARGET_COLLATION}"
            )
        print(f"Database character set updated in {t.interval}s.")

def reencode_tables(meta, engine):
    """Reencodes all tables to utf8mb4."""
    tables = all_tables(meta, engine)
    with Timer() as t:
        for table in tables:
            table.reencode(engine)
    print(f"All tables reencoded in {t.interval}s.")

def main():
    engine = sqlalchemy.create_engine(CONNECTION_STRING)
    meta = sqlalchemy.MetaData()
    print("Connected to database, reflecting tables...")
    meta.reflect(bind=engine)
    print("Tables reflected.")
    check_text_field_lengths(meta, engine)
    check_index_sizes(meta, engine)
    update_database_charset(engine)
    reencode_tables(meta, engine)

if __name__ == "__main__":
    main()