Created
January 24, 2023 02:24
-
-
Save pwmcintyre/30ecfc5abcd2abb7abddf41bc2c8b1e9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
/*
This will migrate all tables of a database to a new S3 bucket.

Steps:
  - list all tables of the database, then for each table:
    - list all objects at the table's storage location (S3 path)
    - copy every object found to the same key in the target bucket
    - update the table config to point at the new bucket

Caveats:
  - source objects are copied, never deleted
  - partition locations of partitioned tables are not updated

example:
  AWS_PROFILE=admin AWS_REGION=us-west-2 \
  DATABASE_NAME=example_database \
  TARGET_BUCKET_NAME=example \
  node ./glue/MoveDatabase
*/
// validate params: every required env var must be set to a non-empty value
// (an empty DATABASE_NAME / TARGET_BUCKET_NAME would only fail later, deep in an AWS call)
const missing = [
  'DATABASE_NAME',
  'TARGET_BUCKET_NAME',
].filter( (name) => !process.env[name] )
if ( missing.length > 0 ) {
  // report every missing var at once rather than one per run
  missing.forEach( (name) => console.error(`required env var not found: ${name}`) )
  process.exit(1)
}
// deps | |
const url = require('url') | |
const AWS = require('aws-sdk') | |
// AWS SDK clients used by the migration; credentials and region come from the
// environment (e.g. AWS_PROFILE / AWS_REGION, as in the usage example above)
const [ glue, s3 ] = [ new AWS.Glue(), new AWS.S3() ]
// Members of Glue's TableInput (the UpdateTable payload). GetTable returns a
// Table with extra read-only members (DatabaseName, CreateTime, UpdateTime,
// CreatedBy, IsRegisteredWithLakeFormation, CatalogId, VersionId, ...) that are
// not part of TableInput and are rejected as unexpected parameters, so only
// these keys are copied into the update request.
const TABLE_INPUT_KEYS = [
  'Name',
  'Description',
  'Owner',
  'LastAccessTime',
  'LastAnalyzedTime',
  'Retention',
  'StorageDescriptor',
  'PartitionKeys',
  'ViewOriginalText',
  'ViewExpandedText',
  'TableType',
  'Parameters',
  'TargetTable',
]

// "scheme://bucket/rest" -> [scheme incl. "://", bucket, rest (verbatim)]
const LOCATION_PATTERN = /^([a-z][a-z0-9+.-]*:\/\/)([^/]+)(.*)$/i

// Maximum number of CopyObject requests in flight at once for one table.
const COPY_CONCURRENCY = 50

// Returns a .catch handler that prefixes the error message with context and
// rethrows the same error object (keeping its code, statusCode and stack).
function rethrowWith(prefix) {
  return (error) => {
    error.message = `${prefix}: ${error.message}`
    throw error
  }
}

// Split a storage location into { scheme, bucket, path }. The path is kept
// exactly as written (no URL percent-encoding/decoding), so S3 keys match.
function parseLocation(location) {
  const match = LOCATION_PATTERN.exec(location)
  if (!match) {
    throw new Error(`unsupported storage location: ${location}`)
  }
  const [, scheme, bucket, path] = match
  return { scheme, bucket, path }
}

// Turn a location path into an S3 key prefix: trim leading/trailing slashes and
// add one trailing slash. A bucket-root location yields '' (the whole bucket)
// rather than '/', which would match no keys.
function toPrefix(path) {
  const trimmed = path.replace(/^\/+|\/+$/g, '')
  return trimmed ? `${trimmed}/` : ''
}

// Build the CopySource value. S3 requires it to be URL-encoded; encoding each
// key segment keeps '/' separators while escaping spaces, '+', '=', '?', '#', ...
function copySource(bucket, key) {
  return `/${bucket}/${key.split('/').map(encodeURIComponent).join('/')}`
}

// Return every table of the database, following GetTables pagination.
async function listTables(DatabaseName) {
  const tables = []
  let NextToken
  do {
    const page = await glue.getTables({ DatabaseName, NextToken }).promise()
      .catch(rethrowWith('failed to get tables'))
    tables.push(...page.TableList)
    NextToken = page.NextToken
  } while (NextToken)
  return tables
}

// Return every object under Bucket/Prefix, following ListObjectsV2 pagination
// (a single page holds at most 1000 keys).
async function listObjects(Bucket, Prefix) {
  const objects = []
  let ContinuationToken
  do {
    const page = await s3.listObjectsV2({ Bucket, Prefix, ContinuationToken }).promise()
      .catch(rethrowWith('failed to list objects'))
    objects.push(...(page.Contents || []))
    ContinuationToken = page.IsTruncated ? page.NextContinuationToken : undefined
  } while (ContinuationToken)
  return objects
}

// Copy objects to the same keys in TargetBucket, at most COPY_CONCURRENCY at a time.
// NOTE: CopyObject is limited to objects of up to 5 GB; larger objects would need
// a multipart copy, which this script does not do.
async function copyObjects(TableName, SourceBucket, TargetBucket, objects) {
  let progress = 0
  for (let start = 0; start < objects.length; start += COPY_CONCURRENCY) {
    const batch = objects.slice(start, start + COPY_CONCURRENCY)
    await Promise.all(batch.map(async ({ Key }) => {
      await s3.copyObject({
        Bucket: TargetBucket,
        CopySource: copySource(SourceBucket, Key),
        Key,
      }).promise()
        .catch(rethrowWith(`failed to copy object ${Key}`))
      progress++
      console.debug('copied', { TableName, key: Key, progress, total: objects.length })
    }))
      .catch(rethrowWith('failed to copy objects'))
  }
}

// Build an UpdateTable TableInput from a GetTable Table, dropping read-only members.
function toTableInput(table) {
  const input = {}
  for (const key of TABLE_INPUT_KEYS) {
    if (table[key] !== undefined) {
      input[key] = table[key]
    }
  }
  return input
}

// Migrate one table: copy its objects, then repoint its storage location.
async function migrateTable(context, table) {
  const { DatabaseName, TargetBucket } = context
  const TableName = table.Name
  const OldLocation = (table.StorageDescriptor || {}).Location

  // views and other tables without a storage location have nothing to copy
  if (!OldLocation) {
    console.info('no storage location, skipping', { ...context, TableName })
    return
  }

  const source = parseLocation(OldLocation)

  // compare the bucket itself: a substring test would also match buckets
  // whose names merely contain TargetBucket
  if (source.bucket === TargetBucket) {
    console.info('already migrated', { ...context, TableName, location: OldLocation })
    return
  }

  if ((table.PartitionKeys || []).length > 0) {
    console.warn('partitioned table: partition locations are not updated', { ...context, TableName })
  }

  const objects = await listObjects(source.bucket, toPrefix(source.path))
  if (objects.length === 0) {
    console.warn('no data found', { ...context, TableName })
  }
  await copyObjects(TableName, source.bucket, TargetBucket, objects)
  console.info('data copy success', { ...context, TableName })

  // get a fresh copy of the table config to base the update on
  const config = await glue.getTable({ DatabaseName, Name: TableName }).promise()
    .catch(rethrowWith('failed to get table config'))
  console.info('table config before', { ...context, TableName, config: JSON.stringify(config.Table) })

  // only the bucket changes; scheme and path are kept verbatim
  const NewLocation = `${source.scheme}${TargetBucket}${source.path}`
  const TableInput = toTableInput(config.Table)
  TableInput.StorageDescriptor = { ...config.Table.StorageDescriptor, Location: NewLocation }

  await glue.updateTable({ DatabaseName, TableInput }).promise()
    .catch(rethrowWith('failed to update table config'))
  console.info('table config updated', { ...context, TableName, OldLocation, NewLocation })

  // for humans to compare
  const configAfter = await glue.getTable({ DatabaseName, Name: TableName }).promise()
    .catch(rethrowWith('failed to get table config'))
  console.info('table config after', { ...context, TableName, config: JSON.stringify(configAfter.Table) })
  console.info('table migrated', { ...context, TableName })
}

/**
 * Migrate every table of a Glue database to TargetBucket. Each table's S3
 * objects are copied to the same keys in TargetBucket, and then the table's
 * StorageDescriptor.Location is pointed at the new bucket.
 *
 * Tables are processed one at a time and every step is awaited. This keeps the
 * request volume bounded, and it means no failure escapes as an unhandled
 * rejection. A failing table is logged and does not stop the remaining tables.
 *
 * @param {string} DatabaseName - Glue database whose tables are migrated
 * @param {string} TargetBucket - name of the destination S3 bucket
 * @returns {Promise<void>} resolves once every table has been processed
 * @throws {Error} if the tables cannot be listed, or, after all tables have
 *   been processed, an error naming every table that failed to migrate
 */
async function migrate( DatabaseName, TargetBucket ) {
  const context = {
    DatabaseName,
    TargetBucket,
  }

  const tables = await listTables(DatabaseName)

  const failed = []
  for (const table of tables) {
    try {
      await migrateTable(context, table)
    } catch (error) {
      console.error('table migration failed', { ...context, TableName: table.Name, error })
      failed.push(table.Name)
    }
  }

  if (failed.length > 0) {
    throw new Error(`failed to migrate ${failed.length} of ${tables.length} tables: ${failed.join(', ')}`)
  }
  console.info('done', { ...context, tables: tables.length })
}
// begin: run the migration. On failure, log the error and set a non-zero exit
// code, so shells and CI pipelines can tell that the migration did not complete.
migrate( process.env.DATABASE_NAME, process.env.TARGET_BUCKET_NAME )
  .catch( (error) => {
    console.error("failed to migrate", { error })
    process.exitCode = 1
  })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment