Erweiterungen
This commit is contained in:
@@ -7,14 +7,24 @@ import de.assecutor.votianlt.messaging.plugin.SendOptions;
|
||||
import de.assecutor.votianlt.repository.MessageEnvelopeRepository;
|
||||
import de.assecutor.votianlt.repository.PendingDeliveryRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Implementation of MessageDeliveryService with reliable delivery guarantees.
|
||||
@@ -23,6 +33,12 @@ import java.util.concurrent.CompletableFuture;
|
||||
@Slf4j
|
||||
public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
|
||||
@Value("${app.messaging.delivery.ack-retry-interval-seconds:5}")
|
||||
private int ackRetryIntervalSeconds;
|
||||
|
||||
@Value("${app.messaging.delivery.ack-max-retries:4}")
|
||||
private int ackMaxRetries;
|
||||
|
||||
private final PluginManager pluginManager;
|
||||
private final PendingDeliveryRepository pendingDeliveryRepository;
|
||||
private final MessageEnvelopeRepository envelopeRepository;
|
||||
@@ -30,6 +46,31 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
private final DeliveryConfig config;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
// In-memory tracking of messages awaiting acknowledgment
|
||||
private final Map<String, PendingAckMessage> pendingAckMessages = new ConcurrentHashMap<>();
|
||||
private ScheduledExecutorService retryScheduler;
|
||||
|
||||
/**
|
||||
* Holds information about a message pending acknowledgment.
|
||||
*/
|
||||
private static class PendingAckMessage {
|
||||
final String messageId;
|
||||
final String clientId;
|
||||
final String messageType;
|
||||
final byte[] envelopeData;
|
||||
final String json;
|
||||
int retryCount;
|
||||
|
||||
PendingAckMessage(String messageId, String clientId, String messageType, byte[] envelopeData, String json) {
|
||||
this.messageId = messageId;
|
||||
this.clientId = clientId;
|
||||
this.messageType = messageType;
|
||||
this.envelopeData = envelopeData;
|
||||
this.json = json;
|
||||
this.retryCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
public MessageDeliveryServiceImpl(
|
||||
PluginManager pluginManager,
|
||||
PendingDeliveryRepository pendingDeliveryRepository,
|
||||
@@ -45,6 +86,86 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void startRetryScheduler() {
|
||||
retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "message-retry-scheduler");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
retryScheduler.scheduleAtFixedRate(
|
||||
this::checkAndRetryPendingAckMessages,
|
||||
ackRetryIntervalSeconds,
|
||||
ackRetryIntervalSeconds,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
log.info("[MessageDelivery] Started retry scheduler (interval: {}s, max retries: {})",
|
||||
ackRetryIntervalSeconds, ackMaxRetries);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stopRetryScheduler() {
|
||||
if (retryScheduler != null) {
|
||||
retryScheduler.shutdown();
|
||||
try {
|
||||
if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
retryScheduler.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
retryScheduler.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
log.info("[MessageDelivery] Stopped retry scheduler");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for pending messages and retry sending them.
|
||||
* Called every RETRY_INTERVAL_SECONDS seconds.
|
||||
*/
|
||||
private void checkAndRetryPendingAckMessages() {
|
||||
if (pendingAckMessages.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[MessageDelivery] Checking {} pending messages for retry", pendingAckMessages.size());
|
||||
|
||||
List<String> toRemove = new ArrayList<>();
|
||||
|
||||
for (PendingAckMessage pending : pendingAckMessages.values()) {
|
||||
pending.retryCount++;
|
||||
|
||||
if (pending.retryCount > ackMaxRetries) {
|
||||
// Max retries reached - log and remove
|
||||
log.error("[MessageDelivery] Message {} not acknowledged after {} attempts. Removing from retry queue. JSON: {}",
|
||||
pending.messageId, ackMaxRetries, pending.json);
|
||||
toRemove.add(pending.messageId);
|
||||
} else {
|
||||
// Retry sending
|
||||
log.info("[MessageDelivery] Retrying message {} (attempt {}/{})",
|
||||
pending.messageId, pending.retryCount, ackMaxRetries);
|
||||
|
||||
try {
|
||||
SendOptions sendOptions = SendOptions.reliable();
|
||||
pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions)
|
||||
.thenAccept(v -> log.info("[MessageDelivery] Retry {} sent successfully for message {}",
|
||||
pending.retryCount, pending.messageId))
|
||||
.exceptionally(ex -> {
|
||||
log.error("[MessageDelivery] Retry {} failed for message {}: {}",
|
||||
pending.retryCount, pending.messageId, ex.getMessage());
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error during retry for message {}: {}",
|
||||
pending.messageId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove messages that exceeded max retries
|
||||
toRemove.forEach(pendingAckMessages::remove);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) {
|
||||
try {
|
||||
@@ -76,6 +197,11 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
);
|
||||
pendingDeliveryRepository.save(pending);
|
||||
log.debug("[MessageDelivery] Created pending delivery for message {}", messageId);
|
||||
|
||||
// Add to in-memory retry queue
|
||||
PendingAckMessage pendingAck = new PendingAckMessage(messageId, clientId, messageType, envelopeData, json);
|
||||
pendingAckMessages.put(messageId, pendingAck);
|
||||
log.info("[MessageDelivery] Added message {} to retry queue (requires ACK)", messageId);
|
||||
}
|
||||
|
||||
// Send via plugin manager
|
||||
@@ -153,13 +279,19 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
@Override
|
||||
public void handleAcknowledgment(AcknowledgmentMessage ack) {
|
||||
try {
|
||||
log.info("[MessageDelivery] Received acknowledgment for message {} with status {}",
|
||||
log.info("[MessageDelivery] Received acknowledgment for message {} with status {}",
|
||||
ack.getMessageId(), ack.getStatus());
|
||||
|
||||
// Remove from in-memory retry queue
|
||||
PendingAckMessage removed = pendingAckMessages.remove(ack.getMessageId());
|
||||
if (removed != null) {
|
||||
log.info("[MessageDelivery] Removed message {} from retry queue (ACK received)", ack.getMessageId());
|
||||
}
|
||||
|
||||
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId());
|
||||
|
||||
|
||||
if (pendingOpt.isEmpty()) {
|
||||
log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}",
|
||||
log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}",
|
||||
ack.getMessageId());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -67,6 +67,8 @@ app.messaging.delivery.message-expiry=24h
|
||||
app.messaging.delivery.cleanup-interval-minutes=60
|
||||
app.messaging.delivery.retry-interval-seconds=30
|
||||
app.messaging.delivery.acknowledged-retention-days=7
|
||||
app.messaging.delivery.ack-retry-interval-seconds=5
|
||||
app.messaging.delivery.ack-max-retries=4
|
||||
|
||||
# Messaging Plugin Configuration
|
||||
app.messaging.plugin.type=mqtt
|
||||
|
||||
Reference in New Issue
Block a user