Erweiterungen
This commit is contained in:
@@ -1,15 +0,0 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* MQTT configuration placeholder.
|
||||
*
|
||||
* In environments where Spring Integration MQTT dependencies are not available,
|
||||
* this class remains empty to allow the application to compile and run without
|
||||
* MQTT wiring. The business code uses a no-op MqttPublisher that logs messages.
|
||||
*/
|
||||
@Configuration
|
||||
public class MqttConfig {
|
||||
public static final String MQTT_BROKER_URI = "tcp://192.168.180.26:1883";
|
||||
}
|
||||
@@ -1,129 +0,0 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "app.mqtt")
|
||||
public class MqttProperties {
|
||||
/** Enable/disable MQTT subsystem */
|
||||
private boolean enabled = true;
|
||||
/** Broker URI, e.g. tcp://192.168.180.26:1883 */
|
||||
private String brokerUri = "tcp://192.168.180.26:1883";
|
||||
/** ClientId for the server */
|
||||
private String clientId = "server";
|
||||
/** Optional username */
|
||||
private String username;
|
||||
/** Optional password */
|
||||
private String password;
|
||||
/** MQTT v5 clean start flag */
|
||||
private boolean cleanStart = false;
|
||||
/** Session expiry interval in seconds (0 = expire immediately) */
|
||||
private long sessionExpiryInterval = 24 * 60 * 60; // 1 day
|
||||
/** Keep alive in seconds */
|
||||
private int keepAlive = 30;
|
||||
/** Max inflight messages */
|
||||
private int maxInflight = 50;
|
||||
/** Automatic reconnect */
|
||||
private boolean automaticReconnect = true;
|
||||
/** Default QoS to use for publishing */
|
||||
private int defaultQos = 2;
|
||||
/** Default retained flag for publishing */
|
||||
private boolean defaultRetained = false;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public String getBrokerUri() {
|
||||
return brokerUri;
|
||||
}
|
||||
|
||||
public void setBrokerUri(String brokerUri) {
|
||||
this.brokerUri = brokerUri;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
public void setUsername(String username) {
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public boolean isCleanStart() {
|
||||
return cleanStart;
|
||||
}
|
||||
|
||||
public void setCleanStart(boolean cleanStart) {
|
||||
this.cleanStart = cleanStart;
|
||||
}
|
||||
|
||||
public long getSessionExpiryInterval() {
|
||||
return sessionExpiryInterval;
|
||||
}
|
||||
|
||||
public void setSessionExpiryInterval(long sessionExpiryInterval) {
|
||||
this.sessionExpiryInterval = sessionExpiryInterval;
|
||||
}
|
||||
|
||||
public int getKeepAlive() {
|
||||
return keepAlive;
|
||||
}
|
||||
|
||||
public void setKeepAlive(int keepAlive) {
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
public int getMaxInflight() {
|
||||
return maxInflight;
|
||||
}
|
||||
|
||||
public void setMaxInflight(int maxInflight) {
|
||||
this.maxInflight = maxInflight;
|
||||
}
|
||||
|
||||
public boolean isAutomaticReconnect() {
|
||||
return automaticReconnect;
|
||||
}
|
||||
|
||||
public void setAutomaticReconnect(boolean automaticReconnect) {
|
||||
this.automaticReconnect = automaticReconnect;
|
||||
}
|
||||
|
||||
public int getDefaultQos() {
|
||||
return defaultQos;
|
||||
}
|
||||
|
||||
public void setDefaultQos(int defaultQos) {
|
||||
this.defaultQos = defaultQos;
|
||||
}
|
||||
|
||||
public boolean isDefaultRetained() {
|
||||
return defaultRetained;
|
||||
}
|
||||
|
||||
public void setDefaultRetained(boolean defaultRetained) {
|
||||
this.defaultRetained = defaultRetained;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,194 @@
|
||||
package de.assecutor.votianlt.messaging.config;
|
||||
|
||||
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
|
||||
import de.assecutor.votianlt.messaging.model.AcknowledgmentMessage;
|
||||
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
|
||||
import de.assecutor.votianlt.messaging.plugin.*;
|
||||
import de.assecutor.votianlt.messaging.plugin.mqtt.MqttMessagingPlugin;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.event.EventListener;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* Configuration for the plugin-based messaging system.
|
||||
* Initializes the selected plugin and sets up message routing.
|
||||
*/
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class PluginMessagingConfig {
|
||||
|
||||
@Value("${app.messaging.plugin.type:mqtt}")
|
||||
private String pluginType;
|
||||
|
||||
@Value("${app.messaging.plugin.mqtt.broker.host:mqtt-2.assecutor.de}")
|
||||
private String mqttBrokerHost;
|
||||
|
||||
@Value("${app.messaging.plugin.mqtt.broker.port:1883}")
|
||||
private int mqttBrokerPort;
|
||||
|
||||
@Value("${app.messaging.plugin.mqtt.username:app}")
|
||||
private String mqttUsername;
|
||||
|
||||
@Value("${app.messaging.plugin.mqtt.password:apppwd}")
|
||||
private String mqttPassword;
|
||||
|
||||
@Value("${app.messaging.plugin.mqtt.client.id:votianlt-server}")
|
||||
private String mqttClientId;
|
||||
|
||||
private final PluginManager pluginManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public PluginMessagingConfig(PluginManager pluginManager) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the messaging plugin after application startup.
|
||||
* This method is called after all beans are created, so we can safely access MessageDeliveryService.
|
||||
*/
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void initializePlugin(ApplicationReadyEvent event) {
|
||||
log.info("[PluginMessagingConfig] Initializing messaging plugin: {}", pluginType);
|
||||
|
||||
try {
|
||||
MessagingPlugin plugin = createPlugin(pluginType);
|
||||
PluginConfig config = createPluginConfig(pluginType);
|
||||
|
||||
// Get MessageDeliveryService from context (after all beans are created)
|
||||
MessageDeliveryService deliveryService = event.getApplicationContext().getBean(MessageDeliveryService.class);
|
||||
|
||||
// Set up a listener to subscribe when connected
|
||||
log.info("[PluginMessagingConfig] Adding state listener");
|
||||
pluginManager.addStateListener(stateEvent -> {
|
||||
log.info("[PluginMessagingConfig] State event received: state={}, isConnected={}",
|
||||
stateEvent.getState(), stateEvent.isConnected());
|
||||
if (stateEvent.isConnected()) {
|
||||
log.info("[PluginMessagingConfig] Plugin connected, setting up subscriptions");
|
||||
try {
|
||||
setupSubscriptions(deliveryService);
|
||||
log.info("[PluginMessagingConfig] Subscriptions setup completed");
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
log.debug("[PluginMessagingConfig] Plugin not yet connected, waiting...");
|
||||
}
|
||||
});
|
||||
log.info("[PluginMessagingConfig] State listener added");
|
||||
|
||||
// Activate plugin (this will trigger connection and eventually the listener above)
|
||||
pluginManager.activatePlugin(plugin, config);
|
||||
|
||||
log.info("[PluginMessagingConfig] Plugin activation initiated, subscriptions will be set up when connected");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Failed to initialize plugin: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to initialize messaging plugin", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a plugin instance based on the plugin type.
|
||||
*/
|
||||
private MessagingPlugin createPlugin(String type) {
|
||||
return switch (type.toLowerCase()) {
|
||||
case "mqtt" -> new MqttMessagingPlugin();
|
||||
// Add more plugin types here in the future
|
||||
// case "websocket" -> new WebSocketMessagingPlugin();
|
||||
// case "grpc" -> new GrpcMessagingPlugin();
|
||||
default -> throw new IllegalArgumentException("Unknown plugin type: " + type);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create plugin configuration based on the plugin type.
|
||||
*/
|
||||
private PluginConfig createPluginConfig(String type) {
|
||||
PluginConfig config = new PluginConfig();
|
||||
|
||||
switch (type.toLowerCase()) {
|
||||
case "mqtt" -> {
|
||||
config.setProperty("broker.host", mqttBrokerHost);
|
||||
config.setProperty("broker.port", mqttBrokerPort);
|
||||
config.setProperty("username", mqttUsername);
|
||||
config.setProperty("password", mqttPassword);
|
||||
config.setProperty("client.id", mqttClientId);
|
||||
config.setProperty("auto.reconnect", true);
|
||||
config.setProperty("clean.start", true);
|
||||
}
|
||||
// Add more plugin configurations here
|
||||
default -> throw new IllegalArgumentException("Unknown plugin type: " + type);
|
||||
}
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup message subscriptions using the new plugin API.
|
||||
*/
|
||||
private void setupSubscriptions(MessageDeliveryService deliveryService) {
|
||||
log.info("[PluginMessagingConfig] Setting up message subscriptions");
|
||||
|
||||
try {
|
||||
// Register ACK handler
|
||||
pluginManager.registerAckHandler((messageId, payload) -> {
|
||||
try {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
AcknowledgmentMessage ack = objectMapper.readValue(json, AcknowledgmentMessage.class);
|
||||
deliveryService.handleAcknowledgment(ack);
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling ACK message: {}", e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
|
||||
// Register message handlers for different message types
|
||||
String[] messageTypes = {
|
||||
"task_completed",
|
||||
"jobs/assigned",
|
||||
"message",
|
||||
"login"
|
||||
};
|
||||
|
||||
for (String messageType : messageTypes) {
|
||||
pluginManager.registerMessageHandler(messageType, (clientId, payload) ->
|
||||
handleEnvelopedMessage(clientId, payload, deliveryService));
|
||||
}
|
||||
|
||||
log.info("[PluginMessagingConfig] Message subscriptions initialized");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e);
|
||||
throw new RuntimeException("Failed to setup subscriptions", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming enveloped message.
|
||||
*/
|
||||
private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService) {
|
||||
try {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
|
||||
// Try to parse as envelope first
|
||||
try {
|
||||
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
||||
deliveryService.handleIncomingMessage(envelope);
|
||||
} catch (Exception e) {
|
||||
// If not an envelope, it might be a legacy message - log and skip
|
||||
log.debug("[PluginMessagingConfig] Received non-enveloped message from client {}, skipping", clientId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling enveloped message from client {}: {}", clientId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,129 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.controller.MessageController;
|
||||
import de.assecutor.votianlt.dto.AppLoginRequest;
|
||||
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Handles acknowledgments and routes incoming messages to application layer.
|
||||
* Acts as a bridge between the messaging layer and the application logic.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class AcknowledgmentHandler {
|
||||
|
||||
private final MessageController messageController;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public AcknowledgmentHandler(@Lazy MessageController messageController) {
|
||||
this.messageController = messageController;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
/**
|
||||
* Route incoming message envelope to appropriate application handler.
|
||||
* Unwraps the envelope and delegates to MessageController.
|
||||
*/
|
||||
public void routeIncomingMessage(MessageEnvelope envelope) {
|
||||
try {
|
||||
String topic = envelope.getTopic();
|
||||
Object payload = envelope.getPayload();
|
||||
|
||||
log.debug("[AckHandler] Routing message {} on topic {}", envelope.getMessageId(), topic);
|
||||
|
||||
// Convert payload to Map for routing
|
||||
Map<String, Object> payloadMap = objectMapper.convertValue(payload,
|
||||
new TypeReference<Map<String, Object>>() {});
|
||||
|
||||
// Route based on topic pattern
|
||||
if (topic.matches("/server/.+/task_completed")) {
|
||||
handleTaskCompleted(payloadMap);
|
||||
} else if (topic.matches("/server/.+/jobs/assigned")) {
|
||||
handleJobsAssigned(topic, payloadMap);
|
||||
} else if (topic.equals("/server/login")) {
|
||||
handleLogin(payloadMap);
|
||||
} else if (topic.matches("/server/.+/message")) {
|
||||
handleIncomingMessage(topic, payloadMap);
|
||||
} else {
|
||||
log.debug("[AckHandler] No route for topic {}", topic);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error routing message {}: {}",
|
||||
envelope.getMessageId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle task completion message
|
||||
*/
|
||||
private void handleTaskCompleted(Map<String, Object> payload) {
|
||||
try {
|
||||
Object tt = payload.get("taskType");
|
||||
String taskType = tt != null ? tt.toString() : null;
|
||||
messageController.handleTaskCompleted(payload, taskType);
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error handling task_completed: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle jobs assigned request
|
||||
*/
|
||||
private void handleJobsAssigned(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
// Extract clientId from topic: /server/{clientId}/jobs/assigned
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts.length > 2 ? parts[2] : null;
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
} else {
|
||||
log.warn("[AckHandler] Couldn't extract clientId from topic {} for jobs/assigned", topic);
|
||||
}
|
||||
messageController.handleGetAssignedJobs(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error handling jobs/assigned: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle login request
|
||||
*/
|
||||
private void handleLogin(Map<String, Object> payload) {
|
||||
try {
|
||||
AppLoginRequest req = objectMapper.convertValue(payload, AppLoginRequest.class);
|
||||
messageController.handleAppLogin(req);
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error handling login: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming chat message
|
||||
*/
|
||||
private void handleIncomingMessage(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
// Extract clientId from topic: /server/{clientId}/message
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts.length > 2 ? parts[2] : null;
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
} else {
|
||||
log.warn("[AckHandler] Couldn't extract clientId from topic {} for message", topic);
|
||||
}
|
||||
messageController.handleIncomingMessage(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error handling incoming message: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Configuration for message delivery service.
|
||||
*/
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "app.messaging.delivery")
|
||||
@Data
|
||||
public class DeliveryConfig {
|
||||
|
||||
/**
|
||||
* Maximum number of retry attempts for failed deliveries
|
||||
*/
|
||||
private int maxRetries = 3;
|
||||
|
||||
/**
|
||||
* Initial delay before first retry
|
||||
*/
|
||||
private Duration retryInitialDelay = Duration.ofSeconds(5);
|
||||
|
||||
/**
|
||||
* Maximum delay between retries
|
||||
*/
|
||||
private Duration retryMaxDelay = Duration.ofMinutes(5);
|
||||
|
||||
/**
|
||||
* Backoff multiplier for exponential backoff
|
||||
*/
|
||||
private double retryBackoffMultiplier = 2.0;
|
||||
|
||||
/**
|
||||
* Timeout for waiting for acknowledgment
|
||||
*/
|
||||
private Duration ackTimeout = Duration.ofSeconds(30);
|
||||
|
||||
/**
|
||||
* Default message expiry duration
|
||||
*/
|
||||
private Duration messageExpiry = Duration.ofHours(24);
|
||||
|
||||
/**
|
||||
* Interval for cleanup task (in minutes)
|
||||
*/
|
||||
private int cleanupIntervalMinutes = 60;
|
||||
|
||||
/**
|
||||
* Interval for retry task (in seconds)
|
||||
*/
|
||||
private int retryIntervalSeconds = 30;
|
||||
|
||||
/**
|
||||
* Retention period for acknowledged deliveries (in days)
|
||||
*/
|
||||
private int acknowledgedRetentionDays = 7;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import de.assecutor.votianlt.messaging.model.*;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Service for reliable message delivery with acknowledgment tracking.
|
||||
* Provides guaranteed delivery with retry mechanism and acknowledgment handling.
|
||||
*/
|
||||
public interface MessageDeliveryService {
|
||||
|
||||
/**
|
||||
* Send a message to a specific client with delivery tracking and acknowledgment.
|
||||
*
|
||||
* @param clientId The target client identifier
|
||||
* @param messageType The type of message (e.g., "jobs", "message", "auth", "task")
|
||||
* @param payload The message payload (will be serialized to JSON)
|
||||
* @param options Delivery options (retries, timeout, etc.)
|
||||
* @return CompletableFuture with delivery receipt
|
||||
*/
|
||||
CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options);
|
||||
|
||||
/**
|
||||
* Send a message to a specific client with default delivery options.
|
||||
*
|
||||
* @param clientId The target client identifier
|
||||
* @param messageType The type of message
|
||||
* @param payload The message payload
|
||||
* @return CompletableFuture with delivery receipt
|
||||
*/
|
||||
default CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload) {
|
||||
return sendToClient(clientId, messageType, payload, DeliveryOptions.standard());
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message with delivery tracking and acknowledgment.
|
||||
* @deprecated Use {@link #sendToClient(String, String, Object, DeliveryOptions)} instead
|
||||
*
|
||||
* @param topic The destination topic
|
||||
* @param payload The message payload (will be serialized to JSON)
|
||||
* @param options Delivery options (retries, timeout, etc.)
|
||||
* @return CompletableFuture with delivery receipt
|
||||
*/
|
||||
@Deprecated
|
||||
CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options);
|
||||
|
||||
/**
|
||||
* Send a message with default delivery options.
|
||||
* @deprecated Use {@link #sendToClient(String, String, Object)} instead
|
||||
*
|
||||
* @param topic The destination topic
|
||||
* @param payload The message payload
|
||||
* @return CompletableFuture with delivery receipt
|
||||
*/
|
||||
@Deprecated
|
||||
default CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload) {
|
||||
return sendMessage(topic, payload, DeliveryOptions.standard());
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming message envelope from transport layer.
|
||||
* Extracts payload and routes to application layer.
|
||||
*
|
||||
* @param envelope The received message envelope
|
||||
*/
|
||||
void handleIncomingMessage(MessageEnvelope envelope);
|
||||
|
||||
/**
|
||||
* Handle acknowledgment from client.
|
||||
* Updates delivery status and removes from pending queue.
|
||||
*
|
||||
* @param ack The acknowledgment message
|
||||
*/
|
||||
void handleAcknowledgment(AcknowledgmentMessage ack);
|
||||
|
||||
/**
|
||||
* Get the current delivery status for a message.
|
||||
*
|
||||
* @param messageId The message ID
|
||||
* @return Optional containing the delivery status, or empty if not found
|
||||
*/
|
||||
Optional<DeliveryStatus> getDeliveryStatus(String messageId);
|
||||
|
||||
/**
|
||||
* Get detailed pending delivery information.
|
||||
*
|
||||
* @param messageId The message ID
|
||||
* @return Optional containing the pending delivery, or empty if not found
|
||||
*/
|
||||
Optional<PendingDelivery> getPendingDelivery(String messageId);
|
||||
|
||||
/**
|
||||
* Retry all pending deliveries that are ready for retry.
|
||||
* Called by scheduled task.
|
||||
*/
|
||||
void retryPendingDeliveries();
|
||||
|
||||
/**
|
||||
* Clean up expired and completed deliveries.
|
||||
* Called by scheduled task.
|
||||
*/
|
||||
void cleanupOldDeliveries();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,429 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.messaging.model.*;
|
||||
import de.assecutor.votianlt.messaging.plugin.PluginManager;
|
||||
import de.assecutor.votianlt.messaging.plugin.SendOptions;
|
||||
import de.assecutor.votianlt.repository.MessageEnvelopeRepository;
|
||||
import de.assecutor.votianlt.repository.PendingDeliveryRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Implementation of MessageDeliveryService with reliable delivery guarantees.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
|
||||
private final PluginManager pluginManager;
|
||||
private final PendingDeliveryRepository pendingDeliveryRepository;
|
||||
private final MessageEnvelopeRepository envelopeRepository;
|
||||
private final AcknowledgmentHandler acknowledgmentHandler;
|
||||
private final DeliveryConfig config;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public MessageDeliveryServiceImpl(
|
||||
PluginManager pluginManager,
|
||||
PendingDeliveryRepository pendingDeliveryRepository,
|
||||
MessageEnvelopeRepository envelopeRepository,
|
||||
AcknowledgmentHandler acknowledgmentHandler,
|
||||
DeliveryConfig config) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.pendingDeliveryRepository = pendingDeliveryRepository;
|
||||
this.envelopeRepository = envelopeRepository;
|
||||
this.acknowledgmentHandler = acknowledgmentHandler;
|
||||
this.config = config;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) {
|
||||
try {
|
||||
// Create destination identifier for tracking
|
||||
String destination = clientId + "/" + messageType;
|
||||
|
||||
// Create message envelope
|
||||
final LocalDateTime expiresAt = options.calculateExpiryTime();
|
||||
MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt);
|
||||
|
||||
// Save envelope to database
|
||||
envelope = envelopeRepository.save(envelope);
|
||||
final String messageId = envelope.getMessageId();
|
||||
log.debug("[MessageDelivery] Created envelope {} for client {} (type: {})", messageId, clientId, messageType);
|
||||
|
||||
// Serialize envelope to JSON
|
||||
String json = objectMapper.writeValueAsString(envelope);
|
||||
byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
// Create pending delivery record if acknowledgment is required
|
||||
if (options.isRequiresAck()) {
|
||||
PendingDelivery pending = new PendingDelivery(
|
||||
messageId,
|
||||
destination,
|
||||
envelopeData,
|
||||
options.getMaxRetries(),
|
||||
expiresAt
|
||||
);
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.debug("[MessageDelivery] Created pending delivery for message {}", messageId);
|
||||
}
|
||||
|
||||
// Send via plugin manager
|
||||
SendOptions sendOptions = SendOptions.builder()
|
||||
.qos(options.getQos())
|
||||
.retained(options.isRetained())
|
||||
.build();
|
||||
|
||||
final boolean requiresAck = options.isRequiresAck();
|
||||
final Duration ackTimeout = options.getAckTimeout();
|
||||
|
||||
return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions)
|
||||
.thenApply(v -> {
|
||||
// Update pending delivery status
|
||||
if (requiresAck) {
|
||||
updatePendingDeliveryAfterSend(messageId, ackTimeout);
|
||||
}
|
||||
log.info("[MessageDelivery] Successfully sent message {} to client {} (type: {})",
|
||||
messageId, clientId, messageType);
|
||||
return DeliveryReceipt.submitted(messageId, destination, expiresAt);
|
||||
})
|
||||
.exceptionally(ex -> {
|
||||
log.error("[MessageDelivery] Failed to send message {} to client {} (type: {}): {}",
|
||||
messageId, clientId, messageType, ex.getMessage());
|
||||
if (requiresAck) {
|
||||
markPendingDeliveryFailed(messageId, ex.getMessage());
|
||||
}
|
||||
return DeliveryReceipt.failed(messageId, destination);
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error creating message for client {} (type: {}): {}",
|
||||
clientId, messageType, e.getMessage(), e);
|
||||
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options) {
|
||||
// Extract clientId and messageType from topic
|
||||
// Topic format: /client/{clientId}/{messageType}
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 4 && parts[1].equals("client")) {
|
||||
String clientId = parts[2];
|
||||
String messageType = parts[3];
|
||||
return sendToClient(clientId, messageType, payload, options);
|
||||
}
|
||||
|
||||
// Fallback for legacy topics - log warning
|
||||
log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic);
|
||||
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleIncomingMessage(MessageEnvelope envelope) {
|
||||
try {
|
||||
log.info("[MessageDelivery] Received message {} on topic {}",
|
||||
envelope.getMessageId(), envelope.getTopic());
|
||||
|
||||
// Send acknowledgment if required
|
||||
if (envelope.isRequiresAck()) {
|
||||
sendAcknowledgment(envelope);
|
||||
}
|
||||
|
||||
// Forward to acknowledgment handler for application routing
|
||||
acknowledgmentHandler.routeIncomingMessage(envelope);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error handling incoming message {}: {}",
|
||||
envelope.getMessageId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleAcknowledgment(AcknowledgmentMessage ack) {
|
||||
try {
|
||||
log.info("[MessageDelivery] Received acknowledgment for message {} with status {}",
|
||||
ack.getMessageId(), ack.getStatus());
|
||||
|
||||
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId());
|
||||
|
||||
if (pendingOpt.isEmpty()) {
|
||||
log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}",
|
||||
ack.getMessageId());
|
||||
return;
|
||||
}
|
||||
|
||||
PendingDelivery pending = pendingOpt.get();
|
||||
|
||||
switch (ack.getStatus()) {
|
||||
case RECEIVED, PROCESSED -> {
|
||||
pending.markAsAcknowledged();
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.info("[MessageDelivery] Message {} acknowledged successfully", ack.getMessageId());
|
||||
}
|
||||
case FAILED -> {
|
||||
pending.markAsFailed(ack.getErrorMessage());
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.warn("[MessageDelivery] Message {} failed on client: {}",
|
||||
ack.getMessageId(), ack.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error handling acknowledgment for message {}: {}",
|
||||
ack.getMessageId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DeliveryStatus> getDeliveryStatus(String messageId) {
|
||||
return pendingDeliveryRepository.findByMessageId(messageId)
|
||||
.map(PendingDelivery::getStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<PendingDelivery> getPendingDelivery(String messageId) {
|
||||
return pendingDeliveryRepository.findByMessageId(messageId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void retryPendingDeliveries() {
|
||||
try {
|
||||
List<PendingDelivery> readyForRetry = pendingDeliveryRepository
|
||||
.findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now());
|
||||
|
||||
if (readyForRetry.isEmpty()) {
|
||||
log.debug("[MessageDelivery] No pending deliveries ready for retry");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[MessageDelivery] Retrying {} pending deliveries", readyForRetry.size());
|
||||
|
||||
for (PendingDelivery pending : readyForRetry) {
|
||||
retryDelivery(pending);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error during retry process: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupOldDeliveries() {
|
||||
try {
|
||||
// Clean up acknowledged deliveries older than configured retention
|
||||
LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7));
|
||||
List<PendingDelivery> oldAcknowledged = pendingDeliveryRepository
|
||||
.findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff);
|
||||
|
||||
if (!oldAcknowledged.isEmpty()) {
|
||||
pendingDeliveryRepository.deleteAll(oldAcknowledged);
|
||||
log.info("[MessageDelivery] Cleaned up {} old acknowledged deliveries", oldAcknowledged.size());
|
||||
}
|
||||
|
||||
// Mark expired deliveries
|
||||
List<PendingDelivery> expired = pendingDeliveryRepository
|
||||
.findByStatusInAndExpiresAtBefore(
|
||||
List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT),
|
||||
LocalDateTime.now()
|
||||
);
|
||||
|
||||
for (PendingDelivery pending : expired) {
|
||||
pending.markAsExpired();
|
||||
pendingDeliveryRepository.save(pending);
|
||||
}
|
||||
|
||||
if (!expired.isEmpty()) {
|
||||
log.info("[MessageDelivery] Marked {} deliveries as expired", expired.size());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update pending delivery after successful send
|
||||
*/
|
||||
private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) {
|
||||
try {
|
||||
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
|
||||
if (pendingOpt.isPresent()) {
|
||||
PendingDelivery pending = pendingOpt.get();
|
||||
LocalDateTime nextRetry = LocalDateTime.now().plus(ackTimeout);
|
||||
pending.markAsSent(nextRetry);
|
||||
pendingDeliveryRepository.save(pending);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error updating pending delivery {}: {}", messageId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark pending delivery as failed
|
||||
*/
|
||||
private void markPendingDeliveryFailed(String messageId, String reason) {
|
||||
try {
|
||||
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
|
||||
if (pendingOpt.isPresent()) {
|
||||
PendingDelivery pending = pendingOpt.get();
|
||||
pending.markAsFailed(reason);
|
||||
pendingDeliveryRepository.save(pending);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error marking delivery as failed {}: {}", messageId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry a pending delivery
|
||||
*/
|
||||
private void retryDelivery(PendingDelivery pending) {
|
||||
try {
|
||||
// Check if expired
|
||||
if (pending.isExpired()) {
|
||||
pending.markAsExpired();
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.warn("[MessageDelivery] Message {} expired, not retrying", pending.getMessageId());
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if max retries reached
|
||||
if (pending.hasReachedMaxRetries()) {
|
||||
pending.markAsFailed("Max retries reached");
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.warn("[MessageDelivery] Message {} reached max retries", pending.getMessageId());
|
||||
return;
|
||||
}
|
||||
|
||||
// Increment retry count
|
||||
pending.incrementRetryCount();
|
||||
|
||||
// Calculate next retry time with exponential backoff
|
||||
Duration backoffDelay = calculateBackoff(pending.getRetryCount());
|
||||
LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay);
|
||||
|
||||
// Extract clientId and messageType from topic
|
||||
// Topic format: clientId/messageType
|
||||
String topic = pending.getTopic();
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length < 2) {
|
||||
log.error("[MessageDelivery] Invalid topic format for retry: {}", topic);
|
||||
pending.markAsFailed("Invalid topic format");
|
||||
pendingDeliveryRepository.save(pending);
|
||||
return;
|
||||
}
|
||||
|
||||
String clientId = parts[0];
|
||||
String messageType = parts[1];
|
||||
|
||||
// Send via plugin manager
|
||||
SendOptions options = SendOptions.reliable();
|
||||
pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options)
|
||||
.thenAccept(v -> {
|
||||
pending.markAsSent(nextRetry);
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.info("[MessageDelivery] Retry {} successful for message {}",
|
||||
pending.getRetryCount(), pending.getMessageId());
|
||||
})
|
||||
.exceptionally(ex -> {
|
||||
log.error("[MessageDelivery] Retry failed for message {}: {}",
|
||||
pending.getMessageId(), ex.getMessage());
|
||||
pending.markAsFailed(ex.getMessage());
|
||||
pendingDeliveryRepository.save(pending);
|
||||
return null;
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error retrying delivery {}: {}",
|
||||
pending.getMessageId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send acknowledgment back to client
|
||||
*/
|
||||
private void sendAcknowledgment(MessageEnvelope envelope) {
|
||||
try {
|
||||
// Extract client ID from topic (e.g., /server/{clientId}/... or clientId/messageType)
|
||||
String clientId = extractClientIdFromTopic(envelope.getTopic());
|
||||
if (clientId == null) {
|
||||
log.warn("[MessageDelivery] Cannot send ACK, no clientId in topic: {}", envelope.getTopic());
|
||||
return;
|
||||
}
|
||||
|
||||
// Create acknowledgment message
|
||||
AcknowledgmentMessage ack = new AcknowledgmentMessage(
|
||||
envelope.getMessageId(),
|
||||
AckStatus.RECEIVED,
|
||||
"server"
|
||||
);
|
||||
|
||||
// Send ACK to client using new API
|
||||
String ackJson = objectMapper.writeValueAsString(ack);
|
||||
byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget())
|
||||
.thenAccept(v -> log.debug("[MessageDelivery] Sent ACK for message {}", envelope.getMessageId()))
|
||||
.exceptionally(ex -> {
|
||||
log.error("[MessageDelivery] Failed to send ACK for message {}: {}",
|
||||
envelope.getMessageId(), ex.getMessage());
|
||||
return null;
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error sending acknowledgment for message {}: {}",
|
||||
envelope.getMessageId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate exponential backoff delay
|
||||
*/
|
||||
private Duration calculateBackoff(int retryCount) {
|
||||
long delayMs = (long) (config.getRetryInitialDelay().toMillis()
|
||||
* Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1));
|
||||
long maxDelayMs = config.getRetryMaxDelay().toMillis();
|
||||
return Duration.ofMillis(Math.min(delayMs, maxDelayMs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract client ID from topic pattern.
|
||||
* Supports both old format (/server/{clientId}/...) and new format (clientId/messageType)
|
||||
*/
|
||||
private String extractClientIdFromTopic(String topic) {
|
||||
if (topic == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Old format: /server/{clientId}/...
|
||||
if (topic.startsWith("/server/")) {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length > 2) {
|
||||
return parts[2];
|
||||
}
|
||||
}
|
||||
|
||||
// New format: clientId/messageType
|
||||
if (topic.contains("/")) {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 1) {
|
||||
return parts[0];
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Scheduled tasks for message delivery retry and cleanup.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RetryScheduler {
|
||||
|
||||
private final MessageDeliveryService deliveryService;
|
||||
|
||||
public RetryScheduler(MessageDeliveryService deliveryService) {
|
||||
this.deliveryService = deliveryService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry pending deliveries every 30 seconds (configurable)
|
||||
*/
|
||||
@Scheduled(fixedDelayString = "${app.messaging.delivery.retry-interval-seconds:30}000")
|
||||
public void retryPendingDeliveries() {
|
||||
try {
|
||||
log.debug("[RetryScheduler] Starting retry task");
|
||||
deliveryService.retryPendingDeliveries();
|
||||
} catch (Exception e) {
|
||||
log.error("[RetryScheduler] Error in retry task: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup old deliveries every hour (configurable)
|
||||
*/
|
||||
@Scheduled(fixedDelayString = "${app.messaging.delivery.cleanup-interval-minutes:60}000")
|
||||
public void cleanupOldDeliveries() {
|
||||
try {
|
||||
log.debug("[RetryScheduler] Starting cleanup task");
|
||||
deliveryService.cleanupOldDeliveries();
|
||||
} catch (Exception e) {
|
||||
log.error("[RetryScheduler] Error in cleanup task: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
/**
|
||||
* Status of message acknowledgment from client.
|
||||
*/
|
||||
public enum AckStatus {
|
||||
/**
|
||||
* Message was received by the client
|
||||
*/
|
||||
RECEIVED,
|
||||
|
||||
/**
|
||||
* Message was successfully processed by the client
|
||||
*/
|
||||
PROCESSED,
|
||||
|
||||
/**
|
||||
* Message processing failed on the client side
|
||||
*/
|
||||
FAILED
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* Acknowledgment message sent by clients to confirm message receipt/processing.
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class AcknowledgmentMessage {
|
||||
|
||||
/**
|
||||
* ID of the message being acknowledged
|
||||
*/
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* Status of the acknowledgment
|
||||
*/
|
||||
private AckStatus status;
|
||||
|
||||
/**
|
||||
* Timestamp when the acknowledgment was created
|
||||
*/
|
||||
private LocalDateTime timestamp;
|
||||
|
||||
/**
|
||||
* ID of the client sending the acknowledgment
|
||||
*/
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* Optional error message if status is FAILED
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* Constructor for successful acknowledgment
|
||||
*/
|
||||
public AcknowledgmentMessage(String messageId, AckStatus status, String clientId) {
|
||||
this.messageId = messageId;
|
||||
this.status = status;
|
||||
this.timestamp = LocalDateTime.now();
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for failed acknowledgment with error message
|
||||
*/
|
||||
public AcknowledgmentMessage(String messageId, String clientId, String errorMessage) {
|
||||
this.messageId = messageId;
|
||||
this.status = AckStatus.FAILED;
|
||||
this.timestamp = LocalDateTime.now();
|
||||
this.clientId = clientId;
|
||||
this.errorMessage = errorMessage;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* Options for message delivery configuration.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DeliveryOptions {
|
||||
|
||||
/**
|
||||
* Whether this message requires acknowledgment
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean requiresAck = true;
|
||||
|
||||
/**
|
||||
* Maximum number of retry attempts
|
||||
*/
|
||||
@Builder.Default
|
||||
private int maxRetries = 3;
|
||||
|
||||
/**
|
||||
* Timeout for acknowledgment
|
||||
*/
|
||||
@Builder.Default
|
||||
private Duration ackTimeout = Duration.ofSeconds(30);
|
||||
|
||||
/**
|
||||
* Message expiry duration from now
|
||||
*/
|
||||
@Builder.Default
|
||||
private Duration expiryDuration = Duration.ofHours(24);
|
||||
|
||||
/**
|
||||
* QoS level for transport (MQTT specific, but kept generic)
|
||||
*/
|
||||
@Builder.Default
|
||||
private int qos = 2;
|
||||
|
||||
/**
|
||||
* Whether message should be retained by broker
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean retained = false;
|
||||
|
||||
/**
|
||||
* Calculate expiry timestamp from duration
|
||||
*/
|
||||
public LocalDateTime calculateExpiryTime() {
|
||||
return LocalDateTime.now().plus(expiryDuration);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default options for standard messages
|
||||
*/
|
||||
public static DeliveryOptions standard() {
|
||||
return DeliveryOptions.builder().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for fire-and-forget messages (no acknowledgment required)
|
||||
*/
|
||||
public static DeliveryOptions fireAndForget() {
|
||||
return DeliveryOptions.builder()
|
||||
.requiresAck(false)
|
||||
.maxRetries(0)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for critical messages with extended retry
|
||||
*/
|
||||
public static DeliveryOptions critical() {
|
||||
return DeliveryOptions.builder()
|
||||
.requiresAck(true)
|
||||
.maxRetries(5)
|
||||
.ackTimeout(Duration.ofMinutes(2))
|
||||
.expiryDuration(Duration.ofDays(7))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* Receipt returned when a message is submitted for delivery.
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DeliveryReceipt {
|
||||
|
||||
/**
|
||||
* Unique message identifier
|
||||
*/
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* Topic where message was sent
|
||||
*/
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* When the message was submitted
|
||||
*/
|
||||
private LocalDateTime submittedAt;
|
||||
|
||||
/**
|
||||
* Initial delivery status
|
||||
*/
|
||||
private DeliveryStatus status;
|
||||
|
||||
/**
|
||||
* When the message will expire
|
||||
*/
|
||||
private LocalDateTime expiresAt;
|
||||
|
||||
/**
|
||||
* Create a receipt for a successfully submitted message
|
||||
*/
|
||||
public static DeliveryReceipt submitted(String messageId, String topic, LocalDateTime expiresAt) {
|
||||
return new DeliveryReceipt(
|
||||
messageId,
|
||||
topic,
|
||||
LocalDateTime.now(),
|
||||
DeliveryStatus.PENDING,
|
||||
expiresAt
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a receipt for a failed submission
|
||||
*/
|
||||
public static DeliveryReceipt failed(String messageId, String topic) {
|
||||
return new DeliveryReceipt(
|
||||
messageId,
|
||||
topic,
|
||||
LocalDateTime.now(),
|
||||
DeliveryStatus.FAILED,
|
||||
null
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
/**
|
||||
* Status of a message delivery attempt.
|
||||
*/
|
||||
public enum DeliveryStatus {
|
||||
/**
|
||||
* Message is queued but not yet sent
|
||||
*/
|
||||
PENDING,
|
||||
|
||||
/**
|
||||
* Message has been sent to the transport layer
|
||||
*/
|
||||
SENT,
|
||||
|
||||
/**
|
||||
* Client has acknowledged receipt of the message
|
||||
*/
|
||||
ACKNOWLEDGED,
|
||||
|
||||
/**
|
||||
* Delivery failed after all retry attempts
|
||||
*/
|
||||
FAILED,
|
||||
|
||||
/**
|
||||
* Message expired before delivery could be confirmed
|
||||
*/
|
||||
EXPIRED
|
||||
}
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonGetter;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.index.Indexed;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
import org.springframework.data.mongodb.core.mapping.Field;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Envelope that wraps all messages sent through the messaging system.
|
||||
* Contains metadata for delivery tracking and acknowledgment.
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Document(collection = "message_envelopes")
|
||||
public class MessageEnvelope {
|
||||
|
||||
@Id
|
||||
@JsonIgnore
|
||||
private ObjectId id;
|
||||
|
||||
/**
|
||||
* Unique identifier for this message (UUID)
|
||||
*/
|
||||
@Field("message_id")
|
||||
@Indexed(unique = true)
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* Timestamp when the envelope was created
|
||||
*/
|
||||
@Field("timestamp")
|
||||
private LocalDateTime timestamp;
|
||||
|
||||
/**
|
||||
* Target topic for this message
|
||||
*/
|
||||
@Field("topic")
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* The actual message payload (can be any serializable object)
|
||||
*/
|
||||
@Field("payload")
|
||||
private Object payload;
|
||||
|
||||
/**
|
||||
* Whether this message requires acknowledgment from the receiver
|
||||
*/
|
||||
@Field("requires_ack")
|
||||
private boolean requiresAck;
|
||||
|
||||
/**
|
||||
* Number of times this message has been retried
|
||||
*/
|
||||
@Field("retry_count")
|
||||
private int retryCount;
|
||||
|
||||
/**
|
||||
* When this message expires and should no longer be delivered
|
||||
*/
|
||||
@Field("expires_at")
|
||||
private LocalDateTime expiresAt;
|
||||
|
||||
/**
|
||||
* Additional metadata for the message
|
||||
*/
|
||||
@Field("metadata")
|
||||
private Map<String, String> metadata;
|
||||
|
||||
/**
|
||||
* Constructor for creating a new envelope with payload
|
||||
*/
|
||||
public MessageEnvelope(String topic, Object payload, boolean requiresAck, LocalDateTime expiresAt) {
|
||||
this.messageId = UUID.randomUUID().toString();
|
||||
this.timestamp = LocalDateTime.now();
|
||||
this.topic = topic;
|
||||
this.payload = payload;
|
||||
this.requiresAck = requiresAck;
|
||||
this.retryCount = 0;
|
||||
this.expiresAt = expiresAt;
|
||||
this.metadata = new HashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this message has expired
|
||||
*/
|
||||
public boolean isExpired() {
|
||||
return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the retry counter
|
||||
*/
|
||||
public void incrementRetryCount() {
|
||||
this.retryCount++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add metadata to the envelope
|
||||
*/
|
||||
public void addMetadata(String key, String value) {
|
||||
if (this.metadata == null) {
|
||||
this.metadata = new HashMap<>();
|
||||
}
|
||||
this.metadata.put(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ObjectId as string for JSON serialization
|
||||
*/
|
||||
@JsonGetter("id")
|
||||
public String getIdAsString() {
|
||||
return id != null ? id.toString() : null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,214 @@
|
||||
package de.assecutor.votianlt.messaging.model;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonGetter;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.index.Indexed;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
import org.springframework.data.mongodb.core.mapping.Field;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* Represents a message delivery that is pending acknowledgment.
|
||||
* Stored in MongoDB for retry and tracking purposes.
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Document(collection = "pending_deliveries")
|
||||
public class PendingDelivery {
|
||||
|
||||
@Id
|
||||
@JsonIgnore
|
||||
private ObjectId id;
|
||||
|
||||
/**
|
||||
* Unique message identifier
|
||||
*/
|
||||
@Field("message_id")
|
||||
@Indexed(unique = true)
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* Target topic for this message
|
||||
*/
|
||||
@Field("topic")
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* Serialized envelope data (JSON bytes)
|
||||
*/
|
||||
@Field("envelope_data")
|
||||
private byte[] envelopeData;
|
||||
|
||||
/**
|
||||
* Current delivery status
|
||||
*/
|
||||
@Field("status")
|
||||
@Indexed
|
||||
private DeliveryStatus status;
|
||||
|
||||
/**
|
||||
* When the delivery record was created
|
||||
*/
|
||||
@Field("created_at")
|
||||
private LocalDateTime createdAt;
|
||||
|
||||
/**
|
||||
* When the message was last sent
|
||||
*/
|
||||
@Field("sent_at")
|
||||
private LocalDateTime sentAt;
|
||||
|
||||
/**
|
||||
* When acknowledgment was received
|
||||
*/
|
||||
@Field("acknowledged_at")
|
||||
private LocalDateTime acknowledgedAt;
|
||||
|
||||
/**
|
||||
* Number of retry attempts made
|
||||
*/
|
||||
@Field("retry_count")
|
||||
private int retryCount;
|
||||
|
||||
/**
|
||||
* Maximum number of retries allowed
|
||||
*/
|
||||
@Field("max_retries")
|
||||
private int maxRetries;
|
||||
|
||||
/**
|
||||
* When the next retry should be attempted
|
||||
*/
|
||||
@Field("next_retry_at")
|
||||
@Indexed
|
||||
private LocalDateTime nextRetryAt;
|
||||
|
||||
/**
|
||||
* When this delivery expires
|
||||
*/
|
||||
@Field("expires_at")
|
||||
@Indexed
|
||||
private LocalDateTime expiresAt;
|
||||
|
||||
/**
|
||||
* Reason for failure (if status is FAILED)
|
||||
*/
|
||||
@Field("failure_reason")
|
||||
private String failureReason;
|
||||
|
||||
/**
|
||||
* Client ID (extracted from topic if available)
|
||||
*/
|
||||
@Field("client_id")
|
||||
private String clientId;
|
||||
|
||||
/**
|
||||
* Constructor for new pending delivery
|
||||
*/
|
||||
public PendingDelivery(String messageId, String topic, byte[] envelopeData,
|
||||
int maxRetries, LocalDateTime expiresAt) {
|
||||
this.messageId = messageId;
|
||||
this.topic = topic;
|
||||
this.envelopeData = envelopeData;
|
||||
this.status = DeliveryStatus.PENDING;
|
||||
this.createdAt = LocalDateTime.now();
|
||||
this.retryCount = 0;
|
||||
this.maxRetries = maxRetries;
|
||||
this.expiresAt = expiresAt;
|
||||
this.clientId = extractClientIdFromTopic(topic);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as sent and schedule next retry
|
||||
*/
|
||||
public void markAsSent(LocalDateTime nextRetryAt) {
|
||||
this.status = DeliveryStatus.SENT;
|
||||
this.sentAt = LocalDateTime.now();
|
||||
this.nextRetryAt = nextRetryAt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as acknowledged
|
||||
*/
|
||||
public void markAsAcknowledged() {
|
||||
this.status = DeliveryStatus.ACKNOWLEDGED;
|
||||
this.acknowledgedAt = LocalDateTime.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as failed with reason
|
||||
*/
|
||||
public void markAsFailed(String reason) {
|
||||
this.status = DeliveryStatus.FAILED;
|
||||
this.failureReason = reason;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as expired
|
||||
*/
|
||||
public void markAsExpired() {
|
||||
this.status = DeliveryStatus.EXPIRED;
|
||||
this.failureReason = "Message expired before delivery could be confirmed";
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment retry count
|
||||
*/
|
||||
public void incrementRetryCount() {
|
||||
this.retryCount++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if max retries reached
|
||||
*/
|
||||
public boolean hasReachedMaxRetries() {
|
||||
return retryCount >= maxRetries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if expired
|
||||
*/
|
||||
public boolean isExpired() {
|
||||
return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if ready for retry
|
||||
*/
|
||||
public boolean isReadyForRetry() {
|
||||
return status == DeliveryStatus.SENT
|
||||
&& nextRetryAt != null
|
||||
&& LocalDateTime.now().isAfter(nextRetryAt)
|
||||
&& !hasReachedMaxRetries()
|
||||
&& !isExpired();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract client ID from topic pattern /client/{clientId}/...
|
||||
*/
|
||||
private String extractClientIdFromTopic(String topic) {
|
||||
if (topic != null && topic.startsWith("/client/")) {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length > 2) {
|
||||
return parts[2];
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ObjectId as string for JSON serialization
|
||||
*/
|
||||
@JsonGetter("id")
|
||||
public String getIdAsString() {
|
||||
return id != null ? id.toString() : null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* Event representing a connection state change.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ConnectionStateEvent {
|
||||
|
||||
/**
|
||||
* Connection state
|
||||
*/
|
||||
private ConnectionState state;
|
||||
|
||||
/**
|
||||
* Previous connection state
|
||||
*/
|
||||
private ConnectionState previousState;
|
||||
|
||||
/**
|
||||
* Timestamp of the state change
|
||||
*/
|
||||
@Builder.Default
|
||||
private LocalDateTime timestamp = LocalDateTime.now();
|
||||
|
||||
/**
|
||||
* Optional error message if state is ERROR or DISCONNECTED
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* Optional exception if state is ERROR
|
||||
*/
|
||||
private Throwable exception;
|
||||
|
||||
/**
|
||||
* Plugin that generated this event
|
||||
*/
|
||||
private String pluginName;
|
||||
|
||||
/**
|
||||
* Connection states
|
||||
*/
|
||||
public enum ConnectionState {
|
||||
/**
|
||||
* Plugin is initializing
|
||||
*/
|
||||
INITIALIZING,
|
||||
|
||||
/**
|
||||
* Plugin is connecting to the transport
|
||||
*/
|
||||
CONNECTING,
|
||||
|
||||
/**
|
||||
* Plugin is connected and ready
|
||||
*/
|
||||
CONNECTED,
|
||||
|
||||
/**
|
||||
* Plugin is disconnecting
|
||||
*/
|
||||
DISCONNECTING,
|
||||
|
||||
/**
|
||||
* Plugin is disconnected
|
||||
*/
|
||||
DISCONNECTED,
|
||||
|
||||
/**
|
||||
* Plugin encountered an error
|
||||
*/
|
||||
ERROR,
|
||||
|
||||
/**
|
||||
* Plugin is reconnecting after a failure
|
||||
*/
|
||||
RECONNECTING
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the connection is active.
|
||||
*
|
||||
* @return true if connected
|
||||
*/
|
||||
public boolean isConnected() {
|
||||
return state == ConnectionState.CONNECTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there was an error.
|
||||
*
|
||||
* @return true if state is ERROR
|
||||
*/
|
||||
public boolean isError() {
|
||||
return state == ConnectionState.ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Interface for messaging transport plugins.
|
||||
* Plugins implement specific transport protocols (MQTT, WebSocket, gRPC, etc.)
|
||||
* and provide a unified interface for the messaging layer.
|
||||
*
|
||||
* The plugin is responsible for managing the internal topic/channel structure.
|
||||
* The messaging layer only uses clientId and messageType as identifiers.
|
||||
*/
|
||||
public interface MessagingPlugin {
|
||||
|
||||
/**
|
||||
* Initialize the plugin with configuration.
|
||||
* Called once during application startup.
|
||||
*
|
||||
* @param config Plugin-specific configuration
|
||||
* @throws PluginException if initialization fails
|
||||
*/
|
||||
void init(PluginConfig config) throws PluginException;
|
||||
|
||||
/**
|
||||
* Shutdown the plugin and release resources.
|
||||
* Called during application shutdown.
|
||||
*
|
||||
* @throws PluginException if shutdown fails
|
||||
*/
|
||||
void exit() throws PluginException;
|
||||
|
||||
/**
|
||||
* Callback when connection state changes.
|
||||
* The plugin should call this method when the underlying transport
|
||||
* connection state changes (connected, disconnected, error).
|
||||
*
|
||||
* @param listener Connection state listener
|
||||
*/
|
||||
void setConnectionListener(ConnectionStateListener listener);
|
||||
|
||||
/**
|
||||
* Send a message to a specific client.
|
||||
* The plugin is responsible for determining the correct topic/channel based on the messageType.
|
||||
*
|
||||
* @param clientId Target client identifier
|
||||
* @param messageType Type of message (e.g., "jobs", "message", "auth", "task")
|
||||
* @param payload Message payload as byte array
|
||||
* @param options Transport-specific options
|
||||
* @return CompletableFuture that completes when message is sent
|
||||
* @throws PluginException if sending fails
|
||||
*/
|
||||
CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload, SendOptions options) throws PluginException;
|
||||
|
||||
/**
|
||||
* Send an acknowledgment to a specific client.
|
||||
* The plugin is responsible for determining the correct ACK topic/channel.
|
||||
*
|
||||
* @param clientId Target client identifier
|
||||
* @param messageId Message ID being acknowledged
|
||||
* @param payload ACK payload as byte array
|
||||
* @param options Transport-specific options
|
||||
* @return CompletableFuture that completes when ACK is sent
|
||||
* @throws PluginException if sending fails
|
||||
*/
|
||||
CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload, SendOptions options) throws PluginException;
|
||||
|
||||
/**
|
||||
* Register a handler for incoming messages of a specific type from clients.
|
||||
* The plugin is responsible for subscribing to the appropriate topics/channels.
|
||||
*
|
||||
* @param messageType Type of message to handle (e.g., "task_completed", "message", "jobs/assigned", "login")
|
||||
* @param handler Message handler to be called when a message is received
|
||||
* @throws PluginException if registration fails
|
||||
*/
|
||||
void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException;
|
||||
|
||||
/**
|
||||
* Register a handler for incoming acknowledgments from clients.
|
||||
* The plugin is responsible for subscribing to the appropriate ACK topics/channels.
|
||||
*
|
||||
* @param handler ACK handler to be called when an ACK is received
|
||||
* @throws PluginException if registration fails
|
||||
*/
|
||||
void registerAckHandler(AckHandler handler) throws PluginException;
|
||||
|
||||
/**
|
||||
* Check if the plugin is currently connected.
|
||||
*
|
||||
* @return true if connected, false otherwise
|
||||
*/
|
||||
boolean isConnected();
|
||||
|
||||
/**
|
||||
* Get the plugin name/type identifier.
|
||||
*
|
||||
* @return Plugin name (e.g., "mqtt", "websocket", "grpc")
|
||||
*/
|
||||
String getPluginName();
|
||||
|
||||
/**
|
||||
* Get plugin version.
|
||||
*
|
||||
* @return Plugin version string
|
||||
*/
|
||||
String getPluginVersion();
|
||||
|
||||
/**
|
||||
* Get plugin metadata/information.
|
||||
*
|
||||
* @return Plugin metadata
|
||||
*/
|
||||
PluginMetadata getMetadata();
|
||||
|
||||
/**
|
||||
* Callback interface for connection state changes.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface ConnectionStateListener {
|
||||
/**
|
||||
* Called when connection state changes.
|
||||
*
|
||||
* @param event Connection state event
|
||||
*/
|
||||
void onConnectionStateChanged(ConnectionStateEvent event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for received messages from clients.
|
||||
* Includes the clientId extracted from the topic/channel.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface ClientMessageHandler {
|
||||
/**
|
||||
* Called when a message is received from a client.
|
||||
*
|
||||
* @param clientId Client identifier extracted from the topic/channel
|
||||
* @param payload Message payload as byte array
|
||||
*/
|
||||
void onMessageReceived(String clientId, byte[] payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for received acknowledgments from clients.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface AckHandler {
|
||||
/**
|
||||
* Called when an ACK is received from a client.
|
||||
*
|
||||
* @param messageId Message ID being acknowledged
|
||||
* @param payload ACK payload as byte array
|
||||
*/
|
||||
void onAckReceived(String messageId, byte[] payload);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Configuration for messaging plugins.
|
||||
* Provides a flexible key-value store for plugin-specific settings.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class PluginConfig {
|
||||
|
||||
/**
|
||||
* Plugin-specific properties
|
||||
*/
|
||||
@Builder.Default
|
||||
private Map<String, Object> properties = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Get a string property.
|
||||
*
|
||||
* @param key Property key
|
||||
* @return Property value or null if not found
|
||||
*/
|
||||
public String getString(String key) {
|
||||
Object value = properties.get(key);
|
||||
return value != null ? value.toString() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a string property with default value.
|
||||
*
|
||||
* @param key Property key
|
||||
* @param defaultValue Default value if property not found
|
||||
* @return Property value or default
|
||||
*/
|
||||
public String getString(String key, String defaultValue) {
|
||||
String value = getString(key);
|
||||
return value != null ? value : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an integer property.
|
||||
*
|
||||
* @param key Property key
|
||||
* @return Property value or null if not found
|
||||
*/
|
||||
public Integer getInt(String key) {
|
||||
Object value = properties.get(key);
|
||||
if (value instanceof Integer) {
|
||||
return (Integer) value;
|
||||
} else if (value instanceof String) {
|
||||
try {
|
||||
return Integer.parseInt((String) value);
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an integer property with default value.
|
||||
*
|
||||
* @param key Property key
|
||||
* @param defaultValue Default value if property not found
|
||||
* @return Property value or default
|
||||
*/
|
||||
public int getInt(String key, int defaultValue) {
|
||||
Integer value = getInt(key);
|
||||
return value != null ? value : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a boolean property.
|
||||
*
|
||||
* @param key Property key
|
||||
* @return Property value or null if not found
|
||||
*/
|
||||
public Boolean getBoolean(String key) {
|
||||
Object value = properties.get(key);
|
||||
if (value instanceof Boolean) {
|
||||
return (Boolean) value;
|
||||
} else if (value instanceof String) {
|
||||
return Boolean.parseBoolean((String) value);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a boolean property with default value.
|
||||
*
|
||||
* @param key Property key
|
||||
* @param defaultValue Default value if property not found
|
||||
* @return Property value or default
|
||||
*/
|
||||
public boolean getBoolean(String key, boolean defaultValue) {
|
||||
Boolean value = getBoolean(key);
|
||||
return value != null ? value : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a property.
|
||||
*
|
||||
* @param key Property key
|
||||
* @param value Property value
|
||||
*/
|
||||
public void setProperty(String key, Object value) {
|
||||
properties.put(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a property exists.
|
||||
*
|
||||
* @param key Property key
|
||||
* @return true if property exists
|
||||
*/
|
||||
public boolean hasProperty(String key) {
|
||||
return properties.containsKey(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
/**
|
||||
* Exception thrown by messaging plugins.
|
||||
*/
|
||||
public class PluginException extends Exception {
|
||||
|
||||
public PluginException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public PluginException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public PluginException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,251 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Manager for messaging plugins.
|
||||
* Handles plugin lifecycle, registration, and delegation.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class PluginManager {
|
||||
|
||||
private MessagingPlugin activePlugin;
|
||||
private final List<ConnectionStateEvent> connectionHistory = new ArrayList<>();
|
||||
private final List<PluginStateListener> stateListeners = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Initialize and activate a plugin.
|
||||
*
|
||||
* @param plugin Plugin to activate
|
||||
* @param config Plugin configuration
|
||||
* @throws PluginException if initialization fails
|
||||
*/
|
||||
public void activatePlugin(MessagingPlugin plugin, PluginConfig config) throws PluginException {
|
||||
log.info("[PluginManager] Activating plugin: {}", plugin.getPluginName());
|
||||
|
||||
// Shutdown existing plugin if any
|
||||
if (activePlugin != null) {
|
||||
log.info("[PluginManager] Shutting down existing plugin: {}", activePlugin.getPluginName());
|
||||
try {
|
||||
activePlugin.exit();
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginManager] Error shutting down existing plugin: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// Set connection listener
|
||||
plugin.setConnectionListener(event -> {
|
||||
String previousState = event.getPreviousState() != null
|
||||
? event.getPreviousState().toString()
|
||||
: "NONE";
|
||||
log.info("[PluginManager] Connection state changed: {} -> {}",
|
||||
previousState, event.getState());
|
||||
connectionHistory.add(event);
|
||||
notifyStateListeners(event);
|
||||
});
|
||||
|
||||
// Initialize plugin
|
||||
plugin.init(config);
|
||||
activePlugin = plugin;
|
||||
|
||||
log.info("[PluginManager] Plugin activated: {} v{}",
|
||||
plugin.getPluginName(), plugin.getPluginVersion());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the currently active plugin.
|
||||
*
|
||||
* @return Active plugin or empty if none
|
||||
*/
|
||||
public Optional<MessagingPlugin> getActivePlugin() {
|
||||
return Optional.ofNullable(activePlugin);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific client via the active plugin.
|
||||
*
|
||||
* @param clientId Target client identifier
|
||||
* @param messageType Type of message (e.g., "jobs", "message", "auth", "task")
|
||||
* @param payload Message payload
|
||||
* @param options Send options
|
||||
* @return CompletableFuture that completes when message is sent
|
||||
* @throws PluginException if no plugin is active or sending fails
|
||||
*/
|
||||
public CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload, SendOptions options) throws PluginException {
|
||||
if (activePlugin == null) {
|
||||
return CompletableFuture.failedFuture(new PluginException("No active plugin"));
|
||||
}
|
||||
|
||||
if (!activePlugin.isConnected()) {
|
||||
return CompletableFuture.failedFuture(new PluginException("Plugin is not connected"));
|
||||
}
|
||||
|
||||
return activePlugin.sendToClient(clientId, messageType, payload, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an acknowledgment to a specific client via the active plugin.
|
||||
*
|
||||
* @param clientId Target client identifier
|
||||
* @param messageId Message ID being acknowledged
|
||||
* @param payload ACK payload
|
||||
* @param options Send options
|
||||
* @return CompletableFuture that completes when ACK is sent
|
||||
* @throws PluginException if no plugin is active or sending fails
|
||||
*/
|
||||
public CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload, SendOptions options) throws PluginException {
|
||||
if (activePlugin == null) {
|
||||
return CompletableFuture.failedFuture(new PluginException("No active plugin"));
|
||||
}
|
||||
|
||||
if (!activePlugin.isConnected()) {
|
||||
return CompletableFuture.failedFuture(new PluginException("Plugin is not connected"));
|
||||
}
|
||||
|
||||
return activePlugin.sendAckToClient(clientId, messageId, payload, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a handler for incoming messages of a specific type from clients.
|
||||
*
|
||||
* @param messageType Type of message to handle
|
||||
* @param handler Message handler
|
||||
* @throws PluginException if no plugin is active or registration fails
|
||||
*/
|
||||
public void registerMessageHandler(String messageType, MessagingPlugin.ClientMessageHandler handler) throws PluginException {
|
||||
if (activePlugin == null) {
|
||||
throw new PluginException("No active plugin");
|
||||
}
|
||||
|
||||
activePlugin.registerMessageHandler(messageType, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a handler for incoming acknowledgments from clients.
|
||||
*
|
||||
* @param handler ACK handler
|
||||
* @throws PluginException if no plugin is active or registration fails
|
||||
*/
|
||||
public void registerAckHandler(MessagingPlugin.AckHandler handler) throws PluginException {
|
||||
if (activePlugin == null) {
|
||||
throw new PluginException("No active plugin");
|
||||
}
|
||||
|
||||
activePlugin.registerAckHandler(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the active plugin is connected.
|
||||
*
|
||||
* @return true if connected, false otherwise
|
||||
*/
|
||||
public boolean isConnected() {
|
||||
return activePlugin != null && activePlugin.isConnected();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metadata of the active plugin.
|
||||
*
|
||||
* @return Plugin metadata or empty if no plugin is active
|
||||
*/
|
||||
public Optional<PluginMetadata> getActivePluginMetadata() {
|
||||
return Optional.ofNullable(activePlugin).map(MessagingPlugin::getMetadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection history.
|
||||
*
|
||||
* @return List of connection state events
|
||||
*/
|
||||
public List<ConnectionStateEvent> getConnectionHistory() {
|
||||
return new ArrayList<>(connectionHistory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last connection state event.
|
||||
*
|
||||
* @return Last connection state event or empty if none
|
||||
*/
|
||||
public Optional<ConnectionStateEvent> getLastConnectionState() {
|
||||
if (connectionHistory.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(connectionHistory.get(connectionHistory.size() - 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a plugin state listener.
|
||||
*
|
||||
* @param listener State listener
|
||||
*/
|
||||
public void addStateListener(PluginStateListener listener) {
|
||||
stateListeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a plugin state listener.
|
||||
*
|
||||
* @param listener State listener
|
||||
*/
|
||||
public void removeStateListener(PluginStateListener listener) {
|
||||
stateListeners.remove(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify all state listeners of a connection state change.
|
||||
*
|
||||
* @param event Connection state event
|
||||
*/
|
||||
private void notifyStateListeners(ConnectionStateEvent event) {
|
||||
for (PluginStateListener listener : stateListeners) {
|
||||
try {
|
||||
listener.onConnectionStateChanged(event);
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginManager] Error in state listener: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the plugin manager and active plugin.
|
||||
*/
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
log.info("[PluginManager] Shutting down plugin manager");
|
||||
|
||||
if (activePlugin != null) {
|
||||
try {
|
||||
activePlugin.exit();
|
||||
log.info("[PluginManager] Active plugin shut down successfully");
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginManager] Error shutting down active plugin: {}", e.getMessage(), e);
|
||||
}
|
||||
activePlugin = null;
|
||||
}
|
||||
|
||||
stateListeners.clear();
|
||||
connectionHistory.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener interface for plugin state changes.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface PluginStateListener {
|
||||
/**
|
||||
* Called when plugin connection state changes.
|
||||
*
|
||||
* @param event Connection state event
|
||||
*/
|
||||
void onConnectionStateChanged(ConnectionStateEvent event);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Metadata about a messaging plugin.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class PluginMetadata {
|
||||
|
||||
/**
|
||||
* Plugin name
|
||||
*/
|
||||
private String name;
|
||||
|
||||
/**
|
||||
* Plugin version
|
||||
*/
|
||||
private String version;
|
||||
|
||||
/**
|
||||
* Plugin description
|
||||
*/
|
||||
private String description;
|
||||
|
||||
/**
|
||||
* Plugin author/vendor
|
||||
*/
|
||||
private String author;
|
||||
|
||||
/**
|
||||
* Supported features
|
||||
*/
|
||||
@Builder.Default
|
||||
private List<String> supportedFeatures = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Whether the plugin supports wildcards in topic patterns
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean supportsWildcards = false;
|
||||
|
||||
/**
|
||||
* Whether the plugin supports retained messages
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean supportsRetainedMessages = false;
|
||||
|
||||
/**
|
||||
* Whether the plugin supports QoS levels
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean supportsQos = false;
|
||||
|
||||
/**
|
||||
* Maximum QoS level supported (0, 1, 2)
|
||||
*/
|
||||
@Builder.Default
|
||||
private int maxQosLevel = 0;
|
||||
|
||||
/**
|
||||
* Check if a feature is supported.
|
||||
*
|
||||
* @param feature Feature name
|
||||
* @return true if supported
|
||||
*/
|
||||
public boolean supportsFeature(String feature) {
|
||||
return supportedFeatures.contains(feature);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a supported feature.
|
||||
*
|
||||
* @param feature Feature name
|
||||
*/
|
||||
public void addSupportedFeature(String feature) {
|
||||
if (!supportedFeatures.contains(feature)) {
|
||||
supportedFeatures.add(feature);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Represents a message received from a messaging plugin.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ReceivedMessage {
|
||||
|
||||
/**
|
||||
* Topic/channel the message was received on
|
||||
*/
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* Message payload
|
||||
*/
|
||||
private byte[] payload;
|
||||
|
||||
/**
|
||||
* Quality of Service level (if applicable)
|
||||
*/
|
||||
private int qos;
|
||||
|
||||
/**
|
||||
* Whether the message was retained
|
||||
*/
|
||||
private boolean retained;
|
||||
|
||||
/**
|
||||
* Timestamp when message was received
|
||||
*/
|
||||
@Builder.Default
|
||||
private LocalDateTime receivedAt = LocalDateTime.now();
|
||||
|
||||
/**
|
||||
* Additional metadata from the transport
|
||||
*/
|
||||
@Builder.Default
|
||||
private Map<String, Object> metadata = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Get metadata value.
|
||||
*
|
||||
* @param key Metadata key
|
||||
* @return Metadata value or null
|
||||
*/
|
||||
public Object getMetadata(String key) {
|
||||
return metadata.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set metadata value.
|
||||
*
|
||||
* @param key Metadata key
|
||||
* @param value Metadata value
|
||||
*/
|
||||
public void setMetadata(String key, Object value) {
|
||||
metadata.put(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get payload as UTF-8 string.
|
||||
*
|
||||
* @return Payload as string
|
||||
*/
|
||||
public String getPayloadAsString() {
|
||||
return payload != null ? new String(payload, java.nio.charset.StandardCharsets.UTF_8) : null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
package de.assecutor.votianlt.messaging.plugin;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Options for sending messages via plugins.
|
||||
* Provides transport-agnostic options with extensibility for plugin-specific settings.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SendOptions {
|
||||
|
||||
/**
|
||||
* Quality of Service level (0, 1, 2 for MQTT-like transports)
|
||||
*/
|
||||
@Builder.Default
|
||||
private int qos = 1;
|
||||
|
||||
/**
|
||||
* Whether the message should be retained by the broker/server
|
||||
*/
|
||||
@Builder.Default
|
||||
private boolean retained = false;
|
||||
|
||||
/**
|
||||
* Message priority (if supported by transport)
|
||||
*/
|
||||
@Builder.Default
|
||||
private int priority = 0;
|
||||
|
||||
/**
|
||||
* Message expiry time in seconds (if supported by transport)
|
||||
*/
|
||||
private Long expirySeconds;
|
||||
|
||||
/**
|
||||
* Additional plugin-specific options
|
||||
*/
|
||||
@Builder.Default
|
||||
private Map<String, Object> additionalOptions = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Get an additional option.
|
||||
*
|
||||
* @param key Option key
|
||||
* @return Option value or null
|
||||
*/
|
||||
public Object getAdditionalOption(String key) {
|
||||
return additionalOptions.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set an additional option.
|
||||
*
|
||||
* @param key Option key
|
||||
* @param value Option value
|
||||
*/
|
||||
public void setAdditionalOption(String key, Object value) {
|
||||
additionalOptions.put(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create default send options.
|
||||
*
|
||||
* @return Default options
|
||||
*/
|
||||
public static SendOptions defaults() {
|
||||
return SendOptions.builder().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create options for fire-and-forget messages.
|
||||
*
|
||||
* @return Fire-and-forget options
|
||||
*/
|
||||
public static SendOptions fireAndForget() {
|
||||
return SendOptions.builder()
|
||||
.qos(0)
|
||||
.retained(false)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create options for reliable delivery.
|
||||
*
|
||||
* @return Reliable delivery options
|
||||
*/
|
||||
public static SendOptions reliable() {
|
||||
return SendOptions.builder()
|
||||
.qos(2)
|
||||
.retained(false)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,419 @@
|
||||
package de.assecutor.votianlt.messaging.plugin.mqtt;
|
||||
|
||||
import com.hivemq.client.mqtt.MqttClient;
|
||||
import com.hivemq.client.mqtt.datatypes.MqttQos;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
|
||||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
|
||||
import de.assecutor.votianlt.messaging.plugin.*;
|
||||
import de.assecutor.votianlt.messaging.plugin.ConnectionStateEvent.ConnectionState;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* MQTT implementation of the MessagingPlugin interface.
|
||||
* Uses HiveMQ MQTT 5 client for communication.
|
||||
*
|
||||
* Topic Structure (managed internally):
|
||||
* - Server -> Client: /client/{clientId}/{messageType}
|
||||
* - Client -> Server: /server/{clientId}/{messageType}
|
||||
* - ACK Server -> Client: /ack/client/{clientId}/{messageId}
|
||||
* - ACK Client -> Server: /ack/server/{messageId}
|
||||
*/
|
||||
@Slf4j
|
||||
public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
|
||||
private static final String PLUGIN_NAME = "mqtt";
|
||||
private static final String PLUGIN_VERSION = "2.0.0";
|
||||
|
||||
// Topic templates
|
||||
private static final String TOPIC_TO_CLIENT = "/client/%s/%s"; // /client/{clientId}/{messageType}
|
||||
private static final String TOPIC_FROM_CLIENT = "/server/%s/%s"; // /server/{clientId}/{messageType}
|
||||
private static final String TOPIC_ACK_TO_CLIENT = "/ack/client/%s/%s"; // /ack/client/{clientId}/{messageId}
|
||||
private static final String TOPIC_ACK_FROM_CLIENT = "/ack/server/%s"; // /ack/server/{messageId}
|
||||
|
||||
// Subscription patterns
|
||||
private static final String PATTERN_FROM_CLIENT = "/server/+/%s"; // /server/+/{messageType}
|
||||
private static final String PATTERN_ACK_FROM_CLIENT = "/ack/server/+"; // /ack/server/+
|
||||
|
||||
private Mqtt5AsyncClient mqttClient;
|
||||
private ConnectionStateListener connectionListener;
|
||||
private final Map<String, ClientMessageHandler> messageHandlers = new ConcurrentHashMap<>();
|
||||
private AckHandler ackHandler;
|
||||
private volatile boolean connected = false;
|
||||
|
||||
// Configuration keys
|
||||
private static final String CONFIG_BROKER_HOST = "broker.host";
|
||||
private static final String CONFIG_BROKER_PORT = "broker.port";
|
||||
private static final String CONFIG_USERNAME = "username";
|
||||
private static final String CONFIG_PASSWORD = "password";
|
||||
private static final String CONFIG_CLIENT_ID = "client.id";
|
||||
private static final String CONFIG_AUTO_RECONNECT = "auto.reconnect";
|
||||
private static final String CONFIG_CLEAN_START = "clean.start";
|
||||
private static final String CONFIG_CONNECTION_TIMEOUT = "connection.timeout.seconds";
|
||||
private static final String CONFIG_KEEP_ALIVE = "keep.alive.seconds";
|
||||
|
||||
@Override
|
||||
public void init(PluginConfig config) throws PluginException {
|
||||
log.info("[MqttPlugin] Initializing MQTT plugin");
|
||||
|
||||
try {
|
||||
notifyConnectionState(ConnectionState.INITIALIZING, null);
|
||||
|
||||
// Extract configuration
|
||||
String brokerHost = config.getString(CONFIG_BROKER_HOST, "localhost");
|
||||
int brokerPort = config.getInt(CONFIG_BROKER_PORT, 1883);
|
||||
String username = config.getString(CONFIG_USERNAME);
|
||||
String password = config.getString(CONFIG_PASSWORD);
|
||||
String clientId = config.getString(CONFIG_CLIENT_ID, "votianlt-" + UUID.randomUUID());
|
||||
boolean cleanStart = config.getBoolean(CONFIG_CLEAN_START, true);
|
||||
int connectionTimeout = config.getInt(CONFIG_CONNECTION_TIMEOUT, 60);
|
||||
int keepAlive = config.getInt(CONFIG_KEEP_ALIVE, 60);
|
||||
|
||||
log.info("[MqttPlugin] Connecting to {}:{} with clientId: {} (timeout: {}s, keepAlive: {}s)",
|
||||
brokerHost, brokerPort, clientId, connectionTimeout, keepAlive);
|
||||
|
||||
// Build MQTT client
|
||||
var clientBuilder = MqttClient.builder()
|
||||
.useMqttVersion5()
|
||||
.identifier(clientId)
|
||||
.serverHost(brokerHost)
|
||||
.serverPort(brokerPort)
|
||||
.automaticReconnect()
|
||||
.initialDelay(1, java.util.concurrent.TimeUnit.SECONDS)
|
||||
.maxDelay(30, java.util.concurrent.TimeUnit.SECONDS)
|
||||
.applyAutomaticReconnect();
|
||||
|
||||
mqttClient = clientBuilder.buildAsync();
|
||||
|
||||
// Build connect options
|
||||
var connectBuilder = com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect.builder()
|
||||
.cleanStart(cleanStart)
|
||||
.keepAlive(keepAlive);
|
||||
|
||||
if (username != null && password != null) {
|
||||
connectBuilder.simpleAuth()
|
||||
.username(username)
|
||||
.password(password.getBytes(StandardCharsets.UTF_8))
|
||||
.applySimpleAuth();
|
||||
}
|
||||
|
||||
// Connect asynchronously
|
||||
notifyConnectionState(ConnectionState.CONNECTING, null);
|
||||
|
||||
log.info("[MqttPlugin] Starting async connection to {}:{}", brokerHost, brokerPort);
|
||||
|
||||
mqttClient.connect(connectBuilder.build())
|
||||
.orTimeout(connectionTimeout, java.util.concurrent.TimeUnit.SECONDS)
|
||||
.whenComplete((connAck, throwable) -> {
|
||||
if (throwable != null) {
|
||||
String errorMsg = String.format("Connection to %s:%d failed: %s",
|
||||
brokerHost, brokerPort, throwable.getMessage());
|
||||
log.error("[MqttPlugin] {}", errorMsg, throwable);
|
||||
|
||||
// Check for specific error types
|
||||
if (throwable instanceof java.util.concurrent.TimeoutException) {
|
||||
log.error("[MqttPlugin] Connection timeout - broker may be unreachable or firewall blocking connection");
|
||||
} else if (throwable.getCause() instanceof java.net.UnknownHostException) {
|
||||
log.error("[MqttPlugin] Unknown host - DNS resolution failed for {}", brokerHost);
|
||||
} else if (throwable.getCause() instanceof java.net.ConnectException) {
|
||||
log.error("[MqttPlugin] Connection refused - broker may be down or port {} is blocked", brokerPort);
|
||||
}
|
||||
|
||||
connected = false;
|
||||
notifyConnectionState(ConnectionState.ERROR, errorMsg);
|
||||
} else {
|
||||
log.info("[MqttPlugin] Connected successfully - connAck: {}", connAck);
|
||||
connected = true;
|
||||
setupGlobalMessageHandler();
|
||||
log.info("[MqttPlugin] Notifying CONNECTED state");
|
||||
notifyConnectionState(ConnectionState.CONNECTED, null);
|
||||
log.info("[MqttPlugin] CONNECTED state notification sent");
|
||||
}
|
||||
});
|
||||
|
||||
log.info("[MqttPlugin] Initialization complete - connection in progress");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MqttPlugin] Initialization failed: {}", e.getMessage(), e);
|
||||
throw new PluginException("Failed to initialize MQTT plugin", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exit() throws PluginException {
|
||||
log.info("[MqttPlugin] Shutting down MQTT plugin");
|
||||
|
||||
try {
|
||||
notifyConnectionState(ConnectionState.DISCONNECTING, null);
|
||||
|
||||
if (mqttClient != null && connected) {
|
||||
mqttClient.disconnect().join();
|
||||
log.info("[MqttPlugin] Disconnected successfully");
|
||||
}
|
||||
|
||||
connected = false;
|
||||
messageHandlers.clear();
|
||||
ackHandler = null;
|
||||
notifyConnectionState(ConnectionState.DISCONNECTED, null);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MqttPlugin] Shutdown failed: {}", e.getMessage(), e);
|
||||
throw new PluginException("Failed to shutdown MQTT plugin", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConnectionListener(ConnectionStateListener listener) {
|
||||
this.connectionListener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload, SendOptions options) throws PluginException {
|
||||
if (!connected) {
|
||||
return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected"));
|
||||
}
|
||||
|
||||
String topic = String.format(TOPIC_TO_CLIENT, clientId, messageType);
|
||||
log.debug("[MqttPlugin] Sending to client {} (type: {}) on topic: {}", clientId, messageType, topic);
|
||||
|
||||
return sendToTopic(topic, payload, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload, SendOptions options) throws PluginException {
|
||||
if (!connected) {
|
||||
return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected"));
|
||||
}
|
||||
|
||||
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId, messageId);
|
||||
log.debug("[MqttPlugin] Sending ACK to client {} for message {} on topic: {}", clientId, messageId, topic);
|
||||
|
||||
return sendToTopic(topic, payload, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException {
|
||||
if (!connected) {
|
||||
throw new PluginException("MQTT client is not connected");
|
||||
}
|
||||
|
||||
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
|
||||
log.info("[MqttPlugin] Registering handler for message type '{}' with pattern: {}", messageType, topicPattern);
|
||||
|
||||
messageHandlers.put(messageType, handler);
|
||||
|
||||
// Subscribe to the topic pattern
|
||||
mqttClient.subscribeWith()
|
||||
.topicFilter(topicPattern)
|
||||
.qos(MqttQos.EXACTLY_ONCE)
|
||||
.send()
|
||||
.whenComplete((subAck, throwable) -> {
|
||||
if (throwable != null) {
|
||||
log.error("[MqttPlugin] Subscription to {} failed: {}", topicPattern, throwable.getMessage());
|
||||
messageHandlers.remove(messageType);
|
||||
} else {
|
||||
log.info("[MqttPlugin] Successfully subscribed to: {}", topicPattern);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerAckHandler(AckHandler handler) throws PluginException {
|
||||
if (!connected) {
|
||||
throw new PluginException("MQTT client is not connected");
|
||||
}
|
||||
|
||||
log.info("[MqttPlugin] Registering ACK handler with pattern: {}", PATTERN_ACK_FROM_CLIENT);
|
||||
|
||||
this.ackHandler = handler;
|
||||
|
||||
// Subscribe to ACK topic pattern
|
||||
mqttClient.subscribeWith()
|
||||
.topicFilter(PATTERN_ACK_FROM_CLIENT)
|
||||
.qos(MqttQos.EXACTLY_ONCE)
|
||||
.send()
|
||||
.whenComplete((subAck, throwable) -> {
|
||||
if (throwable != null) {
|
||||
log.error("[MqttPlugin] Subscription to {} failed: {}", PATTERN_ACK_FROM_CLIENT, throwable.getMessage());
|
||||
this.ackHandler = null;
|
||||
} else {
|
||||
log.info("[MqttPlugin] Successfully subscribed to: {}", PATTERN_ACK_FROM_CLIENT);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return connected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPluginName() {
|
||||
return PLUGIN_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPluginVersion() {
|
||||
return PLUGIN_VERSION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginMetadata getMetadata() {
|
||||
return PluginMetadata.builder()
|
||||
.name(PLUGIN_NAME)
|
||||
.version(PLUGIN_VERSION)
|
||||
.description("MQTT v5 messaging plugin using HiveMQ client")
|
||||
.supportsWildcards(true)
|
||||
.supportsRetainedMessages(true)
|
||||
.supportsQos(true)
|
||||
.maxQosLevel(2)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup global message handler to route incoming messages to registered handlers.
|
||||
*/
|
||||
private void setupGlobalMessageHandler() {
|
||||
mqttClient.publishes(com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL, publish -> {
|
||||
handleIncomingMessage(publish);
|
||||
});
|
||||
|
||||
log.info("[MqttPlugin] Global message handler configured");
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming MQTT message and route to appropriate handler.
|
||||
*/
|
||||
private void handleIncomingMessage(Mqtt5Publish publish) {
|
||||
String topic = publish.getTopic().toString();
|
||||
byte[] payload = publish.getPayloadAsBytes();
|
||||
|
||||
log.debug("[MqttPlugin] Received message on topic: {}", topic);
|
||||
|
||||
try {
|
||||
// Check if it's an ACK message
|
||||
if (topic.startsWith("/ack/server/")) {
|
||||
handleAckMessage(topic, payload);
|
||||
}
|
||||
// Check if it's a client message
|
||||
else if (topic.startsWith("/server/")) {
|
||||
handleClientMessage(topic, payload);
|
||||
}
|
||||
else {
|
||||
log.warn("[MqttPlugin] Received message on unexpected topic: {}", topic);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[MqttPlugin] Error handling message on topic {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Handle ACK message from client.
|
||||
* Topic format: /ack/server/{messageId}
|
||||
*/
|
||||
private void handleAckMessage(String topic, byte[] payload) {
|
||||
if (ackHandler == null) {
|
||||
log.warn("[MqttPlugin] Received ACK but no handler registered: {}", topic);
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract messageId from topic
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 4) {
|
||||
String messageId = parts[3];
|
||||
log.debug("[MqttPlugin] Routing ACK for message: {}", messageId);
|
||||
ackHandler.onAckReceived(messageId, payload);
|
||||
} else {
|
||||
log.warn("[MqttPlugin] Invalid ACK topic format: {}", topic);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle client message.
|
||||
* Topic format: /server/{clientId}/{messageType}
|
||||
*/
|
||||
private void handleClientMessage(String topic, byte[] payload) {
|
||||
// Extract clientId and messageType from topic
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 4) {
|
||||
String clientId = parts[2];
|
||||
String messageType = parts[3];
|
||||
|
||||
ClientMessageHandler handler = messageHandlers.get(messageType);
|
||||
if (handler != null) {
|
||||
log.debug("[MqttPlugin] Routing message from client {} (type: {})", clientId, messageType);
|
||||
handler.onMessageReceived(clientId, payload);
|
||||
} else {
|
||||
log.warn("[MqttPlugin] No handler registered for message type: {}", messageType);
|
||||
}
|
||||
} else {
|
||||
log.warn("[MqttPlugin] Invalid client message topic format: {}", topic);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message to a specific MQTT topic.
|
||||
*/
|
||||
private CompletableFuture<Void> sendToTopic(String topic, byte[] payload, SendOptions options) {
|
||||
try {
|
||||
var publishBuilder = Mqtt5Publish.builder()
|
||||
.topic(topic)
|
||||
.payload(payload)
|
||||
.qos(mapQos(options.getQos()))
|
||||
.retain(options.isRetained());
|
||||
|
||||
return mqttClient.publish(publishBuilder.build())
|
||||
.thenApply(publishResult -> {
|
||||
log.debug("[MqttPlugin] Message published to topic: {}", topic);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[MqttPlugin] Failed to publish to topic {}: {}", topic, e.getMessage(), e);
|
||||
return CompletableFuture.failedFuture(new PluginException("Failed to publish message", e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Map QoS level to MQTT QoS.
|
||||
*/
|
||||
private MqttQos mapQos(int qos) {
|
||||
return switch (qos) {
|
||||
case 0 -> MqttQos.AT_MOST_ONCE;
|
||||
case 1 -> MqttQos.AT_LEAST_ONCE;
|
||||
case 2 -> MqttQos.EXACTLY_ONCE;
|
||||
default -> MqttQos.AT_LEAST_ONCE;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify connection state listener.
|
||||
*/
|
||||
private void notifyConnectionState(ConnectionState state, String message) {
|
||||
log.debug("[MqttPlugin] notifyConnectionState called: state={}, listener={}", state, connectionListener != null ? "present" : "null");
|
||||
if (connectionListener != null) {
|
||||
ConnectionStateEvent event = ConnectionStateEvent.builder()
|
||||
.state(state)
|
||||
.previousState(null)
|
||||
.errorMessage(message)
|
||||
.pluginName(PLUGIN_NAME)
|
||||
.build();
|
||||
try {
|
||||
log.debug("[MqttPlugin] Calling connectionListener.onConnectionStateChanged");
|
||||
connectionListener.onConnectionStateChanged(event);
|
||||
log.debug("[MqttPlugin] connectionListener.onConnectionStateChanged completed");
|
||||
} catch (Exception e) {
|
||||
log.error("[MqttPlugin] Error in connection listener: {}", e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
log.warn("[MqttPlugin] Connection listener is null, cannot notify state: {}", state);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Kept for compatibility: The actual MQTT v5 lifecycle is managed by
|
||||
* MqttV5ClientManager. This runner only logs application readiness.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MqttClientRunner {
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationReady() {
|
||||
log.info("Application ready. MQTT v5 client lifecycle managed by MqttV5ClientManager.");
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,8 @@ package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
|
||||
import de.assecutor.votianlt.messaging.model.DeliveryOptions;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
@@ -9,6 +11,9 @@ import org.springframework.context.annotation.Lazy;
|
||||
/**
|
||||
* Simple MQTT publishing helper to send JSON payloads.
|
||||
*
|
||||
* This implementation now uses MessageDeliveryService for reliable delivery
|
||||
* with acknowledgment tracking and retry mechanism.
|
||||
*
|
||||
* Note: In environments where Spring Integration MQTT is unavailable (e.g.,
|
||||
* offline CI), this implementation degrades to a no-op publisher that logs the
|
||||
* intended message.
|
||||
@@ -24,10 +29,10 @@ public interface MqttPublisher {
|
||||
class MqttPublisherImpl implements MqttPublisher {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final MqttV5ClientManager clientManager;
|
||||
private final MessageDeliveryService deliveryService;
|
||||
|
||||
public MqttPublisherImpl(@Lazy MqttV5ClientManager clientManager) {
|
||||
this.clientManager = clientManager;
|
||||
public MqttPublisherImpl(@Lazy MessageDeliveryService deliveryService) {
|
||||
this.deliveryService = deliveryService;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
@@ -40,20 +45,37 @@ class MqttPublisherImpl implements MqttPublisher {
|
||||
@Override
|
||||
public void publishAsJson(String topic, Object payload, boolean retained) {
|
||||
try {
|
||||
String json = (payload instanceof String s) ? s : objectMapper.writeValueAsString(payload);
|
||||
byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
|
||||
// Default QoS 2
|
||||
clientManager.publish(topic, bytes, 2, retained);
|
||||
// Use MessageDeliveryService for reliable delivery
|
||||
DeliveryOptions options = DeliveryOptions.builder()
|
||||
.requiresAck(true)
|
||||
.retained(retained)
|
||||
.build();
|
||||
|
||||
// Log all published JSON documents
|
||||
log.info("=== MQTT JSON PUBLISHED ===");
|
||||
log.info("Topic: {}", topic);
|
||||
log.info("Retained: {}", retained);
|
||||
log.info("JSON Data: {}", json);
|
||||
log.info("=== END MQTT PUBLISH ===");
|
||||
deliveryService.sendMessage(topic, payload, options)
|
||||
.thenAccept(receipt -> {
|
||||
log.info("=== MESSAGE DELIVERY SUBMITTED ===");
|
||||
log.info("Topic: {}", topic);
|
||||
log.info("Message ID: {}", receipt.getMessageId());
|
||||
log.info("Status: {}", receipt.getStatus());
|
||||
log.info("Retained: {}", retained);
|
||||
|
||||
// Log payload for debugging
|
||||
try {
|
||||
String json = (payload instanceof String s) ? s : objectMapper.writeValueAsString(payload);
|
||||
log.info("Payload: {}", json);
|
||||
} catch (Exception e) {
|
||||
log.debug("Could not serialize payload for logging: {}", e.getMessage());
|
||||
}
|
||||
|
||||
log.info("=== END MESSAGE DELIVERY ===");
|
||||
})
|
||||
.exceptionally(ex -> {
|
||||
log.error("Failed to submit message for delivery to topic {}: {}", topic, ex.getMessage(), ex);
|
||||
return null;
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to serialize/publish MQTT message for topic {}: {}", topic, e.getMessage(), e);
|
||||
log.error("Failed to publish message for topic {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,288 +0,0 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
|
||||
import com.hivemq.client.mqtt.datatypes.MqttQos;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
|
||||
import de.assecutor.votianlt.config.MqttProperties;
|
||||
import de.assecutor.votianlt.controller.MessageController;
|
||||
import de.assecutor.votianlt.model.PendingMqttMessage;
|
||||
import de.assecutor.votianlt.repository.PendingMqttMessageRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Manages a single MQTT v5 client connection with Spring lifecycle using HiveMQ
|
||||
* MQTT Client.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttV5ClientManager implements SmartLifecycle {
|
||||
|
||||
private final MqttProperties props;
|
||||
private final MessageController messageController;
|
||||
private final PendingMqttMessageRepository pendingMessageRepository;
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
private volatile boolean running = false;
|
||||
private Mqtt5AsyncClient client;
|
||||
|
||||
public MqttV5ClientManager(MqttProperties props, @Lazy MessageController messageController, PendingMqttMessageRepository pendingMessageRepository) {
|
||||
this.props = props;
|
||||
this.messageController = messageController;
|
||||
this.pendingMessageRepository = pendingMessageRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (!props.isEnabled()) {
|
||||
log.warn("MQTT is disabled via app.mqtt.enabled=false");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String clientId = buildClientId(props.getClientId());
|
||||
URI uri = URI.create(props.getBrokerUri());
|
||||
String host = uri.getHost();
|
||||
int port = 42099;
|
||||
|
||||
var builder = Mqtt5Client.builder().identifier(clientId).serverHost(host).serverPort(port);
|
||||
if (props.isAutomaticReconnect()) {
|
||||
builder = builder.automaticReconnectWithDefaultConfig();
|
||||
}
|
||||
client = builder.buildAsync();
|
||||
|
||||
var connect = client.connectWith().cleanStart(props.isCleanStart()).keepAlive(props.getKeepAlive())
|
||||
.sessionExpiryInterval(props.getSessionExpiryInterval()).simpleAuth()
|
||||
.username("app")
|
||||
.password("apppwd".getBytes(StandardCharsets.UTF_8)).applySimpleAuth();
|
||||
|
||||
log.info("[MQTT] Connecting to {} with clientId={} ...", props.getBrokerUri(), clientId);
|
||||
connect.send().join();
|
||||
log.info("[MQTT] Connected");
|
||||
|
||||
// Handle all incoming publishes
|
||||
client.publishes(MqttGlobalPublishFilter.ALL, publish -> {
|
||||
String topic = publish.getTopic().toString();
|
||||
byte[] bytes;
|
||||
try {
|
||||
ByteBuffer buf = publish.getPayload().orElse(null);
|
||||
bytes = buf != null ? toByteArray(buf) : new byte[0];
|
||||
} catch (Throwable t) {
|
||||
bytes = new byte[0];
|
||||
}
|
||||
handleInbound(topic, bytes);
|
||||
});
|
||||
|
||||
// Subscribe to topics with QoS
|
||||
String[] topics = new String[] { "/server/+/task/photo/completed", "/server/+/task/confirm",
|
||||
"/server/+/task/completed", "/server/+/task_completed", "/server/+/job/status",
|
||||
"/server/+/jobs/assigned", "/server/+/message", "/server/login" };
|
||||
MqttQos qos = mapQos(props.getDefaultQos());
|
||||
for (String topic : topics) {
|
||||
client.subscribeWith().topicFilter(topic).qos(qos).send().join();
|
||||
}
|
||||
running = true;
|
||||
log.info("[MQTT] Subscribed to {} topics (QoS={}), awaiting messages ...", topics.length, qos);
|
||||
|
||||
// Process pending messages after successful connection
|
||||
processPendingMessages();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to start HiveMQ MQTT client: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] toByteArray(ByteBuffer buffer) {
|
||||
byte[] bytes = new byte[buffer.remaining()];
|
||||
buffer.get(bytes);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private String buildClientId(String base) {
|
||||
String b = (base == null || base.isBlank()) ? "server" : base;
|
||||
if (!b.contains("${random.uuid}")) {
|
||||
return b + "-" + UUID.randomUUID();
|
||||
}
|
||||
return b.replace("${random.uuid}", UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
private void handleInbound(String topic, byte[] payload) {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
try {
|
||||
Map<String, Object> map = objectMapper.readValue(json, new TypeReference<Map<String, Object>>() {
|
||||
});
|
||||
routeInbound(topic, map);
|
||||
} catch (Exception ex) {
|
||||
log.error("Failed to parse inbound MQTT JSON on {}: {}", topic, ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void routeInbound(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
// The consolidated topic /server/{clientId}/task_completed is used by apps to
|
||||
// report completion of any task type. Only PHOTO and CONFIRMATION require
|
||||
// specialized processing on the server side. All other task types are handled
|
||||
// by the
|
||||
// generic handler handleTaskCompleted(). This keeps routing simple while
|
||||
// allowing
|
||||
// special logic (e.g., photo persistence) where necessary.
|
||||
if (topic.matches("/server/.+/task_completed")) {
|
||||
try {
|
||||
Object tt = payload.get("taskType");
|
||||
String taskType = tt != null ? tt.toString() : null;
|
||||
messageController.handleTaskCompleted(payload, taskType);
|
||||
} catch (Exception e) {
|
||||
log.error("Error routing task_completed by taskType: {}", e.getMessage(), e);
|
||||
}
|
||||
} else if (topic.matches("/server/.+/jobs/assigned")) {
|
||||
try {
|
||||
// Extract clientId from topic: /server/{clientId}/jobs/assigned
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts.length > 2 ? parts[2] : null;
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
} else {
|
||||
log.warn("Couldn't extract clientId from topic {} for jobs/assigned", topic);
|
||||
}
|
||||
messageController.handleGetAssignedJobs(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("Error handling jobs/assigned on {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
} else if (topic.equals("/server/login")) {
|
||||
var om = new ObjectMapper();
|
||||
de.assecutor.votianlt.dto.AppLoginRequest req = om.convertValue(payload,
|
||||
de.assecutor.votianlt.dto.AppLoginRequest.class);
|
||||
messageController.handleAppLogin(req);
|
||||
} else if (topic.matches("/server/.+/message")) {
|
||||
try {
|
||||
// Extract clientId from topic: /server/{clientId}/message
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts.length > 2 ? parts[2] : null;
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
} else {
|
||||
log.warn("Couldn't extract clientId from topic {} for message", topic);
|
||||
}
|
||||
messageController.handleIncomingMessage(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("Error handling incoming message on {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
log.debug("No route for topic {}", topic);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error routing inbound MQTT message on {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
if (client != null) {
|
||||
client.disconnect().join();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Error during MQTT client shutdown: {}", e.getMessage());
|
||||
} finally {
|
||||
running = false;
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private MqttQos mapQos(int q) {
|
||||
return switch (q) {
|
||||
case 0 -> MqttQos.AT_MOST_ONCE;
|
||||
case 1 -> MqttQos.AT_LEAST_ONCE;
|
||||
default -> MqttQos.EXACTLY_ONCE;
|
||||
};
|
||||
}
|
||||
|
||||
public void publish(String topic, byte[] payload, int qos, boolean retained) {
|
||||
try {
|
||||
if (client == null || !running) {
|
||||
log.warn("[MQTT] Not connected, saving message for later: topic={}", topic);
|
||||
savePendingMessage(topic, payload, qos, retained);
|
||||
return;
|
||||
}
|
||||
client.publishWith().topic(topic).payload(payload).qos(mapQos(qos)).retain(retained).send()
|
||||
.whenComplete((ack, ex) -> {
|
||||
if (ex != null) {
|
||||
log.error("Failed to publish to {}: {}, saving for retry", topic, ex.getMessage(), ex);
|
||||
savePendingMessage(topic, payload, qos, retained);
|
||||
} else {
|
||||
log.debug("Successfully published to topic: {}", topic);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to publish to {}: {}, saving for retry", topic, e.getMessage(), e);
|
||||
savePendingMessage(topic, payload, qos, retained);
|
||||
}
|
||||
}
|
||||
|
||||
private void savePendingMessage(String topic, byte[] payload, int qos, boolean retained) {
|
||||
try {
|
||||
PendingMqttMessage pendingMessage = new PendingMqttMessage(topic, payload, qos, retained);
|
||||
pendingMessageRepository.save(pendingMessage);
|
||||
log.info("[MQTT] Saved pending message for topic: {}", topic);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to save pending MQTT message: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processPendingMessages() {
|
||||
try {
|
||||
List<PendingMqttMessage> pendingMessages = pendingMessageRepository.findAllByOrderByCreatedAtAsc();
|
||||
if (pendingMessages.isEmpty()) {
|
||||
log.debug("[MQTT] No pending messages to process");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[MQTT] Processing {} pending messages", pendingMessages.size());
|
||||
for (PendingMqttMessage pendingMessage : pendingMessages) {
|
||||
try {
|
||||
// Attempt to send the pending message
|
||||
client.publishWith()
|
||||
.topic(pendingMessage.getTopic())
|
||||
.payload(pendingMessage.getPayload())
|
||||
.qos(mapQos(pendingMessage.getQos()))
|
||||
.retain(pendingMessage.isRetained())
|
||||
.send()
|
||||
.whenComplete((ack, ex) -> {
|
||||
if (ex != null) {
|
||||
log.warn("Failed to resend pending message to {}: {}", pendingMessage.getTopic(), ex.getMessage());
|
||||
// Update retry count
|
||||
pendingMessage.incrementRetryCount();
|
||||
pendingMessageRepository.save(pendingMessage);
|
||||
} else {
|
||||
// Successfully sent, remove from pending
|
||||
log.info("Successfully resent pending message to topic: {}", pendingMessage.getTopic());
|
||||
pendingMessageRepository.delete(pendingMessage);
|
||||
}
|
||||
}).join(); // Wait for completion to process sequentially
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("Error processing pending message for {}: {}", pendingMessage.getTopic(), e.getMessage());
|
||||
pendingMessage.incrementRetryCount();
|
||||
pendingMessageRepository.save(pendingMessage);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing pending MQTT messages: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package de.assecutor.votianlt.repository;
|
||||
|
||||
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.data.mongodb.repository.MongoRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Repository for MessageEnvelope entities.
|
||||
*/
|
||||
@Repository
|
||||
public interface MessageEnvelopeRepository extends MongoRepository<MessageEnvelope, ObjectId> {
|
||||
|
||||
/**
|
||||
* Find envelope by message ID
|
||||
*/
|
||||
Optional<MessageEnvelope> findByMessageId(String messageId);
|
||||
|
||||
/**
|
||||
* Find all envelopes for a specific topic
|
||||
*/
|
||||
List<MessageEnvelope> findByTopic(String topic);
|
||||
|
||||
/**
|
||||
* Find expired envelopes
|
||||
*/
|
||||
List<MessageEnvelope> findByExpiresAtBefore(LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Find envelopes created after a specific time
|
||||
*/
|
||||
List<MessageEnvelope> findByTimestampAfter(LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Delete envelopes older than specified time
|
||||
*/
|
||||
void deleteByTimestampBefore(LocalDateTime dateTime);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package de.assecutor.votianlt.repository;
|
||||
|
||||
import de.assecutor.votianlt.messaging.model.DeliveryStatus;
|
||||
import de.assecutor.votianlt.messaging.model.PendingDelivery;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.data.mongodb.repository.MongoRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Repository for PendingDelivery entities.
|
||||
*/
|
||||
@Repository
|
||||
public interface PendingDeliveryRepository extends MongoRepository<PendingDelivery, ObjectId> {
|
||||
|
||||
/**
|
||||
* Find pending delivery by message ID
|
||||
*/
|
||||
Optional<PendingDelivery> findByMessageId(String messageId);
|
||||
|
||||
/**
|
||||
* Find all deliveries with a specific status
|
||||
*/
|
||||
List<PendingDelivery> findByStatus(DeliveryStatus status);
|
||||
|
||||
/**
|
||||
* Find deliveries ready for retry (status = SENT and nextRetryAt is in the past)
|
||||
*/
|
||||
List<PendingDelivery> findByStatusAndNextRetryAtBefore(DeliveryStatus status, LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Find acknowledged deliveries older than specified time
|
||||
*/
|
||||
List<PendingDelivery> findByStatusAndAcknowledgedAtBefore(DeliveryStatus status, LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Find deliveries with specific statuses that have expired
|
||||
*/
|
||||
List<PendingDelivery> findByStatusInAndExpiresAtBefore(List<DeliveryStatus> statuses, LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Find all deliveries for a specific client
|
||||
*/
|
||||
List<PendingDelivery> findByClientId(String clientId);
|
||||
|
||||
/**
|
||||
* Find all deliveries for a specific topic
|
||||
*/
|
||||
List<PendingDelivery> findByTopic(String topic);
|
||||
|
||||
/**
|
||||
* Count deliveries by status
|
||||
*/
|
||||
long countByStatus(DeliveryStatus status);
|
||||
|
||||
/**
|
||||
* Delete deliveries older than specified time
|
||||
*/
|
||||
void deleteByCreatedAtBefore(LocalDateTime dateTime);
|
||||
}
|
||||
|
||||
@@ -54,24 +54,27 @@ spring.servlet.multipart.max-request-size=64MB
|
||||
# Jackson message converter limits
|
||||
spring.jackson.default-property-inclusion=non_null
|
||||
|
||||
# MQTT v5 settings
|
||||
app.mqtt.enabled=true
|
||||
#app.mqtt.broker-uri=mqtt://192.168.180.26:1883
|
||||
app.mqtt.broker-uri=mqtt://mqtt-2.assecutor.de
|
||||
# The server MQTT clientId; a random UUID suffix will be inserted where ${random.uuid} appears
|
||||
app.mqtt.client-id=server-${random.uuid}
|
||||
# v5 session and keepalive
|
||||
app.mqtt.clean-start=false
|
||||
app.mqtt.session-expiry-interval=86400
|
||||
app.mqtt.keep-alive=30
|
||||
app.mqtt.max-inflight=50
|
||||
app.mqtt.automatic-reconnect=true
|
||||
# Defaults for publishing
|
||||
app.mqtt.default-qos=2
|
||||
app.mqtt.default-retained=false
|
||||
|
||||
# 2FA Configuration
|
||||
app.security.two-factor.enabled=false
|
||||
|
||||
# Message Delivery Layer Configuration
|
||||
app.messaging.delivery.max-retries=3
|
||||
app.messaging.delivery.retry-initial-delay=5s
|
||||
app.messaging.delivery.retry-max-delay=5m
|
||||
app.messaging.delivery.retry-backoff-multiplier=2.0
|
||||
app.messaging.delivery.ack-timeout=30s
|
||||
app.messaging.delivery.message-expiry=24h
|
||||
app.messaging.delivery.cleanup-interval-minutes=60
|
||||
app.messaging.delivery.retry-interval-seconds=30
|
||||
app.messaging.delivery.acknowledged-retention-days=7
|
||||
|
||||
# Messaging Plugin Configuration
|
||||
app.messaging.plugin.type=mqtt
|
||||
app.messaging.plugin.mqtt.broker.host=mqtt-2.assecutor.de
|
||||
app.messaging.plugin.mqtt.broker.port=42099
|
||||
app.messaging.plugin.mqtt.username=app
|
||||
app.messaging.plugin.mqtt.password=apppwd
|
||||
app.messaging.plugin.mqtt.client.id=votianlt-server
|
||||
|
||||
# Application Version - automatically set from pom.xml during build
|
||||
app.version=@project.version@
|
||||
Reference in New Issue
Block a user