# defult: -i=2 -t=20 -u=100
> java SimpleSnowflakeTest.java
Generators QPS: [ 187 181] Total IDs: 4176 (368 /s) Conflicts: 1139 (104/s) 27.275%
# -t=20 -u=200 => 20 tokens pre 200 ms (100/s)
> java SimpleSnowflakeTest.java -t=20 -u=200
Generators QPS: [ 100 99]] Total IDs: 3404 (199 /s) Conflicts: 1062 (52 /s) 31.199%
# -t=20 -u=20 => 20 tokens pre 20 ms (1000/s)
> java SimpleSnowflakeTest.java -u=20
Generators QPS: [ 740 744] Total IDs: 18959 (1484/s) Conflicts: 4709 (388/s) 24.838%
# -r => starts with a random value
> java SimpleSnowflakeTest.java -r
Generators QPS: [ 191 192] Total IDs: 2702 (383 /s) Conflicts: 10 (2 /s) 0.370%
# -t=20 -u=20 => 20 tokens pre 20 ms (1000/s)
> java SimpleSnowflakeTest.java -t=20 -u=20 -r
Generators QPS: [ 723 731] Total IDs: 18350 (1454/s) Conflicts: 52 (6 /s) 0.283%
# -i=3 => 3 instances
> java SimpleSnowflakeTest.java -i=3 -t=20 -u=20 -r
Generators QPS: [ 487 481 537] Total IDs: 20747 (1505/s) Conflicts: 77 (4 /s) 0.371%
Last active
March 17, 2024 15:53
-
-
Save wcp1231/2464917504f862823dc71c2f0e5af534 to your computer and use it in GitHub Desktop.
A simple snowflake conflict test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.security.SecureRandom; | |
import java.util.ArrayList; | |
import java.util.Enumeration; | |
import java.util.HashMap; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicLong; | |
public class SimpleSnowflakeTest { | |
/** | |
* A simple snowflakes implementation. | |
*/ | |
static class SimpleSnowflakes { | |
private final static long START_TIMESTAMP = 1690819200000L; | |
private final static int MAX_MACHINE_ID = 100; | |
private final static int MAX_SEQUENCE = 100; | |
private final static int FAKE_MACHINE_ID = 1; | |
private final boolean randomStart; | |
private final SecureRandom random = new SecureRandom(); | |
private long lastTimeStamp = -1L; | |
private long sequence = 0L; | |
public SimpleSnowflakes(boolean randomStart) { | |
this.randomStart = randomStart; | |
} | |
public synchronized long nextId() { | |
long currTimeStamp = getNewTimeStamp(); | |
if (currTimeStamp == lastTimeStamp) { | |
sequence = (sequence + 1) % MAX_SEQUENCE; | |
if (sequence == 0L) { | |
currTimeStamp = getNextMill(lastTimeStamp); | |
} | |
} else { | |
// Whether to start with a random value | |
sequence = randomStart ? random.nextInt(MAX_SEQUENCE) : 0; | |
} | |
lastTimeStamp = currTimeStamp; | |
return (currTimeStamp - START_TIMESTAMP) * MAX_MACHINE_ID * MAX_SEQUENCE | |
+ FAKE_MACHINE_ID * MAX_SEQUENCE | |
+ sequence; | |
} | |
private static long getNextMill(long lastTimeStamp) { | |
long mill = getNewTimeStamp(); | |
while (mill <= lastTimeStamp) { | |
mill = getNewTimeStamp(); | |
} | |
return mill; | |
} | |
private static long getNewTimeStamp() { | |
return System.currentTimeMillis(); | |
} | |
} | |
static class TestManager { | |
private final ExecutorService workerPool; | |
private final ConflictTracker conflictTracker; | |
private final ArrayList<IdGeneratingWorker> idGeneratingWorkers; | |
private Future<?> reportFuture; | |
public TestManager(Argument argument) { | |
this.workerPool = Executors.newFixedThreadPool(argument.instances); | |
this.idGeneratingWorkers = new ArrayList<>(argument.instances); | |
QpsTracker qpsTracker = new QpsTracker(); | |
TokenBucket tokenBucket = new TokenBucket(argument.capacity, argument.tokensPreUnit, argument.unitInNanos); | |
this.conflictTracker = new ConflictTracker(qpsTracker); | |
for (int i = 0; i < argument.instances; i++) { | |
SimpleSnowflakes ssf = new SimpleSnowflakes(argument.random); | |
IdGeneratingWorker idGeneratingWorker = new IdGeneratingWorker(ssf, tokenBucket, qpsTracker, conflictTracker); | |
idGeneratingWorkers.add(idGeneratingWorker); | |
} | |
} | |
public void start() { | |
conflictTracker.start(); | |
for (IdGeneratingWorker worker : idGeneratingWorkers) { | |
workerPool.submit(worker); | |
} | |
reportFuture = Executors.newSingleThreadScheduledExecutor() | |
.scheduleWithFixedDelay(this::report, 1, 1, TimeUnit.SECONDS); | |
} | |
public void stop() { | |
for (IdGeneratingWorker worker : idGeneratingWorkers) { | |
worker.stop(); | |
} | |
workerPool.shutdown(); | |
conflictTracker.stop(); | |
reportFuture.cancel(true); | |
} | |
private void report() { | |
StringBuilder sb = new StringBuilder(); | |
for (IdGeneratingWorker worker : idGeneratingWorkers) { | |
sb.append(" ").append(worker.getQps()); | |
} | |
System.out.printf("\rGenerators QPS: [%s]\t%s", sb, conflictTracker.getStatistics()); | |
} | |
} | |
/** | |
* A worker that generates IDs. | |
*/ | |
static class IdGeneratingWorker implements Runnable { | |
private final SimpleSnowflakes ssf; | |
private final TokenBucket tokenBucket; | |
private final QpsTracker qpsTracker; | |
private final ConflictTracker conflictTracker; | |
private volatile boolean running = true; | |
public IdGeneratingWorker(SimpleSnowflakes ssf, TokenBucket tokenBucket, | |
QpsTracker qpsTracker, ConflictTracker conflictTracker) { | |
this.ssf = ssf; | |
this.tokenBucket = tokenBucket; | |
this.qpsTracker = qpsTracker; | |
this.conflictTracker = conflictTracker; | |
} | |
@Override | |
public void run() { | |
while (running) { | |
if (tokenBucket.tryConsume()) { | |
qpsTracker.track(this); | |
conflictTracker.check(ssf.nextId()); | |
} | |
} | |
} | |
public void stop() { | |
running = false; | |
} | |
public Long getQps() { | |
return qpsTracker.getQps(this); | |
} | |
} | |
/** | |
* A tracker that tracks the conflict rate. | |
*/ | |
static class ConflictTracker { | |
private final AtomicLong total = new AtomicLong(0); | |
private final AtomicLong conflicts = new AtomicLong(0); | |
private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>> timeBuckets; | |
private final QpsTracker qpsTracker; | |
private final ScheduledExecutorService cleanPool; | |
public ConflictTracker(QpsTracker qpsTracker) { | |
this.qpsTracker = qpsTracker; | |
this.timeBuckets = new ConcurrentHashMap<>(); | |
this.cleanPool = Executors.newSingleThreadScheduledExecutor(); | |
} | |
public void start() { | |
cleanPool.scheduleWithFixedDelay(this::clean, 0, 3, TimeUnit.SECONDS); | |
} | |
public void check(long id) { | |
total.incrementAndGet(); | |
qpsTracker.track(total); | |
long sec = id / (SimpleSnowflakes.MAX_MACHINE_ID * SimpleSnowflakes.MAX_SEQUENCE); | |
ConcurrentHashMap<Long, Boolean> bucket = timeBuckets | |
.computeIfAbsent(sec, (k) -> new ConcurrentHashMap<>()); | |
if (bucket.containsKey(id)) { | |
conflicts.incrementAndGet(); | |
qpsTracker.track(conflicts); | |
} | |
bucket.put(id, true); | |
} | |
public void stop() { | |
cleanPool.shutdown(); | |
} | |
private void clean() { | |
Enumeration<Long> keys = timeBuckets.keys(); | |
while(keys.hasMoreElements()) { | |
long sec = keys.nextElement(); | |
if (System.currentTimeMillis() / 1000 > sec) { | |
timeBuckets.remove(sec); | |
} | |
} | |
} | |
public String getStatistics() { | |
long total = this.total.get(); | |
long conflicts = this.conflicts.get(); | |
return "Total IDs: %-6d (%-4d/s)\tConflicts: %-5d (%-3d/s)\t%.3f%%" | |
.formatted(total, qpsTracker.getQps(this.total), | |
conflicts, qpsTracker.getQps(this.conflicts), | |
conflicts * 100.0 / total); | |
} | |
} | |
/** | |
* A tracker that tracks the QPS. | |
*/ | |
static class QpsTracker { | |
private final ConcurrentHashMap<Object, AtomicLong> qpsCounters = new ConcurrentHashMap<>(); | |
private final HashMap<Object, Long> qpsResult = new HashMap<>(); | |
public QpsTracker() { | |
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { | |
for (Object key : qpsCounters.keySet()) { | |
long qps = qpsCounters.get(key).getAndSet(0); | |
qpsResult.put(key, qps); | |
} | |
}, 1, 1, TimeUnit.SECONDS); | |
} | |
public void track(Object key) { | |
qpsCounters.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet(); | |
} | |
public Long getQps(Object key) { | |
return qpsResult.getOrDefault(key, 0L); | |
} | |
} | |
/** | |
* A simple token bucket implementation. | |
*/ | |
static class TokenBucket { | |
private final long capacity; | |
private final long tokensPerUnit; | |
private final long unitInNanos; | |
private final AtomicLong tokens; | |
private final AtomicLong lastFillTime; | |
public TokenBucket(long capacity, long tokensPerUnit, long unitInNanos) { | |
this.capacity = capacity; | |
this.tokensPerUnit = tokensPerUnit; | |
this.unitInNanos = unitInNanos; | |
this.tokens = new AtomicLong(0); | |
this.lastFillTime = new AtomicLong(System.nanoTime()); | |
} | |
public boolean tryConsume() { | |
fill(); | |
while (true) { | |
long currentTokens = tokens.get(); | |
if (currentTokens < 1) { | |
return false; | |
} | |
if (tokens.compareAndSet(currentTokens, currentTokens - 1)) { | |
return true; | |
} | |
} | |
} | |
private void fill() { | |
long now = System.nanoTime(); | |
long timeSinceLastFill = now - lastFillTime.get(); | |
long tokensToAdd = (tokensPerUnit * timeSinceLastFill) / unitInNanos; | |
if (tokensToAdd < 1) { | |
return; | |
} | |
while (true) { | |
long currentTokens = tokens.get(); | |
long newTokens = Math.min(capacity, currentTokens + tokensToAdd); | |
if (tokens.compareAndSet(currentTokens, newTokens)) { | |
lastFillTime.set(now); | |
return; | |
} | |
} | |
} | |
} | |
static class Argument { | |
public final int instances; | |
public final long capacity; | |
public final long tokensPreUnit; | |
public final long unitInNanos; | |
public final boolean random; | |
Argument(int instances, long capacity, long tokensPreUnit, long unitInNanos, boolean random) { | |
this.instances = instances; | |
this.capacity = capacity; | |
this.tokensPreUnit = tokensPreUnit; | |
this.unitInNanos = unitInNanos; | |
this.random = random; | |
} | |
public static Argument parse(String[] args) { | |
int instances = 2; | |
long capacity = 1000; | |
long tokensPreUnit = 20; | |
long unitInNanos = TimeUnit.MILLISECONDS.toNanos(100); | |
boolean random = false; | |
for (String arg : args) { | |
if (arg.equals("-random") || arg.equals("-r")) { | |
random = true; | |
continue; | |
} | |
String[] kv = arg.split("="); | |
if (kv.length != 2) { | |
continue; | |
} | |
switch (kv[0]) { | |
case "-instances", "-i": | |
instances = Integer.parseInt(kv[1]); | |
break; | |
case "-capacity", "-c": | |
capacity = Long.parseLong(kv[1]); | |
break; | |
case "-tokensPreUnit", "-t": | |
tokensPreUnit = Long.parseLong(kv[1]); | |
break; | |
case "-milliPreUnit", "-u": | |
unitInNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(kv[1])); | |
break; | |
} | |
} | |
return new Argument(instances, capacity, tokensPreUnit, unitInNanos, random); | |
} | |
} | |
public static void main(String[] args) { | |
Argument argument = Argument.parse(args); | |
TestManager testManager = new TestManager(argument); | |
testManager.start(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment