package cn.wangbowen.mqtt;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Base class wrapping a blocking Eclipse Paho {@link MqttClient}.
 *
 * <p>The client connects as soon as it is constructed, keeps a record of every
 * successful subscription in {@code topicMap}, and tries to reconnect (restoring
 * those subscriptions) when the connection is lost. Subclasses supply the
 * message handling through {@code processMessage}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state, yet
 * the connection-lost handler and regular callers may run on different threads.
 * Confirm the intended threading model before sharing an instance.
 */
public abstract class AbstractMqttClient {

    /** Number of reconnect attempts made after the connection is lost. */
    private static final int RECONNECT_ATTEMPTS = 5;
    /** QoS used by the overloads that do not take an explicit QoS. */
    private static final int DEFAULT_QOS = 1;
    /** Default value handed to {@link MqttConnectOptions#setConnectionTimeout(int)}. */
    private static final int DEFAULT_TIMEOUT = 10;
    /** Default value handed to {@link MqttConnectOptions#setKeepAliveInterval(int)}. */
    private static final int DEFAULT_ALIVE_INTERVAL = 20;

    private String serviceUri;
    private String clientId;
    private String userName;
    private String password;
    private boolean isCleanSession;
    private int timeout;
    private int aliveInterval;
    private MqttClient mqttClient;
    private MqttConnectOptions mqttConnectOptions;
    /** Topic filter mapped to the QoS it was subscribed with; replayed after a reconnect. */
    private Map<String, Integer> topicMap;

    /**
     * Creates a clean-session client with the default timeout and keep-alive and connects it.
     *
     * @param serviceUri broker URI passed to the Paho client
     * @param clientId   MQTT client identifier
     * @param userName   user name for the connection
     * @param password   password for the connection; {@code null} means no password is set
     */
    public AbstractMqttClient(String serviceUri, String clientId, String userName, String password) {
        this(serviceUri, clientId, userName, password, true);
    }

    /**
     * Creates a client with the default timeout and keep-alive and connects it.
     *
     * @param serviceUri     broker URI passed to the Paho client
     * @param clientId       MQTT client identifier
     * @param userName       user name for the connection
     * @param password       password for the connection; {@code null} means no password is set
     * @param isCleanSession value passed to {@link MqttConnectOptions#setCleanSession(boolean)}
     */
    public AbstractMqttClient(String serviceUri, String clientId, String userName, String password,
                              boolean isCleanSession) {
        this(serviceUri, clientId, userName, password, isCleanSession,
                DEFAULT_TIMEOUT, DEFAULT_ALIVE_INTERVAL);
    }

    /**
     * Creates a fully configured client and connects it by calling {@link #init()}.
     *
     * <p>Because {@code init()} is overridable and runs from this constructor, an
     * override executes before the subclass constructor body has run.
     *
     * @param serviceUri     broker URI passed to the Paho client
     * @param clientId       MQTT client identifier
     * @param userName       user name for the connection
     * @param password       password for the connection; {@code null} means no password is set
     * @param isCleanSession value passed to {@link MqttConnectOptions#setCleanSession(boolean)}
     * @param timeout        value passed to {@link MqttConnectOptions#setConnectionTimeout(int)}
     * @param aliveInterval  value passed to {@link MqttConnectOptions#setKeepAliveInterval(int)}
     */
    public AbstractMqttClient(String serviceUri, String clientId, String userName, String password,
                              boolean isCleanSession, int timeout, int aliveInterval) {
        this.serviceUri = serviceUri;
        this.clientId = clientId;
        this.userName = userName;
        this.password = password;
        this.isCleanSession = isCleanSession;
        this.timeout = timeout;
        this.aliveInterval = aliveInterval;
        topicMap = new HashMap<String, Integer>(16);
        init();
    }

    /**
     * Builds a new Paho client from the current settings and connects it.
     *
     * <p>Any client left over from an earlier call is closed first, so repeated
     * calls do not accumulate abandoned clients. If connecting fails the reason is
     * printed, the half-built client is closed and {@link #getMqttClient()} returns
     * {@code null} afterwards.
     */
    public void init() {
        releaseClient();
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(isCleanSession);
            options.setUserName(userName);
            if (password != null) {
                // A missing password used to crash with a NullPointerException here.
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(aliveInterval);
            mqttConnectOptions = options;

            MqttClient client = new MqttClient(serviceUri, clientId, new MemoryPersistence());
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("MQTT客户端(" + clientId + ")连接丢失!");
                    reConnect(RECONNECT_ATTEMPTS);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    try {
                        processMessage(topic, message);
                    } catch (Exception e) {
                        System.out.println("处理主题\"" + topic + "\"的消息\"" + message + "\"失败");
                        System.out.println("exception:" + e);
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Nothing to do: publish() waits on the delivery token itself.
                }
            });
            // Publish the client before connecting so the callback can see it.
            mqttClient = client;
            client.connect(options);
        } catch (MqttException e) {
            logMqttException(e);
            releaseClient();
        }
        if (connected()) {
            System.out.println("MQTT客户端(" + clientId + ")初始化成功!");
        } else {
            System.out.println("MQTT客户端(" + clientId + ")初始化失败!");
        }
    }

    /**
     * Tries up to {@code time} times to bring the connection back, replaying the
     * recorded subscriptions on success.
     *
     * <p>Returns immediately when the client is already connected, so a healthy
     * connection is never counted as a failed attempt. When every attempt fails the
     * client reference is dropped, which makes the next publish/subscribe call build
     * a fresh client through {@link #init()}.
     *
     * @param time number of attempts to make
     */
    private void reConnect(int time) {
        for (int remaining = time; remaining > 0; remaining--) {
            if (connected()) {
                return;
            }
            System.out.println("MQTT客户端(" + clientId + ")正在尝试重连...(剩余" + remaining + "次)");
            try {
                MqttClient client = mqttClient;
                MqttConnectOptions options = mqttConnectOptions;
                if (client == null || options == null) {
                    init();
                } else {
                    client.connect(options);
                }
            } catch (MqttException e) {
                logMqttException(e);
            }
            if (connected()) {
                System.out.println("MQTT客户端(" + clientId + ")重连成功!");
                restoreSubscriptions();
                return;
            }
        }
        System.out.println("MQTT客户端(" + clientId + ")重连失败!");
        // NOTE(review): the abandoned client is only dropped, not closed, because this
        // runs from the client's own callback; confirm whether close() is safe there.
        mqttClient = null;
    }

    /**
     * Handles a message received on a subscribed topic.
     *
     * <p>Exceptions thrown by an implementation are caught and logged by the caller.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    abstract void processMessage(String topic, MqttMessage message);

    /**
     * Publishes {@code content} to {@code topic} and waits for the delivery to complete.
     *
     * <p>The text is encoded as UTF-8. When the client is not connected a reconnect
     * through {@link #init()} is attempted first; the message is sent if that
     * succeeds. Failures are reported on standard output, never thrown.
     *
     * @param topic   topic to publish to
     * @param content message text; must not be {@code null}
     * @param qos     quality of service for the message
     */
    public void publish(String topic, String content, int qos) {
        if (!ensureConnected()) {
            System.out.println("【失败】MQTT客户端(" + clientId + ")向\"" + topic + "\"主题发送:" + content);
            return;
        }
        try {
            MqttMessage mqttMessage = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            mqttMessage.setQos(qos);
            MqttDeliveryToken deliveryToken = mqttClient.getTopic(topic).publish(mqttMessage);
            // Success is reported only after the delivery has really completed.
            deliveryToken.waitForCompletion();
            System.out.println("【成功】MQTT客户端(" + clientId + ")向\"" + topic + "\"主题发送:" + content);
        } catch (MqttException me) {
            System.out.println("【失败】MQTT客户端(" + clientId + ")向\"" + topic + "\"主题发送:" + content);
            logMqttException(me);
        }
    }

    /**
     * Publishes {@code content} to {@code topic} with QoS 1.
     *
     * @param topic   topic to publish to
     * @param content message text; must not be {@code null}
     */
    public void publish(String topic, String content) {
        publish(topic, content, DEFAULT_QOS);
    }

    /**
     * Subscribes to {@code topic} and records the subscription so it can be replayed
     * after a reconnect.
     *
     * <p>When the client is not connected a reconnect through {@link #init()} is
     * attempted first. Failures are reported on standard output, never thrown.
     *
     * @param topic topic filter to subscribe to
     * @param qos   maximum quality of service for the subscription
     */
    public void subscribe(String topic, int qos) {
        if (!ensureConnected()) {
            System.out.println("MQTT客户端(" + clientId + ")订阅\"" + topic + "\"主题失败");
            return;
        }
        subscribeNow(topic, qos);
    }

    /**
     * Subscribes to {@code topic} with QoS 1.
     *
     * @param topic topic filter to subscribe to
     */
    public void subscribe(String topic) {
        subscribe(topic, DEFAULT_QOS);
    }

    /**
     * Unsubscribes from {@code topic} and forgets the recorded subscription, so a
     * later reconnect does not subscribe to it again.
     *
     * <p>When the client is not connected a reconnect through {@link #init()} is
     * attempted first. Failures are reported on standard output, never thrown, and
     * leave the recorded subscription in place.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void unsubscribe(String topic) {
        if (!ensureConnected()) {
            System.out.println("MQTT客户端(" + clientId + ")取消订阅\"" + topic + "\"主题失败");
            return;
        }
        try {
            mqttClient.unsubscribe(topic);
            topicMap.remove(topic);
            System.out.println("MQTT客户端(" + clientId + ")取消订阅\"" + topic + "\"主题成功");
        } catch (MqttException e) {
            System.out.println("MQTT客户端(" + clientId + ")取消订阅\"" + topic + "\"主题失败");
        }
    }

    /** Unsubscribes from every recorded topic. */
    public void unsubscribeAll() {
        // Iterate over a snapshot: unsubscribe() removes entries from topicMap.
        for (String topic : topicMap.keySet().toArray(new String[0])) {
            unsubscribe(topic);
        }
    }

    /** Disconnects from the broker if a client exists; failures are only printed. */
    public void disconnect() {
        try {
            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            System.out.println("MQTT客户端(" + clientId + ")断开连接失败!");
        }
    }

    /** Closes the underlying client if one exists; failures are only printed. */
    public void close() {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            System.out.println("MQTT客户端(" + clientId + ")关闭失败!");
        }
    }

    /** Returns whether a client exists and reports itself as connected. */
    private boolean connected() {
        MqttClient client = mqttClient;
        return client != null && client.isConnected();
    }

    /**
     * Makes sure there is a live connection, rebuilding the client and replaying
     * the recorded subscriptions when there is none.
     *
     * @return {@code true} when the client is connected on return
     */
    private boolean ensureConnected() {
        if (connected()) {
            return true;
        }
        init();
        if (!connected()) {
            return false;
        }
        restoreSubscriptions();
        return true;
    }

    /** Subscribes again to every recorded topic, working on a snapshot of the map. */
    private void restoreSubscriptions() {
        Map<String, Integer> snapshot = new HashMap<String, Integer>(topicMap);
        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
            subscribeNow(entry.getKey(), entry.getValue());
        }
    }

    /** Subscribes on the current client without any reconnect attempt and records the topic. */
    private void subscribeNow(String topic, int qos) {
        try {
            mqttClient.subscribe(topic, qos);
            topicMap.put(topic, qos);
            System.out.println("MQTT客户端(" + clientId + ")订阅\"" + topic + "\"主题成功");
        } catch (MqttException e) {
            System.out.println("MQTT客户端(" + clientId + ")订阅\"" + topic + "\"主题失败");
        }
    }

    /** Best-effort disconnect and close of the current client; always clears the reference. */
    private void releaseClient() {
        MqttClient stale = mqttClient;
        mqttClient = null;
        if (stale == null) {
            return;
        }
        try {
            if (stale.isConnected()) {
                stale.disconnect();
            }
            stale.close();
        } catch (MqttException e) {
            // The client is being discarded anyway, so a failed cleanup is only reported.
            System.out.println("MQTT客户端(" + clientId + ")关闭失败!");
        }
    }

    /** Prints the diagnostic details of a Paho exception. */
    private void logMqttException(MqttException me) {
        System.out.println("reason:" + me.getReasonCode());
        System.out.println("msg:" + me.getMessage());
        System.out.println("loc:" + me.getLocalizedMessage());
        System.out.println("cause:" + me.getCause());
        System.out.println("exception:" + me);
        me.printStackTrace();
    }

    public String getServiceUri() {
        return serviceUri;
    }

    /** Sets the broker URI; it is read the next time {@link #init()} runs. */
    public void setServiceUri(String serviceUri) {
        this.serviceUri = serviceUri;
    }

    public String getClientId() {
        return clientId;
    }

    /** Sets the client id; it is read the next time {@link #init()} runs. */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUserName() {
        return userName;
    }

    /** Sets the user name; it is read the next time {@link #init()} runs. */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    /** Sets the password; it is read the next time {@link #init()} runs. */
    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isCleanSession() {
        return isCleanSession;
    }

    /** Sets the clean-session flag; it is read the next time {@link #init()} runs. */
    public void setCleanSession(boolean cleanSession) {
        isCleanSession = cleanSession;
    }

    /** Returns the current Paho client, or {@code null} when connecting or reconnecting failed. */
    public MqttClient getMqttClient() {
        return mqttClient;
    }

    public int getTimeout() {
        return timeout;
    }

    /** Sets the connection timeout; it is read the next time {@link #init()} runs. */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getAliveInterval() {
        return aliveInterval;
    }

    /** Sets the keep-alive interval; it is read the next time {@link #init()} runs. */
    public void setAliveInterval(int aliveInterval) {
        this.aliveInterval = aliveInterval;
    }

    public MqttConnectOptions getMqttConnectOptions() {
        return mqttConnectOptions;
    }

    /** Returns the live map of recorded subscriptions (topic filter to QoS). */
    public Map<String, Integer> getTopicMap() {
        return topicMap;
    }

    /**
     * Replaces the recorded subscriptions.
     *
     * @param topicMap new topic-to-QoS map; {@code null} is treated as an empty map
     */
    public void setTopicMap(Map<String, Integer> topicMap) {
        this.topicMap = (topicMap != null) ? topicMap : new HashMap<String, Integer>(16);
    }
}
|