Skip to content

Instantly share code, notes, and snippets.

@wcp1231
Last active March 17, 2024 15:53
Show Gist options
  • Save wcp1231/2464917504f862823dc71c2f0e5af534 to your computer and use it in GitHub Desktop.
Save wcp1231/2464917504f862823dc71c2f0e5af534 to your computer and use it in GitHub Desktop.
A simple snowflake conflict test

A simple snowflake conflict test

# default: -i=2 -t=20 -u=100
> java SimpleSnowflakeTest.java
Generators QPS: [ 187 181]      Total IDs: 4176   (368 /s)      Conflicts: 1139  (104/s)        27.275%

# -t=20 -u=200 => 20 tokens per 200 ms (100/s)
> java SimpleSnowflakeTest.java -t=20 -u=200
Generators QPS: [ 100 99]       Total IDs: 3404   (199 /s)      Conflicts: 1062  (52 /s)        31.199%

# -t=20 -u=20 => 20 tokens per 20 ms (1000/s)
> java SimpleSnowflakeTest.java  -u=20
Generators QPS: [ 740 744]      Total IDs: 18959  (1484/s)      Conflicts: 4709  (388/s)        24.838%

# -r  =>  starts with a random value
> java SimpleSnowflakeTest.java -r
Generators QPS: [ 191 192]      Total IDs: 2702   (383 /s)      Conflicts: 10    (2  /s)        0.370%

# -t=20 -u=20 => 20 tokens per 20 ms (1000/s)
> java SimpleSnowflakeTest.java -t=20 -u=20 -r
Generators QPS: [ 723 731]      Total IDs: 18350  (1454/s)      Conflicts: 52    (6  /s)        0.283%

# -i=3        => 3 instances
> java SimpleSnowflakeTest.java -i=3 -t=20 -u=20 -r
Generators QPS: [ 487 481 537]  Total IDs: 20747  (1505/s)      Conflicts: 77    (4  /s)        0.371%
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class SimpleSnowflakeTest {
/**
 * A simple snowflake implementation.
 *
 * <p>ID layout (decimal, for readability):
 * {@code id = (timestamp - START_TIMESTAMP) * 10000 + machineId * 100 + sequence},
 * i.e. at most {@code MAX_SEQUENCE} IDs per machine per millisecond. Every
 * instance deliberately uses the same {@code FAKE_MACHINE_ID} so the test can
 * measure cross-instance conflicts.
 */
static class SimpleSnowflakes {
    /** Custom epoch: 2023-08-01T00:00:00Z. */
    private final static long START_TIMESTAMP = 1690819200000L;
    private final static int MAX_MACHINE_ID = 100;
    private final static int MAX_SEQUENCE = 100;
    // Shared by all instances on purpose so conflicts can occur.
    private final static int FAKE_MACHINE_ID = 1;
    // Whether the per-millisecond sequence starts at a random offset.
    private final boolean randomStart;
    private final SecureRandom random = new SecureRandom();
    private long lastTimeStamp = -1L;
    private long sequence = 0L;

    public SimpleSnowflakes(boolean randomStart) {
        this.randomStart = randomStart;
    }

    /**
     * Returns the next ID generated by this instance.
     * Thread-safe; spins when the per-millisecond sequence is exhausted or
     * the wall clock moved backwards.
     */
    public synchronized long nextId() {
        long currTimeStamp = getNewTimeStamp();
        if (currTimeStamp < lastTimeStamp) {
            // Clock moved backwards: wait until it catches up, otherwise we
            // could re-issue (timestamp, sequence) pairs already handed out.
            currTimeStamp = getNextMill(lastTimeStamp);
        }
        if (currTimeStamp == lastTimeStamp) {
            sequence = (sequence + 1) % MAX_SEQUENCE;
            if (sequence == 0L) {
                // Sequence exhausted for this millisecond; spin to the next one.
                currTimeStamp = getNextMill(lastTimeStamp);
            }
        } else {
            // New millisecond: optionally start at a random offset to reduce
            // cross-instance collisions on the low sequence values.
            sequence = randomStart ? random.nextInt(MAX_SEQUENCE) : 0;
        }
        lastTimeStamp = currTimeStamp;
        return (currTimeStamp - START_TIMESTAMP) * MAX_MACHINE_ID * MAX_SEQUENCE
                + FAKE_MACHINE_ID * MAX_SEQUENCE
                + sequence;
    }

    // Busy-waits until the clock passes the given timestamp.
    private static long getNextMill(long lastTimeStamp) {
        long mill = getNewTimeStamp();
        while (mill <= lastTimeStamp) {
            mill = getNewTimeStamp();
        }
        return mill;
    }

    private static long getNewTimeStamp() {
        return System.currentTimeMillis();
    }
}
/**
 * Wires the generators, the shared rate limiter and the trackers together,
 * and drives the test.
 */
static class TestManager {
    private final ExecutorService workerPool;
    private final ConflictTracker conflictTracker;
    private final ArrayList<IdGeneratingWorker> idGeneratingWorkers;
    // Kept in a field so stop() can shut the reporting thread down; the
    // original leaked this executor (only the future was cancelled, leaving
    // a non-daemon thread alive forever).
    private ScheduledExecutorService reportPool;
    private Future<?> reportFuture;

    public TestManager(Argument argument) {
        this.workerPool = Executors.newFixedThreadPool(argument.instances);
        this.idGeneratingWorkers = new ArrayList<>(argument.instances);
        QpsTracker qpsTracker = new QpsTracker();
        // One shared bucket limits the aggregate generation rate of all workers.
        TokenBucket tokenBucket = new TokenBucket(argument.capacity, argument.tokensPreUnit, argument.unitInNanos);
        this.conflictTracker = new ConflictTracker(qpsTracker);
        for (int i = 0; i < argument.instances; i++) {
            SimpleSnowflakes ssf = new SimpleSnowflakes(argument.random);
            IdGeneratingWorker idGeneratingWorker = new IdGeneratingWorker(ssf, tokenBucket, qpsTracker, conflictTracker);
            idGeneratingWorkers.add(idGeneratingWorker);
        }
    }

    /** Starts all workers and a once-per-second console report. */
    public void start() {
        conflictTracker.start();
        for (IdGeneratingWorker worker : idGeneratingWorkers) {
            workerPool.submit(worker);
        }
        reportPool = Executors.newSingleThreadScheduledExecutor();
        reportFuture = reportPool.scheduleWithFixedDelay(this::report, 1, 1, TimeUnit.SECONDS);
    }

    /** Stops workers, trackers and the reporting thread. */
    public void stop() {
        for (IdGeneratingWorker worker : idGeneratingWorkers) {
            worker.stop();
        }
        workerPool.shutdown();
        conflictTracker.stop();
        if (reportFuture != null) {
            reportFuture.cancel(true);
        }
        if (reportPool != null) {
            reportPool.shutdownNow();
        }
    }

    // Rewrites the status line in place ("\r") with per-worker QPS and
    // the conflict statistics.
    private void report() {
        StringBuilder sb = new StringBuilder();
        for (IdGeneratingWorker worker : idGeneratingWorkers) {
            sb.append(" ").append(worker.getQps());
        }
        System.out.printf("\rGenerators QPS: [%s]\t%s", sb, conflictTracker.getStatistics());
    }
}
/**
 * A worker that generates IDs as fast as the shared token bucket allows,
 * feeding each ID to the conflict tracker.
 */
static class IdGeneratingWorker implements Runnable {
    private final SimpleSnowflakes ssf;
    private final TokenBucket tokenBucket;
    private final QpsTracker qpsTracker;
    private final ConflictTracker conflictTracker;
    // Written by stop() from another thread; volatile for visibility.
    private volatile boolean running = true;

    public IdGeneratingWorker(SimpleSnowflakes ssf, TokenBucket tokenBucket,
                              QpsTracker qpsTracker, ConflictTracker conflictTracker) {
        this.ssf = ssf;
        this.tokenBucket = tokenBucket;
        this.qpsTracker = qpsTracker;
        this.conflictTracker = conflictTracker;
    }

    @Override
    public void run() {
        while (running) {
            if (tokenBucket.tryConsume()) {
                qpsTracker.track(this);
                conflictTracker.check(ssf.nextId());
            } else {
                // Rate limited: hint the CPU instead of hard-spinning;
                // the original burned a full core while waiting for tokens.
                Thread.onSpinWait();
            }
        }
    }

    /** Asks the worker loop to exit; returns immediately. */
    public void stop() {
        running = false;
    }

    /** Last completed one-second ID count for this worker. */
    public Long getQps() {
        return qpsTracker.getQps(this);
    }
}
/**
 * Detects duplicate IDs across all generators and tracks the conflict rate.
 */
static class ConflictTracker {
    // How long a time bucket is retained before the cleaner may drop it.
    private static final long RETENTION_MILLIS = 2000L;
    private final AtomicLong total = new AtomicLong(0);
    private final AtomicLong conflicts = new AtomicLong(0);
    // Seen IDs grouped by the millisecond they encode, so stale buckets can
    // be dropped instead of keeping every ID forever.
    private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>> timeBuckets;
    private final QpsTracker qpsTracker;
    private final ScheduledExecutorService cleanPool;

    public ConflictTracker(QpsTracker qpsTracker) {
        this.qpsTracker = qpsTracker;
        this.timeBuckets = new ConcurrentHashMap<>();
        this.cleanPool = Executors.newSingleThreadScheduledExecutor();
    }

    public void start() {
        cleanPool.scheduleWithFixedDelay(this::clean, 0, 3, TimeUnit.SECONDS);
    }

    /** Records one generated ID; counts it as a conflict if already seen. */
    public void check(long id) {
        total.incrementAndGet();
        qpsTracker.track(total);
        // The high part of the ID is the millisecond offset from the snowflake epoch.
        long millisKey = id / (SimpleSnowflakes.MAX_MACHINE_ID * SimpleSnowflakes.MAX_SEQUENCE);
        ConcurrentHashMap<Long, Boolean> bucket = timeBuckets
                .computeIfAbsent(millisKey, (k) -> new ConcurrentHashMap<>());
        // putIfAbsent is atomic; the original containsKey-then-put could miss
        // a conflict when two workers inserted the same ID concurrently.
        if (bucket.putIfAbsent(id, true) != null) {
            conflicts.incrementAndGet();
            qpsTracker.track(conflicts);
        }
    }

    public void stop() {
        cleanPool.shutdown();
    }

    // Drops buckets older than RETENTION_MILLIS. The original compared the
    // millisecond bucket key against System.currentTimeMillis()/1000 (seconds
    // since the Unix epoch) — mismatched units, so nothing was ever removed
    // and the map grew without bound.
    private void clean() {
        long cutoff = System.currentTimeMillis() - SimpleSnowflakes.START_TIMESTAMP - RETENTION_MILLIS;
        Enumeration<Long> keys = timeBuckets.keys();
        while (keys.hasMoreElements()) {
            long millisKey = keys.nextElement();
            if (millisKey < cutoff) {
                timeBuckets.remove(millisKey);
            }
        }
    }

    /** One-line summary: totals, per-second rates and conflict percentage. */
    public String getStatistics() {
        long total = this.total.get();
        long conflicts = this.conflicts.get();
        return "Total IDs: %-6d (%-4d/s)\tConflicts: %-5d (%-3d/s)\t%.3f%%"
                .formatted(total, qpsTracker.getQps(this.total),
                        conflicts, qpsTracker.getQps(this.conflicts),
                        conflicts * 100.0 / total);
    }
}
/**
* A tracker that tracks the QPS.
*/
static class QpsTracker {
private final ConcurrentHashMap<Object, AtomicLong> qpsCounters = new ConcurrentHashMap<>();
private final HashMap<Object, Long> qpsResult = new HashMap<>();
public QpsTracker() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
for (Object key : qpsCounters.keySet()) {
long qps = qpsCounters.get(key).getAndSet(0);
qpsResult.put(key, qps);
}
}, 1, 1, TimeUnit.SECONDS);
}
public void track(Object key) {
qpsCounters.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
}
public Long getQps(Object key) {
return qpsResult.getOrDefault(key, 0L);
}
}
/**
 * A simple lock-free token bucket rate limiter.
 * Refills {@code tokensPerUnit} tokens every {@code unitInNanos} nanoseconds,
 * capped at {@code capacity}. The bucket starts empty.
 */
static class TokenBucket {
    private final long capacity;
    private final long tokensPerUnit;
    private final long unitInNanos;
    private final AtomicLong tokens;
    private final AtomicLong lastFillTime;

    public TokenBucket(long capacity, long tokensPerUnit, long unitInNanos) {
        this.capacity = capacity;
        this.tokensPerUnit = tokensPerUnit;
        this.unitInNanos = unitInNanos;
        this.tokens = new AtomicLong(0);
        this.lastFillTime = new AtomicLong(System.nanoTime());
    }

    /** Takes one token if available; returns false when rate-limited. */
    public boolean tryConsume() {
        fill();
        while (true) {
            long currentTokens = tokens.get();
            if (currentTokens < 1) {
                return false;
            }
            if (tokens.compareAndSet(currentTokens, currentTokens - 1)) {
                return true;
            }
        }
    }

    // Credits the tokens accrued since the last refill.
    private void fill() {
        long now = System.nanoTime();
        long last = lastFillTime.get();
        long tokensToAdd = (tokensPerUnit * (now - last)) / unitInNanos;
        if (tokensToAdd < 1) {
            return;
        }
        // Claim the elapsed interval first; only the winning thread credits
        // its tokens. The original CAS-ed the token count and then overwrote
        // lastFillTime afterwards, so two racing threads could both credit
        // the same interval (over-filling the bucket).
        if (!lastFillTime.compareAndSet(last, now)) {
            return;
        }
        while (true) {
            long currentTokens = tokens.get();
            long newTokens = Math.min(capacity, currentTokens + tokensToAdd);
            if (tokens.compareAndSet(currentTokens, newTokens)) {
                return;
            }
        }
    }
}
/**
 * Parsed command line options for the test.
 */
static class Argument {
    public final int instances;      // number of generator instances (-i)
    public final long capacity;      // token bucket capacity (-c)
    public final long tokensPreUnit; // tokens refilled per time unit (-t)
    public final long unitInNanos;   // length of the refill unit (-u, given in ms)
    public final boolean random;     // start each millisecond at a random sequence (-r)

    Argument(int instances, long capacity, long tokensPreUnit, long unitInNanos, boolean random) {
        this.instances = instances;
        this.capacity = capacity;
        this.tokensPreUnit = tokensPreUnit;
        this.unitInNanos = unitInNanos;
        this.random = random;
    }

    /**
     * Parses {@code -key=value} style arguments, keeping defaults for anything
     * absent or unrecognized. Defaults: -i=2 -c=1000 -t=20 -u=100.
     */
    public static Argument parse(String[] args) {
        int instances = 2;
        long capacity = 1000;
        long tokensPreUnit = 20;
        long unitInNanos = TimeUnit.MILLISECONDS.toNanos(100);
        boolean random = false;
        for (String arg : args) {
            if (arg.equals("-random") || arg.equals("-r")) {
                random = true;
                continue;
            }
            String[] kv = arg.split("=");
            if (kv.length != 2) {
                continue;
            }
            switch (kv[0]) {
                case "-instances", "-i":
                    instances = Integer.parseInt(kv[1]);
                    break;
                case "-capacity", "-c":
                    capacity = Long.parseLong(kv[1]);
                    break;
                // "-tokensPreUnit" / "-milliPreUnit" are kept for backward
                // compatibility; the correctly spelled "...PerUnit" forms are
                // accepted as well.
                case "-tokensPreUnit", "-tokensPerUnit", "-t":
                    tokensPreUnit = Long.parseLong(kv[1]);
                    break;
                case "-milliPreUnit", "-milliPerUnit", "-u":
                    unitInNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(kv[1]));
                    break;
            }
        }
        return new Argument(instances, capacity, tokensPreUnit, unitInNanos, random);
    }
}
/**
 * Entry point: parses arguments, starts the test and runs until interrupted.
 */
public static void main(String[] args) {
    Argument argument = Argument.parse(args);
    TestManager testManager = new TestManager(argument);
    testManager.start();
    // The test runs until killed; shut the pools down cleanly on Ctrl-C.
    Runtime.getRuntime().addShutdownHook(new Thread(testManager::stop));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment