Created
April 29, 2025 04:01
-
-
Save choutianxius/15b925dfb45234ed4ef18eda3d22cc56 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/** | |
* MQTT subscriber in a Spring Framework context, using the Eclipse Paho libray and connect over TLS | |
*/ | |
@Service | |
public class SpringEclipsePahoMqttSubscriber { | |
private final String serverUri; | |
private final String clientId; | |
private final String topic; | |
private final String caCertPath; | |
private final String certPath; | |
private final String privateKeyPath; | |
private MqttClient client; | |
private static final Logger logger = LoggerFactory.getLogger(SpringEclipsePahoMqttSubscriber.class); | |
@Autowired | |
public SpringEclipsePahoMqttSubscriber(@Value("${app.mqtt.server-url}") String serverUri, | |
@Value("${app.mqtt.client-id}") String clientId, | |
@Value("${app.mqtt.topic}") String topic, | |
@Value("${app.mqtt.tls.ca-cert-path}") String caCertPath, | |
@Value("${app.mqtt.tls.cert-path}") String certPath, | |
@Value("${app.mqtt.tls.private-key-path}") String privateKeyPath) { | |
this.serverUri = serverUri; | |
this.clientId = clientId; | |
this.topic = topic; | |
this.caCertPath = caCertPath; | |
this.certPath = certPath; | |
this.privateKeyPath = privateKeyPath; | |
} | |
@PostConstruct | |
public void run() { | |
try { | |
client = new MqttClient(serverUri, clientId, new MemoryPersistence()); | |
// on message | |
client.setCallback(new MqttCallback() { | |
@Override | |
public void disconnected(MqttDisconnectResponse response) { | |
logger.info("Disconnected"); | |
} | |
@Override | |
public void mqttErrorOccurred(MqttException exception) { | |
logger.error("Error occurred", e); | |
} | |
@Override | |
public void messageArrived(String topic, MqttMessage message) { | |
logger.info("Message received from topic " + topic + ": " + message.toString()); | |
// Do something with the message | |
} | |
@Override | |
public void deliveryComplete(IMqttToken token) {} | |
@Override | |
public void connectComplete(boolean reconnect, String serverURI) { | |
logger.info("Connection completed. Reconnect: " + (reconnect ? "yes" : "no")); | |
} | |
@Override | |
public void authPacketArrived(int reasonCode, MqttProperties properties) {} | |
}); | |
logger.info("Connecting to MQTT server..."); | |
MqttConnectionOptions connectionOptions = new MqttConnectionOptions(); | |
connectionOptions.setSocketFactory(getSocketFactory()); | |
client.connect(connectionOptions); | |
client.subscribe(topic, 1); | |
logger.info("Listening to topic: " + topic); | |
} catch (Exception e) { | |
logger.error("Unexpected error", e); | |
} | |
} | |
@PreDestroy | |
public void cleanUp() { | |
try { | |
logger.info("Closing..."); | |
client.disconnect(); | |
client.close(true); | |
logger.info("Closed"); | |
} catch (Exception e) { | |
logger.error("Unexpected exception occurred when closing", e); | |
} | |
} | |
private SSLSocketFactory getSocketFactory() { | |
try { | |
Security.addProvider(new BouncyCastleProvider()); | |
CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); | |
// CA cert | |
X509Certificate caCert = (X509Certificate) certFactory.generateCertificate( | |
new FileInputStream(caCertPath)); | |
// cert | |
X509Certificate cert = (X509Certificate) certFactory.generateCertificate( | |
new FileInputStream(certPath)); | |
// private key | |
PEMParser privateKeyParser = new PEMParser(new InputStreamReader( | |
new FileInputStream(privateKeyPath))); | |
Object privateKeyObject = privateKeyParser.readObject(); | |
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); | |
KeyPair keyPair = converter.getKeyPair((PEMKeyPair) privateKeyObject); | |
privateKeyParser.close(); | |
// authenticate server | |
KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType()); | |
caKeyStore.load(null, null); | |
caKeyStore.setCertificateEntry("ca-certificate", caCert); | |
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X.509"); | |
trustManagerFactory.init(caKeyStore); | |
// authenticate us (the client) | |
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); | |
keyStore.load(null, null); | |
keyStore.setCertificateEntry("certificate", cert); | |
keyStore.setKeyEntry("private-key", keyPair.getPrivate(), null, | |
new Certificate[] {cert}); | |
KeyManagerFactory keyManagerFactory = | |
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); | |
keyManagerFactory.init(keyStore, null); | |
// create the SSL socket factory | |
SSLContext context = SSLContext.getInstance("TLSv1.3"); | |
context.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), | |
null); | |
return context.getSocketFactory(); | |
} catch (Exception e) { | |
throw new RuntimeException("Failed to create SSL factory", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment