Skip to content

Instantly share code, notes, and snippets.

@ShawnSWu
Last active May 28, 2019 12:10
Show Gist options
  • Save ShawnSWu/34857522fe943e9a2ed5a2a2040bd5e9 to your computer and use it in GitHub Desktop.
Save ShawnSWu/34857522fe943e9a2ed5a2a2040bd5e9 to your computer and use it in GitHub Desktop.
package com.nutn.mcnmqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * Spring configuration that wires an MQTT broker into Spring Integration.
 *
 * <p>Connection settings are bound from properties under the {@code mqtt} prefix
 * ({@code mqtt.url}, {@code mqtt.username}, {@code mqtt.password},
 * {@code mqtt.client-id}, {@code mqtt.topics}) through the setters at the bottom
 * of this class.
 *
 * <p>Two channels are declared:
 * <ul>
 *   <li>{@link #CHANNEL_NAME_OUT}: messages sent here are published to the broker
 *       by {@link #mqttOutbound()};</li>
 *   <li>{@link #CHANNEL_NAME_IN}: messages received from the broker by
 *       {@link #inbound()} are placed here and consumed by {@link #handler()}.</li>
 * </ul>
 */
@Configuration
@IntegrationComponentScan
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfiguration {

    /** Broker URI handed to the Paho client as its only server URI. */
    private String url;

    /** Optional broker user name; not applied to the connection when {@code null}. */
    private String username;

    /** Optional broker password; not applied to the connection when {@code null}. */
    private String password;

    /** MQTT client id used by the inbound (subscribing) adapter. */
    private String clientId;

    /** Topic used both as the outbound default topic and as the inbound subscription. */
    private String topics;

    /** Bean name of the channel that carries messages received from the broker. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    /** Bean name of the channel whose messages are published to the broker. */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    /**
     * Builds the Paho connection options from the bound properties.
     *
     * <p>Credentials are applied only when configured, so a broker that allows
     * anonymous access can be used without setting {@code mqtt.username} or
     * {@code mqtt.password}.
     *
     * @return a new options instance on every call
     */
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            // Guarded: calling toCharArray() on an unset password would fail at startup.
            options.setPassword(password.toCharArray());
        }
        options.setServerURIs(new String[] {url});
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        return options;
    }

    /**
     * Client factory shared by the inbound adapter and the outbound handler.
     *
     * @return a factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Channel on which application code places messages to be published.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes every message arriving on {@link #CHANNEL_NAME_OUT}.
     * The configured {@code topics} value is set as the handler's default topic.
     *
     * @return the outbound MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        // NOTE(review): this client id is hard-coded. It must differ from the
        // configured inbound clientId, because a broker drops an existing
        // connection when a second client connects with the same id.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("MCN", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topics);
        return messageHandler;
    }

    /**
     * Channel that receives messages delivered by the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Adapter that subscribes to the configured topic and forwards received
     * messages to {@link #CHANNEL_NAME_IN}.
     *
     * @return the inbound MQTT channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        // The trailing constructor arguments are the topics to subscribe to.
        // The broker URL is already carried by the client factory's connection
        // options, so it must not be passed here.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Consumer for messages arriving on {@link #CHANNEL_NAME_IN}; prints each
     * payload to standard output.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("Message: " + message.getPayload());
            }
        };
    }

    /** @param url broker URI, bound from {@code mqtt.url} */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @param username broker user name, bound from {@code mqtt.username} */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @param password broker password, bound from {@code mqtt.password} */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @param clientId inbound client id, bound from {@code mqtt.client-id} */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @param topics topic to publish to and subscribe on, bound from {@code mqtt.topics} */
    public void setTopics(String topics) {
        this.topics = topics;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment