Created
September 14, 2017 13:10
-
-
Save coopernurse/ae9373879f2e6c8c343fdd39e1d3ea00 to your computer and use it in GitHub Desktop.
ReproDiskConsistency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.imprev.util; | |
import com.amazonaws.auth.profile.ProfileCredentialsProvider; | |
import com.amazonaws.services.s3.AmazonS3; | |
import com.amazonaws.services.s3.AmazonS3Client; | |
import com.amazonaws.services.s3.model.ObjectListing; | |
import com.amazonaws.services.s3.model.S3Object; | |
import com.amazonaws.services.s3.model.S3ObjectSummary; | |
import com.imprev.soa.util.*; | |
import org.apache.log4j.Logger; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.InputStream; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Author: James Cooper - [email protected] | |
* Date: 9/13/17 | |
*/ | |
public class ReproDiskConsistency { | |
private static final Logger log = Logger.getLogger(ReproDiskConsistency.class); | |
public static void main(String argv[]) throws Exception { | |
Log4JInit.removeAppendersAndInit(); | |
String s3Bucket = "redacted"; | |
String s3Prefix = "redacted"; | |
File baseDir = new File("/tmp/diskrepro"); | |
int runtimeSeconds = 10; | |
int threads = 10; | |
if (argv.length > 0) { | |
baseDir = new File(argv[0]); | |
} | |
if (argv.length > 1) { | |
runtimeSeconds = Integer.parseInt(argv[1]); | |
} | |
if (argv.length > 2) { | |
threads = Integer.parseInt(argv[2]); | |
} | |
log.info("baseDir: " + baseDir + " runtimeSeconds: " + runtimeSeconds + " threads: " + threads); | |
AmazonS3Client s3 = new AmazonS3Client(new ProfileCredentialsProvider()); | |
ReproDiskConsistency repro = new ReproDiskConsistency(s3, s3Bucket, s3Prefix, baseDir, | |
runtimeSeconds, threads); | |
repro.go(); | |
} | |
/////////////////////////////////////////////////////////////////////////// | |
private AmazonS3 s3; | |
private String s3Bucket; | |
private String s3Prefix; | |
private File baseDir; | |
private int runtimeSeconds; | |
private int threads; | |
private Random random = new Random(); | |
private AtomicInteger errorCount; | |
public ReproDiskConsistency(AmazonS3 s3, String s3Bucket, String s3Prefix, File baseDir, int runtimeSeconds, | |
int threads) { | |
this.s3 = s3; | |
this.s3Bucket = s3Bucket; | |
this.s3Prefix = s3Prefix; | |
this.baseDir = baseDir; | |
this.runtimeSeconds = runtimeSeconds; | |
this.threads = threads; | |
} | |
public void go() throws InterruptedException { | |
log.info("Starting test"); | |
this.errorCount = new AtomicInteger(); | |
ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 120, TimeUnit.SECONDS, | |
new ArrayBlockingQueue<>(threads*5), new ThreadPoolExecutor.CallerRunsPolicy()); | |
int fileCount = 0; | |
long deadline = System.currentTimeMillis() + (runtimeSeconds * 1000); | |
Iterator<S3ObjectSummary> keys = s3Keys(); | |
while (keys.hasNext() && System.currentTimeMillis() < deadline) { | |
S3ObjectSummary summary = keys.next(); | |
fileCount++; | |
if (fileCount % 50 == 0) { | |
log.info("fileCount: " + fileCount + " last key: " + summary.getKey()); | |
} | |
pool.submit(() -> downloadAndVerify(summary)); | |
} | |
pool.shutdown(); | |
pool.awaitTermination(10, TimeUnit.MINUTES); | |
log.info("Test Complete"); | |
log.info(" files downloaded: " + fileCount); | |
log.info(" errors: " + errorCount.get()); | |
} | |
private Iterator<S3ObjectSummary> s3Keys() { | |
return new BaseBatchIterator<S3ObjectSummary>() { | |
@Override | |
protected List<S3ObjectSummary> loadNextBatch(S3ObjectSummary s3ObjectSummary) { | |
String randStr = Integer.toHexString(random.nextInt()); | |
String prefix = s3Prefix + randStr.substring(0, 2) + "/" + randStr.substring(2, 4) + "/"; | |
ObjectListing listing = s3.listObjects(s3Bucket, prefix); | |
log.info("Loaded " + listing.getObjectSummaries().size() + " keys from prefix: " + prefix); | |
return listing.getObjectSummaries(); | |
} | |
}; | |
} | |
private void downloadAndVerify(S3ObjectSummary summary) { | |
try { | |
S3Object obj = s3.getObject(summary.getBucketName(), summary.getKey()); | |
File localFile = new File(baseDir, obj.getKey()); | |
com.imprev.soa.util.FileUtil.mkdirs(localFile.getParentFile()); | |
try (FileOutputStream fos = new FileOutputStream(localFile); InputStream is = obj.getObjectContent()) { | |
Streams.copyStream(is, fos); | |
fos.flush(); | |
fos.getFD().sync(); | |
} | |
long localSize = localFile.length(); | |
if (localSize != summary.getSize()) { | |
sizeError(localSize, summary); | |
} | |
} | |
catch (Throwable t) { | |
log.error(t.getMessage(), t); | |
} | |
} | |
private void sizeError(long localSize, S3ObjectSummary summary) { | |
log.error("size mismatch: local=" + localSize + " s3=" + summary.getSize() + " key=" + summary.getKey()); | |
errorCount.incrementAndGet(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment