Created
April 8, 2014 18:57
-
-
Save vladaman/10171308 to your computer and use it in GitHub Desktop.
PHP Script to split and merge Amazon Kinesis Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require 'aws-autoloader.php'; | |
use Aws\Kinesis\KinesisClient; | |
// In a real application the code below belongs in your trusted code path:
// it holds the long-term security credentials that are used to obtain
// temporary security credentials.
// Example IAM resource policy that limits access to only some streams:
/* | |
"Resource": [ | |
"arn:aws:kinesis:us-east-1:*:stream/sink*" | |
] | |
*/ | |
// Placeholder credentials and the name of the stream this tool operates on.
$AWSKey = 'AWSKey';
$AWSSecret = 'AwsSecret';
$StreamName = 'sink';

// Kinesis client bound to the us-east-1 region.
$client = KinesisClient::factory(array(
    'key' => $AWSKey,
    'secret' => $AWSSecret,
    'region' => 'us-east-1'
));

// Fetch the stream description. Only the first 20 shards are requested;
// NOTE(review): streams with more shards are not paginated here.
$result = $client->describeStream(array(
    // StreamName is required
    'StreamName' => $StreamName,
    'Limit' => 20
));
$res = $result->get('StreamDescription');
print_r($result);

// Operating mode comes from the first CLI argument: '/split' or '/merge'.
// It is resolved once instead of on every loop iteration.
$mode = isset($argv[1]) ? $argv[1] : null;

$ix = 0;
foreach ($res['Shards'] as $shard) {
    if ($mode === '/split') {
        print_r($shard);

        // The new starting hash key must lie inside the shard's own range,
        // so take the midpoint of the range rather than half of its width.
        $newHashKeySplit = midpointHashKey(
            $shard['HashKeyRange']['StartingHashKey'],
            $shard['HashKeyRange']['EndingHashKey']
        );

        $ln = readline(sprintf("Would you like to split %s shard (Y/n)?", $shard['ShardId']));
        // readline() yields false on EOF; tolerate surrounding whitespace and
        // a lower-case 'y'. Anything else (including empty input) means "no".
        if (is_string($ln) && strtoupper(trim($ln)) === 'Y') {
            splitShard($client, $StreamName, $shard['ShardId'], $newHashKeySplit);
        }
    }

    if ($mode === '/merge') {
        // The parent id is read defensively, like the adjacent parent id
        // below, so a missing key does not raise an undefined-index notice.
        $parentShardId = isset($shard['ParentShardId']) ? $shard['ParentShardId'] : '';
        printf("%d) %s Parent: %s", $ix, $shard['ShardId'], $parentShardId);
        if (isset($shard['AdjacentParentShardId'])) {
            printf(" Adj: %s", $shard['AdjacentParentShardId']);
        }
        print("\n");
    }

    $ix++;
}

/**
 * Returns the hash key halfway between two hash keys, as a decimal string.
 *
 * Hash keys arrive as decimal strings that can exceed what a PHP int or
 * float represents exactly, so BCMath is used when the extension is loaded.
 * Without it the value is computed with floats: approximate, but still
 * located between $start and $end.
 *
 * @param string $start Starting hash key of the range (decimal digits).
 * @param string $end   Ending hash key of the range (decimal digits).
 * @return string Midpoint hash key, decimal digits without a fraction.
 */
function midpointHashKey($start, $end)
{
    if (function_exists('bcadd')) {
        $halfWidth = bcdiv(bcsub((string) $end, (string) $start, 0), '2', 0);
        return bcadd((string) $start, $halfWidth, 0);
    }

    return sprintf('%0.0f', (float) $start + ((float) $end - (float) $start) / 2);
}
/**
 * Splits one shard of a stream at the given hash key and reports it on stdout.
 *
 * @param object           $client     Client exposing splitShard(array $args).
 * @param string           $streamName Name of the stream that owns the shard.
 * @param string           $shard      Id of the shard to split.
 * @param string|int|float $key        New starting hash key. A string made only
 *                                     of decimal digits is sent unchanged so
 *                                     large keys keep full precision; any other
 *                                     value is rendered as a whole number.
 * @return void
 */
function splitShard($client, $streamName, $shard, $key)
{
    // Routing a digit string through "%0.0f" would convert it to a float and
    // silently alter keys too large for a float to hold exactly.
    if (is_string($key) && preg_match('/^\d+$/D', $key) === 1) {
        $newStartingHashKey = $key;
    } else {
        $newStartingHashKey = sprintf("%0.0f", $key);
    }

    $client->splitShard(array(
        // StreamName is required
        'StreamName' => $streamName,
        // ShardToSplit is required
        'ShardToSplit' => $shard,
        // NewStartingHashKey is required
        'NewStartingHashKey' => $newStartingHashKey
    ));

    // Report the key exactly as it was sent in the request.
    printf("Splitting Stream %s shard %s using key %s\n", $streamName, $shard, $newStartingHashKey);
}
/**
 * Asks the client to merge two shards of a stream.
 *
 * @param object $client        Client exposing mergeShards(array $args).
 * @param string $streamName    Name of the stream that owns both shards.
 * @param string $shardToMerge  Id of the shard to merge.
 * @param string $shardAdjacent Id of the adjacent shard to merge it with.
 * @return void
 */
function mergeShards($client, $streamName, $shardToMerge, $shardAdjacent)
{
    // All three request fields are required by the call.
    $request = array();
    $request['StreamName'] = $streamName;
    $request['ShardToMerge'] = $shardToMerge;
    $request['AdjacentShardToMerge'] = $shardAdjacent;

    $client->mergeShards($request);
}
// Placeholder for the '/merge' mode: nothing is merged automatically.
if (isset($argv[1]) && '/merge' == $argv[1]) {
    // NOTE(review): per the MergeShards API, two shards can be merged only
    // when their hash key ranges are adjacent — confirm before enabling.
    // To merge, fill in the two shard ids and uncomment the call, e.g.:
    // mergeShards($client, $StreamName, "shardId-000000000005","shardId-000000000002");
}
// print_r($res);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment