Skip to content

Instantly share code, notes, and snippets.

@choutianxius
Created April 29, 2025 04:01
Show Gist options
  • Save choutianxius/15b925dfb45234ed4ef18eda3d22cc56 to your computer and use it in GitHub Desktop.
Save choutianxius/15b925dfb45234ed4ef18eda3d22cc56 to your computer and use it in GitHub Desktop.
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.Security;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * MQTT subscriber for a Spring application, built on the Eclipse Paho MQTT v5 client library and
 * connecting over TLS with a client certificate.
 *
 * <p>{@link #run()} is invoked after construction: it creates the client, connects to the
 * configured server and subscribes to the configured topic. {@link #cleanUp()} is invoked before
 * the bean is destroyed: it disconnects and closes the client.
 */
@Service
public class SpringEclipsePahoMqttSubscriber {

  private static final Logger logger =
      LoggerFactory.getLogger(SpringEclipsePahoMqttSubscriber.class);

  /** QoS level requested when subscribing to the topic. */
  private static final int SUBSCRIPTION_QOS = 1;

  private final String serverUri;
  private final String clientId;
  private final String topic;
  private final String caCertPath;
  private final String certPath;
  private final String privateKeyPath;

  /** Created in {@link #run()}; stays {@code null} if client creation failed. */
  private MqttClient client;

  /**
   * Creates the subscriber from application properties.
   *
   * @param serverUri value of {@code app.mqtt.server-url}, passed to the MQTT client as server URI
   * @param clientId value of {@code app.mqtt.client-id}, passed to the MQTT client as client id
   * @param topic value of {@code app.mqtt.topic}, the topic to subscribe to
   * @param caCertPath value of {@code app.mqtt.tls.ca-cert-path}, file holding the CA certificate
   * @param certPath value of {@code app.mqtt.tls.cert-path}, file holding the client certificate
   * @param privateKeyPath value of {@code app.mqtt.tls.private-key-path}, PEM file holding the
   *     client key pair
   */
  @Autowired
  public SpringEclipsePahoMqttSubscriber(
      @Value("${app.mqtt.server-url}") String serverUri,
      @Value("${app.mqtt.client-id}") String clientId,
      @Value("${app.mqtt.topic}") String topic,
      @Value("${app.mqtt.tls.ca-cert-path}") String caCertPath,
      @Value("${app.mqtt.tls.cert-path}") String certPath,
      @Value("${app.mqtt.tls.private-key-path}") String privateKeyPath) {
    this.serverUri = serverUri;
    this.clientId = clientId;
    this.topic = topic;
    this.caCertPath = caCertPath;
    this.certPath = certPath;
    this.privateKeyPath = privateKeyPath;
  }

  /**
   * Creates the MQTT client, connects over TLS and subscribes to the configured topic.
   *
   * <p>Failures are logged rather than propagated, so a connection problem does not prevent the
   * application context from starting.
   */
  @PostConstruct
  public void run() {
    try {
      client = new MqttClient(serverUri, clientId, new MemoryPersistence());
      client.setCallback(new LoggingCallback());

      logger.info("Connecting to MQTT server...");
      MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
      connectionOptions.setSocketFactory(getSocketFactory());
      client.connect(connectionOptions);

      client.subscribe(topic, SUBSCRIPTION_QOS);
      logger.info("Listening to topic: {}", topic);
    } catch (Exception e) {
      // Lifecycle boundary: deliberately best-effort, see method Javadoc.
      logger.error("Unexpected error", e);
    }
  }

  /**
   * Disconnects from the server (if connected) and closes the client.
   *
   * <p>Never throws: every failure is logged. The client is closed even when the disconnect
   * attempt fails, so its resources are always released.
   */
  @PreDestroy
  public void cleanUp() {
    if (client == null) {
      // run() failed before the client could be created; nothing to release.
      return;
    }
    logger.info("Closing...");
    try {
      // disconnect() on a client that never connected would throw, so check first.
      if (client.isConnected()) {
        client.disconnect();
      }
    } catch (Exception e) {
      logger.error("Unexpected exception occurred when disconnecting", e);
    } finally {
      try {
        client.close(true);
        logger.info("Closed");
      } catch (Exception e) {
        logger.error("Unexpected exception occurred when closing", e);
      }
    }
  }

  /**
   * Builds the socket factory used for the TLS connection: the CA certificate is the only trusted
   * entry, and the client certificate with its private key is offered as our identity.
   *
   * @return a socket factory backed by a freshly initialised {@link SSLContext}
   * @throws IllegalStateException if any certificate or key cannot be read, or the TLS context
   *     cannot be initialised; the underlying failure is attached as the cause
   */
  private SSLSocketFactory getSocketFactory() {
    try {
      // Register BouncyCastle once; it is needed to convert the PEM key pair.
      if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
        Security.addProvider(new BouncyCastleProvider());
      }

      CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
      X509Certificate caCert = readCertificate(certFactory, caCertPath);
      X509Certificate cert = readCertificate(certFactory, certPath);
      PrivateKey privateKey = readPrivateKey(privateKeyPath);

      // Trust material: used to authenticate the server.
      KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
      trustStore.load(null, null);
      trustStore.setCertificateEntry("ca-certificate", caCert);
      TrustManagerFactory trustManagerFactory =
          TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
      trustManagerFactory.init(trustStore);

      // Key material: used to authenticate us (the client).
      KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
      keyStore.load(null, null);
      keyStore.setCertificateEntry("certificate", cert);
      keyStore.setKeyEntry("private-key", privateKey, null, new Certificate[] {cert});
      KeyManagerFactory keyManagerFactory =
          KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
      keyManagerFactory.init(keyStore, null);

      SSLContext context = SSLContext.getInstance("TLSv1.3");
      context.init(
          keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
      return context.getSocketFactory();
    } catch (Exception e) {
      throw new IllegalStateException("Failed to create SSL factory", e);
    }
  }

  /**
   * Reads a single X.509 certificate from a file, closing the file afterwards.
   *
   * @param factory certificate factory used to parse the file content
   * @param path location of the certificate file
   * @return the parsed certificate
   * @throws IOException if the file cannot be opened or closed
   * @throws CertificateException if the content cannot be parsed as a certificate
   */
  private static X509Certificate readCertificate(CertificateFactory factory, String path)
      throws IOException, CertificateException {
    try (InputStream in = new FileInputStream(path)) {
      return (X509Certificate) factory.generateCertificate(in);
    }
  }

  /**
   * Reads the private key from a PEM file containing a key pair, closing the file afterwards.
   *
   * @param path location of the PEM file
   * @return the private half of the key pair found in the file
   * @throws IOException if the file cannot be read or the key pair cannot be converted
   * @throws IllegalArgumentException if the first PEM object in the file is not a key pair
   */
  private static PrivateKey readPrivateKey(String path) throws IOException {
    try (PEMParser parser =
        new PEMParser(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))) {
      Object pemObject = parser.readObject();
      if (!(pemObject instanceof PEMKeyPair)) {
        // Fail with a readable message instead of an opaque ClassCastException / NPE.
        String found = pemObject == null ? "no PEM object" : pemObject.getClass().getName();
        throw new IllegalArgumentException(
            "Expected a PEM key pair in " + path + " but found " + found);
      }
      KeyPair keyPair =
          new JcaPEMKeyConverter()
              .setProvider(BouncyCastleProvider.PROVIDER_NAME)
              .getKeyPair((PEMKeyPair) pemObject);
      return keyPair.getPrivate();
    }
  }

  /** Callback that logs client events; replace {@code messageArrived} with real handling. */
  private static final class LoggingCallback implements MqttCallback {

    @Override
    public void disconnected(MqttDisconnectResponse response) {
      logger.info("Disconnected");
    }

    @Override
    public void mqttErrorOccurred(MqttException exception) {
      logger.error("Error occurred", exception);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
      logger.info("Message received from topic {}: {}", topic, message);
      // Do something with the message
    }

    @Override
    public void deliveryComplete(IMqttToken token) {
      // Nothing to do: this client does not publish.
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
      logger.info("Connection completed. Reconnect: {}", reconnect ? "yes" : "no");
    }

    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {
      // Nothing to do: AUTH packets are ignored.
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment