import sqlalchemy
import humanize
import time
import logging
import re
import os

logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
DB_HOST = os.environ["DB_HOST"]
DB_DATABASE = os.environ["DB_DATABASE"]
CONNECTION_STRING = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_DATABASE}"

# Storage size in bytes of the fixed-width MySQL types.
FIXED_SIZES = {
    "FLOAT": 4,
    "DOUBLE": 8,
    "TINYINT": 1,
    "SMALLINT": 2,
    "MEDIUMINT": 3,
    "INTEGER": 4,
    "BIGINT": 8,
    "DATETIME": 8,
    "DATE": 3,
}

# Maximum index key prefix size in bytes, per InnoDB ROW_FORMAT.
INDEX_SIZE_LIMIT_BY_ROW_TYPE = {
    "dynamic": 3072,
    "compressed": 3072,
    "redundant": 767,
    "compact": 767,
}

UNICODE_CHARACTER_SIZE = 4  # utf8mb4 uses up to 4 bytes per character
COMPOSITE_INDEX_SIZE_LIMIT = 3072
TARGET_CHARACTER_SET = "utf8mb4"
TARGET_COLLATION = "utf8mb4_general_ci"
# Maps source charset -> target charset -> {source collation: equivalent
# target collation} for the given source-to-target combination.
COLLATION_MAPPINGS = {"utf8": {"utf8mb4": {"utf8_bin": "utf8mb4_bin"}}}
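# For example, looking up the utf8mb4 equivalent of a utf8_bin column:
#
#   COLLATION_MAPPINGS["utf8"]["utf8mb4"]["utf8_bin"]  # -> "utf8mb4_bin"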
TEXT_TYPES = {"VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"}


class Timer:
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.interval = self.end - self.start


class ValidationError(ValueError):
    def __init__(self, errors):
        self.errors = errors


class Table:
    def __init__(self, table, row_format):
        self._table = table
        self._row_format = row_format

    @property
    def indexes(self):
        max_index_size_per_column = INDEX_SIZE_LIMIT_BY_ROW_TYPE[
            self._row_format.lower()
        ]
        return [
            Index(i, max_index_size_per_column, COMPOSITE_INDEX_SIZE_LIMIT)
            for i in self._table.indexes
        ]

    def get_size(self, engine):
        with engine.begin() as connection:
            query = sqlalchemy.select(
                [sqlalchemy.column("data_length") + sqlalchemy.column("index_length")],
                from_obj=sqlalchemy.text("information_schema.tables"),
            ).where(
                sqlalchemy.sql.and_(
                    sqlalchemy.column("table_schema") == DB_DATABASE,
                    sqlalchemy.column("table_name") == self._table.name,
                )
            )
            result = connection.execute(query)
            return list(result)[0][0]

    def convert_to_dynamic(self, engine, print_duration=True):
        """Converts the table's ROW_FORMAT to DYNAMIC. Refuses to do so
        if the table is larger than 500MB. Takes a read-only lock on the
        table while it's being converted.
        """
        MAX_SIZE = 500 * 1024 * 1024  # 500 MB
        if self.get_size(engine) > MAX_SIZE:
            raise ValueError("Refusing to convert big table to dynamic")
        if self._row_format.lower() == "dynamic":
            return
        with Timer() as t:
            with engine.begin() as connection:
                connection.execute(
                    f"ALTER TABLE {self._table.name} ROW_FORMAT=DYNAMIC, LOCK=SHARED"
                )
        self._row_format = "Dynamic"
        if print_duration:
            print(f"Converting table {self} took {t.interval}s.")

    def can_reencode_whole_table(self):
        # If any column explicitly specifies its own collation or
        # encoding, the table can't be converted in a single statement.
        for col in self._table.columns:
            collation = getattr(col.type, "collation", None)
            encoding = getattr(col.type, "encoding", None)
            if collation is not None or encoding is not None:
                return False
        return True

    def compile_reencode_for_column(self, engine, col, character_set, collation):
        column_name = col.compile(dialect=engine.dialect)
        column_type = col.type.compile(engine.dialect)
        # Strip any existing charset/collation from the compiled type;
        # we add our own below.
        column_type = re.sub(r"(COLLATE|CHARACTER SET) \w+", "", column_type).strip()
        column_sql = (
            f"MODIFY {column_name} {column_type} "
            f"CHARACTER SET {character_set} "
            f"COLLATE {collation}"
        )
        if not col.nullable:
            column_sql += " NOT NULL"
        if col.server_default:
            column_sql += " DEFAULT " + str(col.server_default.arg)
        return column_sql
    def reencode_column_by_column(
        self, engine, character_set=TARGET_CHARACTER_SET, collation=TARGET_COLLATION
    ):
        with engine.begin() as connection:
            connection.execute(
                f"ALTER TABLE {str(self)} DEFAULT CHARACTER SET {character_set}"
            )
            sql = f"ALTER TABLE {str(self)}\n"
            modify_columns = []
            for col in self._table.columns:
                if col.type.__class__.__name__ not in TEXT_TYPES:
                    continue
                current_collation = getattr(col.type, "collation", None)
                current_character_set = getattr(
                    col.type,
                    "encoding",
                    self._table.dialect_options["mysql"]["default charset"],
                )
                if current_character_set == character_set:
                    # No need to convert
                    continue
                if current_collation:
                    mapping = COLLATION_MAPPINGS[current_character_set][character_set]
                    new_collation = mapping[current_collation]
                else:
                    new_collation = collation
                column_sql = self.compile_reencode_for_column(
                    engine, col, character_set, new_collation
                )
                modify_columns.append(column_sql)
            if modify_columns:
                sql += ",\n".join(modify_columns)
                connection.execute(sql)

    def reencode(
        self,
        engine,
        print_duration=True,
        character_set=TARGET_CHARACTER_SET,
        collation=TARGET_COLLATION,
    ):
        with Timer() as t:
            if self.can_reencode_whole_table():
                with engine.begin() as connection:
                    connection.execute(
                        f"ALTER TABLE {self._table.name} CONVERT TO "
                        f"CHARACTER SET {character_set} COLLATE "
                        f"{collation}, LOCK=SHARED"
                    )
            else:
                # If columns in the table explicitly specify an encoding
                # or collation different from the default, we need to
                # reencode column by column.
                self.reencode_column_by_column(engine)
        if print_duration:
            print(f"Reencoding table {self} took {t.interval}s.")

    def __str__(self):
        return str(self._table)
class IndexColumnTooBig(ValueError):
    def __init__(self, column, column_size, maximum_allowed_size):
        self.column = column
        self.column_size = column_size
        self.maximum_allowed_size = maximum_allowed_size
        super().__init__(
            f"Column {column} has a size of {column_size} bytes, which "
            f"is larger than the allowed maximum of "
            f"{maximum_allowed_size} bytes."
        )


class IndexTotalSizeTooBig(ValueError):
    def __init__(self, total_size, maximum_allowed_total_size):
        self.total_size = total_size
        self.maximum_allowed_total_size = maximum_allowed_total_size
        super().__init__(
            f"Index has a total size of {total_size} bytes, "
            f"which is larger than the allowed maximum of "
            f"{maximum_allowed_total_size} bytes."
        )


class IndexTooBig(ValueError):
    def __init__(self, index_name, errors):
        self.errors = errors
        super().__init__(f"Index '{index_name}' too big: {errors}")
class Index:
    def __init__(self, index, max_size_per_column, max_total_size):
        self._index = index
        self._max_size_per_column = max_size_per_column
        self._max_total_size = max_total_size

    @property
    def columns(self):
        return [Column(c) for c in self._index.columns]

    def validate_size(self):
        sizes = []
        errors = []
        for column in self.columns:
            sizes.append(column.size)
            if column.size > self._max_size_per_column:
                errors.append(
                    IndexColumnTooBig(column, column.size, self._max_size_per_column)
                )
        if sum(sizes) > self._max_total_size:
            errors.insert(0, IndexTotalSizeTooBig(sum(sizes), self._max_total_size))
        if errors:
            raise IndexTooBig(f"{self._index.table}.{self}", errors)

    def __str__(self):
        return str(self._index.name)


class Column:
    def __init__(self, column):
        self._column = column

    @property
    def size(self):
        type_name = self._column.type.__class__.__name__
        if hasattr(self._column.type, "length"):
            return self._column.type.length * UNICODE_CHARACTER_SIZE
        elif type_name in FIXED_SIZES:
            return FIXED_SIZES[type_name]
        else:
            raise ValueError(f"Unknown type {type_name} for column {self._column}")
    def has_entries_longer_than(self, engine, length):
        query = (
            sqlalchemy.select([1])
            .select_from(self._column.table)
            .where(sqlalchemy.func.length(self._column) > length)
        )
        with engine.begin() as connection:
            result = connection.execute(query)
            return len(list(result)) > 0

    def __str__(self):
        return str(self._column)
def all_tables(meta, engine):
    tables = []
    with engine.begin() as connection:
        query = sqlalchemy.select(
            columns=[sqlalchemy.column("table_name"), sqlalchemy.column("row_format")],
            from_obj=sqlalchemy.text("information_schema.tables"),
        ).where(sqlalchemy.column("table_schema") == DB_DATABASE)
        result = connection.execute(query)
        for table_name, row_format in result:
            tables.append(Table(meta.tables[table_name], row_format))
    return tables
class TextFieldTooLong(ValueError):
    def __init__(
        self, column, character_size, would_need_bytes, max_byte_size, type_name
    ):
        self.column = column
        self.character_size = character_size
        self.would_need_bytes = would_need_bytes
        self.max_byte_size = max_byte_size
        self.type_name = type_name
        super().__init__(
            f"Column {column} of type {type_name} is {character_size} "
            f"characters long, which would need {would_need_bytes} bytes for "
            f"storage. However, the maximum byte size for {type_name} is "
            f"{max_byte_size}."
        )


def check_text_field_lengths(meta, engine):
    """In MySQL, data types have a maximum size limit defined in bytes,
    but column sizes are declared in characters. Because changing to
    utf8mb4 increases the per-character size from 3 to 4 bytes, some
    columns might end up larger than the limit for their data type and
    would need to be bumped to a larger data type."""
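    # Worked example: a utf8 VARCHAR(20000) needs at most 20000 * 3 =
    # 60000 bytes, within VARCHAR's 65535-byte maximum; under utf8mb4 it
    # would need 20000 * 4 = 80000 bytes, which no longer fits.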
    # Maximum sizes in bytes for the data types affected by the change
    # from utf8mb3 to utf8mb4.
    DATA_TYPE_MAX_SIZE = {
        "VARCHAR": 65_535,
        "TINYTEXT": 255,
        "TEXT": 65_535,
        "MEDIUMTEXT": 16_777_215,
        "LONGTEXT": 4_294_967_295,
    }
    for table in meta.tables.values():
        for column in table.columns:
            type_name = column.type.__class__.__name__
            if type_name not in DATA_TYPE_MAX_SIZE:
                continue
            current_character_size = column.type.length
            if current_character_size is None:
                # When the length is unspecified, the field takes the
                # longest possible size allowed by the underlying type,
                # so changing the encoding has no effect.
                continue
            max_byte_size = DATA_TYPE_MAX_SIZE[type_name]
            mb4_byte_size = current_character_size * UNICODE_CHARACTER_SIZE
            if mb4_byte_size > max_byte_size:
                raise TextFieldTooLong(
                    column=column,
                    character_size=current_character_size,
                    would_need_bytes=mb4_byte_size,
                    max_byte_size=max_byte_size,
                    type_name=type_name,
                )
def check_index_sizes(meta, engine):
    """Like text field lengths, index sizes are limited in bytes.
    Switching to utf8mb4 increases the byte size of indexes, to the
    point that they might exceed the limit.
    """
    tables = all_tables(meta, engine)

    def _check_table(table):
        for index in table.indexes:
            try:
                index.validate_size()
            except IndexTooBig:
                if table._row_format.lower() == "dynamic":
                    # Nothing we can do to fix the index
                    raise
                # Converting the table ROW_FORMAT to DYNAMIC increases
                # the per-column index prefix size limit substantially.
                table.convert_to_dynamic(engine)
                # Check the table again to see if converting to
                # dynamic fixed the issue
                return _check_table(table)

    for table in tables:
        _check_table(table)
def update_database_charset(engine):
    """Updates the database charset to utf8mb4. This is a cheap
    operation, as updating the database charset basically only changes
    the default charset for new tables and doesn't reencode any data."""
    with engine.begin() as connection:
        with Timer() as t:
            connection.execute(
                f"ALTER DATABASE {DB_DATABASE} CHARACTER SET "
                f"{TARGET_CHARACTER_SET} COLLATE {TARGET_COLLATION}"
            )
    print(f"Database character set updated in {t.interval}s.")
def reencode_tables(meta, engine):
    """Reencodes all tables to utf8mb4."""
    tables = all_tables(meta, engine)
    with Timer() as t:
        for table in tables:
            table.reencode(engine)
    print(f"All tables reencoded in {t.interval}s.")


def main():
    engine = sqlalchemy.create_engine(CONNECTION_STRING)
    meta = sqlalchemy.MetaData()
    print("Connected to database, reflecting tables...")
    meta.reflect(bind=engine)
    print("Tables reflected.")
    check_text_field_lengths(meta, engine)
    check_index_sizes(meta, engine)
    update_database_charset(engine)
    reencode_tables(meta, engine)
    # Reflect the schema again with a fresh engine so the migrated
    # table definitions can be inspected.
    engine = sqlalchemy.create_engine(CONNECTION_STRING)
    meta = sqlalchemy.MetaData()
    print("Connected to database, reflecting tables...")
    meta.reflect(bind=engine)
    print("Tables reflected.")
    tables = all_tables(meta, engine)


if __name__ == "__main__":
    main()
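# Example invocation (the script filename here is hypothetical; the
# environment variables are the ones read at the top of this file):
#
#   DB_USER=app DB_PASSWORD=secret DB_HOST=localhost \
#       DB_DATABASE=mydb python migrate_to_utf8mb4.py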