import com.github.luben.zstd.Zstd;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.bukkit.Bukkit;
import org.strixmc.messenger.component.server.ServerStartPacket;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Thin wrapper around a Lettuce {@link RedisClient} that publishes
 * Zstd-compressed binary messages and registers pub/sub listeners.
 *
 * <p>Publishing is asynchronous from the caller's point of view: each message
 * is compressed and sent on the internal executor. Call {@link #stop()} once
 * when the instance is no longer needed.
 */
public class RedisAPI {

    private static final Logger LOGGER = Logger.getLogger(RedisAPI.class.getName());

    /**
     * Command timeout for the Redis connection. The previous value was
     * {@code Duration.ofSeconds(30000)} (roughly 8h20m), which let a stalled
     * synchronous publish block the worker thread for hours.
     */
    private static final Duration COMMAND_TIMEOUT = Duration.ofSeconds(30);

    /** How long {@link #stop()} waits for queued publishes before cancelling them. */
    private static final long SHUTDOWN_GRACE_SECONDS = 5;

    private final ThreadPoolExecutor executor;
    private final RedisClient redisClient;
    private final StatefulRedisConnection<byte[], byte[]> sender;

    /**
     * Creates the client, connects to database 0 and opens the connection
     * used for publishing.
     *
     * @param host     Redis host name or address
     * @param port     Redis port
     * @param password Redis password
     */
    public RedisAPI(String host, int port, String password) {
        // The work queue is unbounded, so ThreadPoolExecutor never grows past its
        // core size: publishes effectively run on one thread, in submission order.
        executor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new DefaultThreadFactory("RedisAPI"));
        RedisURI uri = RedisURI.builder()
                .withHost(host)
                .withPort(port)
                .withPassword(password)
                .withDatabase(0)
                .withTimeout(COMMAND_TIMEOUT)
                .build();
        redisClient = RedisClient.create(uri);
        sender = redisClient.connect(new ByteArrayCodec());
    }

    /**
     * Compresses the given bytes with Zstd using the library's
     * single-argument {@code compress} call.
     *
     * @param text raw bytes to compress
     * @return the compressed bytes
     */
    public static byte[] compress(byte[] text) {
        return Zstd.compress(text);
    }

    /**
     * Decompresses a Zstd payload, sizing the output buffer from the size
     * reported by {@code Zstd.decompressedSize}.
     *
     * @param bytes a Zstd-compressed payload
     * @return the decompressed bytes
     * @throws IllegalArgumentException if the reported size is negative or
     *                                  does not fit in a Java array
     */
    public static byte[] decompress(byte[] bytes) {
        long size = Zstd.decompressedSize(bytes);
        // Guard the narrowing cast: an out-of-range value would otherwise wrap
        // silently into a wrong (possibly negative) buffer length.
        if (size < 0 || size > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Unsupported decompressed size: " + size);
        }
        return Zstd.decompress(bytes, (int) size);
    }

    /**
     * Queues a message for publication. Compression and the blocking publish
     * both run on the internal executor, so this method returns immediately
     * and failures of the publish itself are not reported to the caller.
     *
     * @param channel channel name as raw bytes
     * @param message uncompressed payload; it is compressed before sending
     */
    public void publishData(byte[] channel, byte[] message) {
        executor.execute(() -> sender.sync().publish(channel, compress(message)));
    }

    /**
     * Opens a new pub/sub connection, attaches the listener and subscribes it
     * to the given channels. The subscribe command is issued asynchronously
     * and its result is not awaited.
     *
     * <p>The connection is not closed by this class directly.
     * NOTE(review): it is expected to be closed by {@code redisClient.shutdown()}
     * in {@link #stop()} - confirm against the Lettuce version in use.
     *
     * @param sub   listener receiving the messages
     * @param names channel names as raw bytes
     */
    public void registerPubSub(RedisPubSubAdapter<byte[], byte[]> sub, byte[]... names) {
        StatefulRedisPubSubConnection<byte[], byte[]> connection =
                redisClient.connectPubSub(new ByteArrayCodec());
        connection.addListener(sub);
        RedisPubSubAsyncCommands<byte[], byte[]> pubSubCommands = connection.async();
        pubSubCommands.subscribe(names);
    }

    /**
     * Shuts everything down. The executor is stopped first and given a short
     * grace period so that already queued publishes are sent while the
     * connection is still open; only then are the connection and the client
     * closed. Failures while closing are logged and do not abort the shutdown.
     */
    public void stop() {
        boolean interrupted = false;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            interrupted = true;
        }

        try {
            sender.close();
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Failed to close Redis publish connection", e);
        }

        try {
            redisClient.shutdown();
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Failed to shut down Redis client", e);
        }

        if (interrupted) {
            // Restore the flag only after cleanup so closing is not cut short.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the executor used for publishing.
     *
     * @return the internal thread pool
     */
    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

}