|
|
|
|
@@ -21,8 +21,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
* Topic Structure (managed internally):
|
|
|
|
|
* - Server -> Client: /client/{clientId}/{messageType}
|
|
|
|
|
* - Client -> Server: /server/{clientId}/{messageType}
|
|
|
|
|
* - ACK Server -> Client: /ack/client/{clientId}/{messageId}
|
|
|
|
|
* - ACK Client -> Server: /ack/server/{messageId}
|
|
|
|
|
* - ACK Server -> Client: /client/{clientId}/{messageId}/ack
|
|
|
|
|
* - ACK Client -> Server: /server/{messageId}/ack
|
|
|
|
|
*/
|
|
|
|
|
@Slf4j
|
|
|
|
|
public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
@@ -33,12 +33,12 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
// Topic templates
|
|
|
|
|
private static final String TOPIC_TO_CLIENT = "/client/%s/%s"; // /client/{clientId}/{messageType}
|
|
|
|
|
private static final String TOPIC_FROM_CLIENT = "/server/%s/%s"; // /server/{clientId}/{messageType}
|
|
|
|
|
private static final String TOPIC_ACK_TO_CLIENT = "/ack/client/%s/%s"; // /ack/client/{clientId}/{messageId}
|
|
|
|
|
private static final String TOPIC_ACK_FROM_CLIENT = "/ack/server/%s"; // /ack/server/{messageId}
|
|
|
|
|
private static final String TOPIC_ACK_TO_CLIENT = "/client/%s/%s/ack"; // /client/{clientId}/{messageId}/ack
|
|
|
|
|
private static final String TOPIC_ACK_FROM_CLIENT = "/server/%s/ack"; // /server/{messageId}/ack
|
|
|
|
|
|
|
|
|
|
// Subscription patterns
|
|
|
|
|
private static final String PATTERN_FROM_CLIENT = "/server/+/%s"; // /server/+/{messageType}
|
|
|
|
|
private static final String PATTERN_ACK_FROM_CLIENT = "/ack/server/+"; // /ack/server/+
|
|
|
|
|
private static final String PATTERN_ACK_FROM_CLIENT = "/server/+/ack"; // /server/+/ack
|
|
|
|
|
|
|
|
|
|
private Mqtt5AsyncClient mqttClient;
|
|
|
|
|
private ConnectionStateListener connectionListener;
|
|
|
|
|
@@ -151,9 +151,21 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
try {
|
|
|
|
|
notifyConnectionState(ConnectionState.DISCONNECTING, null);
|
|
|
|
|
|
|
|
|
|
if (mqttClient != null && connected) {
|
|
|
|
|
mqttClient.disconnect().join();
|
|
|
|
|
log.info("[MqttPlugin] Disconnected successfully");
|
|
|
|
|
if (mqttClient != null) {
|
|
|
|
|
// Check actual client connection state, not just our flag
|
|
|
|
|
var clientState = mqttClient.getState();
|
|
|
|
|
if (clientState.isConnected()) {
|
|
|
|
|
try {
|
|
|
|
|
mqttClient.disconnect().join();
|
|
|
|
|
log.info("[MqttPlugin] Disconnected successfully");
|
|
|
|
|
} catch (Exception disconnectEx) {
|
|
|
|
|
// Log but don't throw - client may already be disconnected
|
|
|
|
|
log.warn("[MqttPlugin] Disconnect failed (client may already be disconnected): {}",
|
|
|
|
|
disconnectEx.getMessage());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.info("[MqttPlugin] Client already disconnected (state: {})", clientState);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connected = false;
|
|
|
|
|
@@ -163,7 +175,10 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("[MqttPlugin] Shutdown failed: {}", e.getMessage(), e);
|
|
|
|
|
throw new PluginException("Failed to shutdown MQTT plugin", e);
|
|
|
|
|
// Don't throw on shutdown - just log the error
|
|
|
|
|
connected = false;
|
|
|
|
|
messageHandlers.clear();
|
|
|
|
|
ackHandler = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -202,24 +217,43 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
throw new PluginException("MQTT client is not connected");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
|
|
|
|
|
log.info("[MqttPlugin] Registering handler for message type '{}' with pattern: {}", messageType, topicPattern);
|
|
|
|
|
|
|
|
|
|
messageHandlers.put(messageType, handler);
|
|
|
|
|
|
|
|
|
|
// Subscribe to the topic pattern
|
|
|
|
|
mqttClient.subscribeWith()
|
|
|
|
|
.topicFilter(topicPattern)
|
|
|
|
|
.qos(MqttQos.EXACTLY_ONCE)
|
|
|
|
|
.send()
|
|
|
|
|
.whenComplete((subAck, throwable) -> {
|
|
|
|
|
if (throwable != null) {
|
|
|
|
|
log.error("[MqttPlugin] Subscription to {} failed: {}", topicPattern, throwable.getMessage());
|
|
|
|
|
messageHandlers.remove(messageType);
|
|
|
|
|
} else {
|
|
|
|
|
log.info("[MqttPlugin] Successfully subscribed to: {}", topicPattern);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
// Special case for login: subscribe to /server/login (without clientId)
|
|
|
|
|
if ("login".equals(messageType)) {
|
|
|
|
|
String loginTopic = "/server/login";
|
|
|
|
|
log.info("[MqttPlugin] Registering handler for message type '{}' with topic: {}", messageType, loginTopic);
|
|
|
|
|
|
|
|
|
|
mqttClient.subscribeWith()
|
|
|
|
|
.topicFilter(loginTopic)
|
|
|
|
|
.qos(MqttQos.EXACTLY_ONCE)
|
|
|
|
|
.send()
|
|
|
|
|
.whenComplete((subAck, throwable) -> {
|
|
|
|
|
if (throwable != null) {
|
|
|
|
|
log.error("[MqttPlugin] Subscription to {} failed: {}", loginTopic, throwable.getMessage());
|
|
|
|
|
messageHandlers.remove(messageType);
|
|
|
|
|
} else {
|
|
|
|
|
log.info("[MqttPlugin] Successfully subscribed to: {}", loginTopic);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
// Standard pattern: /server/+/{messageType}
|
|
|
|
|
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
|
|
|
|
|
log.info("[MqttPlugin] Registering handler for message type '{}' with pattern: {}", messageType, topicPattern);
|
|
|
|
|
|
|
|
|
|
mqttClient.subscribeWith()
|
|
|
|
|
.topicFilter(topicPattern)
|
|
|
|
|
.qos(MqttQos.EXACTLY_ONCE)
|
|
|
|
|
.send()
|
|
|
|
|
.whenComplete((subAck, throwable) -> {
|
|
|
|
|
if (throwable != null) {
|
|
|
|
|
log.error("[MqttPlugin] Subscription to {} failed: {}", topicPattern, throwable.getMessage());
|
|
|
|
|
messageHandlers.remove(messageType);
|
|
|
|
|
} else {
|
|
|
|
|
log.info("[MqttPlugin] Successfully subscribed to: {}", topicPattern);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
@@ -296,8 +330,8 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
log.debug("[MqttPlugin] Received message on topic: {}", topic);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// Check if it's an ACK message
|
|
|
|
|
if (topic.startsWith("/ack/server/")) {
|
|
|
|
|
// Check if it's an ACK message (topic ends with /ack)
|
|
|
|
|
if (topic.startsWith("/server/") && topic.endsWith("/ack")) {
|
|
|
|
|
handleAckMessage(topic, payload);
|
|
|
|
|
}
|
|
|
|
|
// Check if it's a client message
|
|
|
|
|
@@ -317,7 +351,7 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle ACK message from client.
|
|
|
|
|
* Topic format: /ack/server/{messageId}
|
|
|
|
|
* Topic format: /server/{messageId}/ack
|
|
|
|
|
*/
|
|
|
|
|
private void handleAckMessage(String topic, byte[] payload) {
|
|
|
|
|
if (ackHandler == null) {
|
|
|
|
|
@@ -325,10 +359,10 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Extract messageId from topic
|
|
|
|
|
// Extract messageId from topic: /server/{messageId}/ack
|
|
|
|
|
String[] parts = topic.split("/");
|
|
|
|
|
if (parts.length >= 4) {
|
|
|
|
|
String messageId = parts[3];
|
|
|
|
|
String messageId = parts[2]; // messageId is at index 2
|
|
|
|
|
log.debug("[MqttPlugin] Routing ACK for message: {}", messageId);
|
|
|
|
|
ackHandler.onAckReceived(messageId, payload);
|
|
|
|
|
} else {
|
|
|
|
|
@@ -338,11 +372,26 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle client message.
|
|
|
|
|
* Topic format: /server/{clientId}/{messageType}
|
|
|
|
|
* Topic format: /server/{clientId}/{messageType} or /server/{messageType} (for login)
|
|
|
|
|
*/
|
|
|
|
|
private void handleClientMessage(String topic, byte[] payload) {
|
|
|
|
|
// Extract clientId and messageType from topic
|
|
|
|
|
String[] parts = topic.split("/");
|
|
|
|
|
|
|
|
|
|
// Handle /server/login (without clientId)
|
|
|
|
|
if (parts.length == 3 && "login".equals(parts[2])) {
|
|
|
|
|
String messageType = parts[2];
|
|
|
|
|
ClientMessageHandler handler = messageHandlers.get(messageType);
|
|
|
|
|
if (handler != null) {
|
|
|
|
|
log.debug("[MqttPlugin] Routing login message (type: {})", messageType);
|
|
|
|
|
handler.onMessageReceived(null, payload);
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("[MqttPlugin] No handler registered for message type: {}", messageType);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle /server/{clientId}/{messageType}
|
|
|
|
|
if (parts.length >= 4) {
|
|
|
|
|
String clientId = parts[2];
|
|
|
|
|
String messageType = parts[3];
|
|
|
|
|
|