diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java index 0d75d05..c3a00cf 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java +++ b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java @@ -7,14 +7,24 @@ import de.assecutor.votianlt.messaging.plugin.SendOptions; import de.assecutor.votianlt.repository.MessageEnvelopeRepository; import de.assecutor.votianlt.repository.PendingDeliveryRepository; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Implementation of MessageDeliveryService with reliable delivery guarantees. @@ -23,6 +33,12 @@ import java.util.concurrent.CompletableFuture; @Slf4j public class MessageDeliveryServiceImpl implements MessageDeliveryService { + @Value("${app.messaging.delivery.ack-retry-interval-seconds:5}") + private int ackRetryIntervalSeconds; + + @Value("${app.messaging.delivery.ack-max-retries:4}") + private int ackMaxRetries; + private final PluginManager pluginManager; private final PendingDeliveryRepository pendingDeliveryRepository; private final MessageEnvelopeRepository envelopeRepository; @@ -30,6 +46,31 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { private final DeliveryConfig config; private final ObjectMapper objectMapper; + // In-memory tracking of messages awaiting acknowledgment + private final Map pendingAckMessages = new ConcurrentHashMap<>(); + private ScheduledExecutorService retryScheduler; + + /** + * Holds information about a message pending acknowledgment. + */ + private static class PendingAckMessage { + final String messageId; + final String clientId; + final String messageType; + final byte[] envelopeData; + final String json; + int retryCount; + + PendingAckMessage(String messageId, String clientId, String messageType, byte[] envelopeData, String json) { + this.messageId = messageId; + this.clientId = clientId; + this.messageType = messageType; + this.envelopeData = envelopeData; + this.json = json; + this.retryCount = 0; + } + } + public MessageDeliveryServiceImpl( PluginManager pluginManager, PendingDeliveryRepository pendingDeliveryRepository, @@ -45,6 +86,86 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { this.objectMapper = objectMapper; } + @PostConstruct + public void startRetryScheduler() { + retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "message-retry-scheduler"); + t.setDaemon(true); + return t; + }); + retryScheduler.scheduleAtFixedRate( + this::checkAndRetryPendingAckMessages, + ackRetryIntervalSeconds, + ackRetryIntervalSeconds, + TimeUnit.SECONDS + ); + log.info("[MessageDelivery] Started retry scheduler (interval: {}s, max retries: {})", + ackRetryIntervalSeconds, ackMaxRetries); + } + + @PreDestroy + public void stopRetryScheduler() { + if (retryScheduler != null) { + retryScheduler.shutdown(); + try { + if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + retryScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + retryScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("[MessageDelivery] Stopped retry scheduler"); + } + } + + /** + * Check for pending messages and retry sending them. + * Called every RETRY_INTERVAL_SECONDS seconds. + */ + private void checkAndRetryPendingAckMessages() { + if (pendingAckMessages.isEmpty()) { + return; + } + + log.debug("[MessageDelivery] Checking {} pending messages for retry", pendingAckMessages.size()); + + List toRemove = new ArrayList<>(); + + for (PendingAckMessage pending : pendingAckMessages.values()) { + pending.retryCount++; + + if (pending.retryCount > ackMaxRetries) { + // Max retries reached - log and remove + log.error("[MessageDelivery] Message {} not acknowledged after {} attempts. Removing from retry queue. JSON: {}", + pending.messageId, ackMaxRetries, pending.json); + toRemove.add(pending.messageId); + } else { + // Retry sending + log.info("[MessageDelivery] Retrying message {} (attempt {}/{})", + pending.messageId, pending.retryCount, ackMaxRetries); + + try { + SendOptions sendOptions = SendOptions.reliable(); + pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions) + .thenAccept(v -> log.info("[MessageDelivery] Retry {} sent successfully for message {}", + pending.retryCount, pending.messageId)) + .exceptionally(ex -> { + log.error("[MessageDelivery] Retry {} failed for message {}: {}", + pending.retryCount, pending.messageId, ex.getMessage()); + return null; + }); + } catch (Exception e) { + log.error("[MessageDelivery] Error during retry for message {}: {}", + pending.messageId, e.getMessage(), e); + } + } + } + + // Remove messages that exceeded max retries + toRemove.forEach(pendingAckMessages::remove); + } + @Override public CompletableFuture sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) { try { @@ -76,6 +197,11 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { ); pendingDeliveryRepository.save(pending); log.debug("[MessageDelivery] Created pending delivery for message {}", messageId); + + // Add to in-memory retry queue + PendingAckMessage pendingAck = new PendingAckMessage(messageId, clientId, messageType, envelopeData, json); + pendingAckMessages.put(messageId, pendingAck); + log.info("[MessageDelivery] Added message {} to retry queue (requires ACK)", messageId); } // Send via plugin manager @@ -153,13 +279,19 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { @Override public void handleAcknowledgment(AcknowledgmentMessage ack) { try { - log.info("[MessageDelivery] Received acknowledgment for message {} with status {}", + log.info("[MessageDelivery] Received acknowledgment for message {} with status {}", ack.getMessageId(), ack.getStatus()); + // Remove from in-memory retry queue + PendingAckMessage removed = pendingAckMessages.remove(ack.getMessageId()); + if (removed != null) { + log.info("[MessageDelivery] Removed message {} from retry queue (ACK received)", ack.getMessageId()); + } + Optional pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId()); - + if (pendingOpt.isEmpty()) { - log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}", + log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}", ack.getMessageId()); return; } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2386dc4..822139b 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -67,6 +67,8 @@ app.messaging.delivery.message-expiry=24h app.messaging.delivery.cleanup-interval-minutes=60 app.messaging.delivery.retry-interval-seconds=30 app.messaging.delivery.acknowledged-retention-days=7 +app.messaging.delivery.ack-retry-interval-seconds=5 +app.messaging.delivery.ack-max-retries=4 # Messaging Plugin Configuration app.messaging.plugin.type=mqtt