From 9b838863d9b0e8a63cd1750a4160a9137ce260ea Mon Sep 17 00:00:00 2001 From: Sven Carstensen Date: Wed, 7 Jan 2026 08:52:14 +0100 Subject: [PATCH] Erweiterungen --- .claude/settings.local.json | 3 +- .../delivery/MessageDeliveryServiceImpl.java | 267 +++--------------- .../messaging/model/MessageEnvelope.java | 33 +-- .../pages/view/AuthenticatedStartView.java | 4 +- .../repository/MessageEnvelopeRepository.java | 43 --- .../service/ClientConnectionService.java | 71 +++-- 6 files changed, 83 insertions(+), 338 deletions(-) delete mode 100644 src/main/java/de/assecutor/votianlt/repository/MessageEnvelopeRepository.java diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 82fa38c..af66fec 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -10,7 +10,8 @@ "Bash(xargs kill:*)", "Bash(cat:*)", "Bash(mongosh:*)", - "Bash(mongo:*)" + "Bash(mongo:*)", + "Bash(kill:*)" ], "deny": [], "ask": [] diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java index ff587af..e0169d4 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java +++ b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.assecutor.votianlt.messaging.model.*; import de.assecutor.votianlt.messaging.plugin.PluginManager; import de.assecutor.votianlt.messaging.plugin.SendOptions; -import de.assecutor.votianlt.repository.MessageEnvelopeRepository; import de.assecutor.votianlt.repository.PendingDeliveryRepository; import de.assecutor.votianlt.service.ClientConnectionService; import lombok.extern.slf4j.Slf4j; @@ -17,12 +16,9 @@ import jakarta.annotation.PreDestroy; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -42,48 +38,22 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { private final PluginManager pluginManager; private final PendingDeliveryRepository pendingDeliveryRepository; - private final MessageEnvelopeRepository envelopeRepository; private final AcknowledgmentHandler acknowledgmentHandler; private final DeliveryConfig config; private final ObjectMapper objectMapper; private final ClientConnectionService clientConnectionService; - // In-memory tracking of messages awaiting acknowledgment - private final Map pendingAckMessages = new ConcurrentHashMap<>(); private ScheduledExecutorService retryScheduler; - /** - * Holds information about a message pending acknowledgment. - */ - private static class PendingAckMessage { - final String messageId; - final String clientId; - final String messageType; - final byte[] envelopeData; - final String json; - int retryCount; - - PendingAckMessage(String messageId, String clientId, String messageType, byte[] envelopeData, String json) { - this.messageId = messageId; - this.clientId = clientId; - this.messageType = messageType; - this.envelopeData = envelopeData; - this.json = json; - this.retryCount = 0; - } - } - public MessageDeliveryServiceImpl( PluginManager pluginManager, PendingDeliveryRepository pendingDeliveryRepository, - MessageEnvelopeRepository envelopeRepository, AcknowledgmentHandler acknowledgmentHandler, DeliveryConfig config, ObjectMapper objectMapper, ClientConnectionService clientConnectionService) { this.pluginManager = pluginManager; this.pendingDeliveryRepository = pendingDeliveryRepository; - this.envelopeRepository = envelopeRepository; this.acknowledgmentHandler = acknowledgmentHandler; this.config = config; this.objectMapper = objectMapper; @@ -98,7 +68,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { return t; }); retryScheduler.scheduleAtFixedRate( - this::checkAndRetryPendingAckMessages, + this::retryPendingDeliveries, ackRetryIntervalSeconds, ackRetryIntervalSeconds, TimeUnit.SECONDS @@ -123,81 +93,17 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { } } - /** - * Check for pending messages and retry sending them. - * Called every RETRY_INTERVAL_SECONDS seconds. - */ - private void checkAndRetryPendingAckMessages() { - if (pendingAckMessages.isEmpty()) { - return; - } - - log.debug("[MessageDelivery] Checking {} pending messages for retry", pendingAckMessages.size()); - - List toRemove = new ArrayList<>(); - - for (PendingAckMessage pending : pendingAckMessages.values()) { - // Only retry if client is connected - if (!clientConnectionService.isClientConnected(pending.clientId)) { - log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected", - pending.messageId, pending.clientId); - continue; - } - - pending.retryCount++; - - if (pending.retryCount > ackMaxRetries) { - // Max retries reached - log and remove - log.error("[MessageDelivery] Message {} not acknowledged after {} attempts. Removing from retry queue. JSON: {}", - pending.messageId, ackMaxRetries, pending.json); - toRemove.add(pending.messageId); - } else { - // Retry sending - log.info("[MessageDelivery] Retrying message {} (attempt {}/{})", - pending.messageId, pending.retryCount, ackMaxRetries); - - try { - SendOptions sendOptions = SendOptions.reliable(); - pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions) - .thenAccept(v -> log.info("[MessageDelivery] Retry {} sent successfully for message {}", - pending.retryCount, pending.messageId)) - .exceptionally(ex -> { - log.error("[MessageDelivery] Retry {} failed for message {}: {}", - pending.retryCount, pending.messageId, ex.getMessage()); - return null; - }); - } catch (Exception e) { - log.error("[MessageDelivery] Error during retry for message {}: {}", - pending.messageId, e.getMessage(), e); - } - } - } - - // Remove messages that exceeded max retries - toRemove.forEach(pendingAckMessages::remove); - } - @Override public CompletableFuture sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) { try { - // Create destination identifier for tracking String destination = clientId + "/" + messageType; - - // Create message envelope final LocalDateTime expiresAt = options.calculateExpiryTime(); MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt); - - // Save envelope to database - envelope = envelopeRepository.save(envelope); final String messageId = envelope.getMessageId(); - log.debug("[MessageDelivery] Created envelope {} for client {} (type: {})", messageId, clientId, messageType); - // Serialize envelope to JSON String json = objectMapper.writeValueAsString(envelope); byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8); - log.info("[MessageDelivery] Sending JSON to client {} (type: {}): {}", clientId, messageType, json); - // Create pending delivery record if acknowledgment is required if (options.isRequiresAck()) { PendingDelivery pending = new PendingDelivery( messageId, @@ -207,15 +113,8 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { expiresAt ); pendingDeliveryRepository.save(pending); - log.debug("[MessageDelivery] Created pending delivery for message {}", messageId); - - // Add to in-memory retry queue - PendingAckMessage pendingAck = new PendingAckMessage(messageId, clientId, messageType, envelopeData, json); - pendingAckMessages.put(messageId, pendingAck); - log.info("[MessageDelivery] Added message {} to retry queue (requires ACK)", messageId); } - // Send via plugin manager SendOptions sendOptions = SendOptions.builder() .qos(options.getQos()) .retained(options.isRetained()) @@ -224,19 +123,17 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { final boolean requiresAck = options.isRequiresAck(); final Duration ackTimeout = options.getAckTimeout(); + log.info("[MessageDelivery] Sending message {} to client {} (type: {})", messageId, clientId, messageType); + return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions) .thenApply(v -> { - // Update pending delivery status if (requiresAck) { updatePendingDeliveryAfterSend(messageId, ackTimeout); } - log.info("[MessageDelivery] Successfully sent message {} to client {} (type: {})", - messageId, clientId, messageType); return DeliveryReceipt.submitted(messageId, destination, expiresAt); }) .exceptionally(ex -> { - log.error("[MessageDelivery] Failed to send message {} to client {} (type: {}): {}", - messageId, clientId, messageType, ex.getMessage()); + log.error("[MessageDelivery] Failed to send message {}: {}", messageId, ex.getMessage()); if (requiresAck) { markPendingDeliveryFailed(messageId, ex.getMessage()); } @@ -244,8 +141,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { }); } catch (Exception e) { - log.error("[MessageDelivery] Error creating message for client {} (type: {}): {}", - clientId, messageType, e.getMessage(), e); + log.error("[MessageDelivery] Error creating message for client {}: {}", clientId, e.getMessage()); return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType)); } } @@ -253,16 +149,12 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { @Override @Deprecated public CompletableFuture sendMessage(String topic, Object payload, DeliveryOptions options) { - // Extract clientId and messageType from topic - // Topic format: /client/{clientId}/{messageType} String[] parts = topic.split("/"); if (parts.length >= 4 && parts[1].equals("client")) { String clientId = parts[2]; String messageType = parts[3]; return sendToClient(clientId, messageType, payload, options); } - - // Fallback for legacy topics - log warning log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic); return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic)); } @@ -270,40 +162,30 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { @Override public void handleIncomingMessage(MessageEnvelope envelope) { try { - log.info("[MessageDelivery] Received message {} on topic {}", + log.info("[MessageDelivery] Received message {} on topic {}", envelope.getMessageId(), envelope.getTopic()); - // Send acknowledgment if required if (envelope.isRequiresAck()) { sendAcknowledgment(envelope); } - // Forward to acknowledgment handler for application routing acknowledgmentHandler.routeIncomingMessage(envelope); } catch (Exception e) { - log.error("[MessageDelivery] Error handling incoming message {}: {}", - envelope.getMessageId(), e.getMessage(), e); + log.error("[MessageDelivery] Error handling incoming message {}: {}", + envelope.getMessageId(), e.getMessage()); } } @Override public void handleAcknowledgment(AcknowledgmentMessage ack) { try { - log.info("[MessageDelivery] Received acknowledgment for message {} with status {}", + log.info("[MessageDelivery] Received ACK for message {} (status: {})", ack.getMessageId(), ack.getStatus()); - // Remove from in-memory retry queue - PendingAckMessage removed = pendingAckMessages.remove(ack.getMessageId()); - if (removed != null) { - log.info("[MessageDelivery] Removed message {} from retry queue (ACK received)", ack.getMessageId()); - } - Optional pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId()); if (pendingOpt.isEmpty()) { - log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}", - ack.getMessageId()); return; } @@ -311,21 +193,19 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { switch (ack.getStatus()) { case RECEIVED, PROCESSED -> { - pending.markAsAcknowledged(); - pendingDeliveryRepository.save(pending); - log.info("[MessageDelivery] Message {} acknowledged successfully", ack.getMessageId()); + pendingDeliveryRepository.delete(pending); } case FAILED -> { pending.markAsFailed(ack.getErrorMessage()); pendingDeliveryRepository.save(pending); - log.warn("[MessageDelivery] Message {} failed on client: {}", + log.warn("[MessageDelivery] Message {} failed on client: {}", ack.getMessageId(), ack.getErrorMessage()); } } } catch (Exception e) { - log.error("[MessageDelivery] Error handling acknowledgment for message {}: {}", - ack.getMessageId(), e.getMessage(), e); + log.error("[MessageDelivery] Error handling ACK for message {}: {}", + ack.getMessageId(), e.getMessage()); } } @@ -347,18 +227,15 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { .findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now()); if (readyForRetry.isEmpty()) { - log.debug("[MessageDelivery] No pending deliveries ready for retry"); return; } - log.info("[MessageDelivery] Retrying {} pending deliveries", readyForRetry.size()); - for (PendingDelivery pending : readyForRetry) { retryDelivery(pending); } } catch (Exception e) { - log.error("[MessageDelivery] Error during retry process: {}", e.getMessage(), e); + log.error("[MessageDelivery] Error during retry process: {}", e.getMessage()); } } @@ -368,90 +245,49 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { return; } - log.info("[MessageDelivery] Client {} reconnected - retrying pending messages", clientId); - - // Retry in-memory pending ACK messages for this client - int inMemoryCount = 0; - for (PendingAckMessage pending : pendingAckMessages.values()) { - if (clientId.equals(pending.clientId)) { - inMemoryCount++; - try { - SendOptions sendOptions = SendOptions.reliable(); - pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions) - .thenAccept(v -> log.info("[MessageDelivery] Reconnect retry sent for message {} to client {}", - pending.messageId, clientId)) - .exceptionally(ex -> { - log.error("[MessageDelivery] Reconnect retry failed for message {}: {}", - pending.messageId, ex.getMessage()); - return null; - }); - } catch (Exception e) { - log.error("[MessageDelivery] Error during reconnect retry for message {}: {}", - pending.messageId, e.getMessage(), e); - } - } - } - - // Retry database pending deliveries for this client try { List pendingDeliveries = pendingDeliveryRepository .findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT)); - int dbCount = 0; for (PendingDelivery pending : pendingDeliveries) { String topic = pending.getTopic(); if (topic != null && topic.startsWith(clientId + "/")) { - dbCount++; retryDelivery(pending); } } - log.info("[MessageDelivery] Triggered retry for client {}: {} in-memory, {} from database", - clientId, inMemoryCount, dbCount); - } catch (Exception e) { - log.error("[MessageDelivery] Error retrying database deliveries for client {}: {}", - clientId, e.getMessage(), e); + log.error("[MessageDelivery] Error retrying deliveries for client {}: {}", clientId, e.getMessage()); } } @Override public void cleanupOldDeliveries() { try { - // Clean up acknowledged deliveries older than configured retention LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7)); List oldAcknowledged = pendingDeliveryRepository .findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff); - + if (!oldAcknowledged.isEmpty()) { pendingDeliveryRepository.deleteAll(oldAcknowledged); - log.info("[MessageDelivery] Cleaned up {} old acknowledged deliveries", oldAcknowledged.size()); } - // Mark expired deliveries List expired = pendingDeliveryRepository .findByStatusInAndExpiresAtBefore( - List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT), + List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT), LocalDateTime.now() ); - + for (PendingDelivery pending : expired) { pending.markAsExpired(); pendingDeliveryRepository.save(pending); } - - if (!expired.isEmpty()) { - log.info("[MessageDelivery] Marked {} deliveries as expired", expired.size()); - } } catch (Exception e) { - log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage(), e); + log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage()); } } - /** - * Update pending delivery after successful send - */ private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) { try { Optional pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); @@ -466,9 +302,6 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { } } - /** - * Mark pending delivery as failed - */ private void markPendingDeliveryFailed(String messageId, String reason) { try { Optional pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); @@ -482,40 +315,23 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { } } - /** - * Retry a pending delivery - */ private void retryDelivery(PendingDelivery pending) { try { - // Check if expired if (pending.isExpired()) { pending.markAsExpired(); pendingDeliveryRepository.save(pending); - log.warn("[MessageDelivery] Message {} expired, not retrying", pending.getMessageId()); return; } - // Check if max retries reached if (pending.hasReachedMaxRetries()) { pending.markAsFailed("Max retries reached"); pendingDeliveryRepository.save(pending); - log.warn("[MessageDelivery] Message {} reached max retries", pending.getMessageId()); return; } - // Increment retry count - pending.incrementRetryCount(); - - // Calculate next retry time with exponential backoff - Duration backoffDelay = calculateBackoff(pending.getRetryCount()); - LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay); - - // Extract clientId and messageType from topic - // Topic format: clientId/messageType String topic = pending.getTopic(); String[] parts = topic.split("/"); if (parts.length < 2) { - log.error("[MessageDelivery] Invalid topic format for retry: {}", topic); pending.markAsFailed("Invalid topic format"); pendingDeliveryRepository.save(pending); return; @@ -524,76 +340,64 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { String clientId = parts[0]; String messageType = parts[1]; - // Only retry if client is connected + Duration backoffDelay = calculateBackoff(pending.getRetryCount() + 1); + LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay); + if (!clientConnectionService.isClientConnected(clientId)) { - log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected", - pending.getMessageId(), clientId); + pending.setNextRetryAt(nextRetry); + pendingDeliveryRepository.save(pending); return; } - // Send via plugin manager + pending.incrementRetryCount(); + SendOptions options = SendOptions.reliable(); pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options) .thenAccept(v -> { pending.markAsSent(nextRetry); pendingDeliveryRepository.save(pending); - log.info("[MessageDelivery] Retry {} successful for message {}", - pending.getRetryCount(), pending.getMessageId()); }) .exceptionally(ex -> { - log.error("[MessageDelivery] Retry failed for message {}: {}", - pending.getMessageId(), ex.getMessage()); + log.error("[MessageDelivery] Retry failed for message {}: {}", pending.getMessageId(), ex.getMessage()); pending.markAsFailed(ex.getMessage()); pendingDeliveryRepository.save(pending); return null; }); } catch (Exception e) { - log.error("[MessageDelivery] Error retrying delivery {}: {}", - pending.getMessageId(), e.getMessage(), e); + log.error("[MessageDelivery] Error retrying delivery {}: {}", pending.getMessageId(), e.getMessage()); } } - /** - * Send acknowledgment back to client - */ private void sendAcknowledgment(MessageEnvelope envelope) { try { - // Extract client ID from topic (e.g., /server/{clientId}/... or clientId/messageType) String clientId = extractClientIdFromTopic(envelope.getTopic()); if (clientId == null) { - log.warn("[MessageDelivery] Cannot send ACK, no clientId in topic: {}", envelope.getTopic()); return; } - // Create acknowledgment message AcknowledgmentMessage ack = new AcknowledgmentMessage( envelope.getMessageId(), AckStatus.RECEIVED, "server" ); - // Send ACK to client using new API String ackJson = objectMapper.writeValueAsString(ack); byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8); + log.info("[MessageDelivery] Sending ACK for message {} to client {}", envelope.getMessageId(), clientId); + pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget()) - .thenAccept(v -> log.debug("[MessageDelivery] Sent ACK for message {}", envelope.getMessageId())) .exceptionally(ex -> { - log.error("[MessageDelivery] Failed to send ACK for message {}: {}", - envelope.getMessageId(), ex.getMessage()); + log.error("[MessageDelivery] Failed to send ACK for message {}: {}", envelope.getMessageId(), ex.getMessage()); return null; }); } catch (Exception e) { - log.error("[MessageDelivery] Error sending acknowledgment for message {}: {}", - envelope.getMessageId(), e.getMessage(), e); + log.error("[MessageDelivery] Error sending ACK for message {}: {}", envelope.getMessageId(), e.getMessage()); } } - /** - * Calculate exponential backoff delay - */ private Duration calculateBackoff(int retryCount) { long delayMs = (long) (config.getRetryInitialDelay().toMillis() * Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1)); @@ -601,16 +405,11 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { return Duration.ofMillis(Math.min(delayMs, maxDelayMs)); } - /** - * Extract client ID from topic pattern. - * Supports both old format (/server/{clientId}/...) and new format (clientId/messageType) - */ private String extractClientIdFromTopic(String topic) { if (topic == null) { return null; } - // Old format: /server/{clientId}/... if (topic.startsWith("/server/")) { String[] parts = topic.split("/"); if (parts.length > 2) { @@ -618,7 +417,6 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { } } - // New format: clientId/messageType if (topic.contains("/")) { String[] parts = topic.split("/"); if (parts.length >= 1) { @@ -629,4 +427,3 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { return null; } } - diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java b/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java index 36eda1d..3d033a8 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java +++ b/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java @@ -1,15 +1,9 @@ package de.assecutor.votianlt.messaging.model; -import com.fasterxml.jackson.annotation.JsonGetter; -import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.bson.types.ObjectId; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.Indexed; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; import java.time.LocalDateTime; import java.util.HashMap; @@ -19,64 +13,52 @@ import java.util.UUID; /** * Envelope that wraps all messages sent through the messaging system. * Contains metadata for delivery tracking and acknowledgment. + * This is a DTO class - not persisted to MongoDB. */ @Data @NoArgsConstructor @AllArgsConstructor -@Document(collection = "message_envelopes") +@JsonIgnoreProperties(ignoreUnknown = true) public class MessageEnvelope { - @Id - @JsonIgnore - private ObjectId id; - /** * Unique identifier for this message (UUID) */ - @Field("message_id") - @Indexed(unique = true) private String messageId; /** * Timestamp when the envelope was created */ - @Field("timestamp") private LocalDateTime timestamp; /** * Target topic for this message */ - @Field("topic") private String topic; /** * The actual message payload (can be any serializable object) */ - @Field("payload") private Object payload; /** * Whether this message requires acknowledgment from the receiver */ - @Field("requires_ack") private boolean requiresAck; /** * Number of times this message has been retried */ - @Field("retry_count") private int retryCount; /** * When this message expires and should no longer be delivered */ - @Field("expires_at") private LocalDateTime expiresAt; /** * Additional metadata for the message */ - @Field("metadata") private Map metadata; /** @@ -116,13 +98,4 @@ public class MessageEnvelope { } this.metadata.put(key, value); } - - /** - * Returns the ObjectId as string for JSON serialization - */ - @JsonGetter("id") - public String getIdAsString() { - return id != null ? id.toString() : null; - } } - diff --git a/src/main/java/de/assecutor/votianlt/pages/view/AuthenticatedStartView.java b/src/main/java/de/assecutor/votianlt/pages/view/AuthenticatedStartView.java index 7e3d523..8af45ba 100644 --- a/src/main/java/de/assecutor/votianlt/pages/view/AuthenticatedStartView.java +++ b/src/main/java/de/assecutor/votianlt/pages/view/AuthenticatedStartView.java @@ -98,9 +98,11 @@ public class AuthenticatedStartView extends VerticalLayout { // Features Grid HorizontalLayout featuresGrid = new HorizontalLayout(); - featuresGrid.setWidthFull(); featuresGrid.setSpacing(true); + featuresGrid.setJustifyContentMode(FlexComponent.JustifyContentMode.CENTER); featuresGrid.setDefaultVerticalComponentAlignment(FlexComponent.Alignment.START); + featuresGrid.getStyle().set("flex-wrap", "wrap"); + featuresGrid.getStyle().set("width", "100%"); // Feature Cards featuresGrid.add(createFeatureCard(VaadinIcon.COG, "Einrichtungsassistent", diff --git a/src/main/java/de/assecutor/votianlt/repository/MessageEnvelopeRepository.java b/src/main/java/de/assecutor/votianlt/repository/MessageEnvelopeRepository.java deleted file mode 100644 index b5eec83..0000000 --- a/src/main/java/de/assecutor/votianlt/repository/MessageEnvelopeRepository.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.assecutor.votianlt.repository; - -import de.assecutor.votianlt.messaging.model.MessageEnvelope; -import org.bson.types.ObjectId; -import org.springframework.data.mongodb.repository.MongoRepository; -import org.springframework.stereotype.Repository; - -import java.time.LocalDateTime; -import java.util.List; -import java.util.Optional; - -/** - * Repository for MessageEnvelope entities. - */ -@Repository -public interface MessageEnvelopeRepository extends MongoRepository { - - /** - * Find envelope by message ID - */ - Optional findByMessageId(String messageId); - - /** - * Find all envelopes for a specific topic - */ - List findByTopic(String topic); - - /** - * Find expired envelopes - */ - List findByExpiresAtBefore(LocalDateTime dateTime); - - /** - * Find envelopes created after a specific time - */ - List findByTimestampAfter(LocalDateTime dateTime); - - /** - * Delete envelopes older than specified time - */ - void deleteByTimestampBefore(LocalDateTime dateTime); -} - diff --git a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java index 2444378..789c784 100644 --- a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java +++ b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java @@ -108,21 +108,35 @@ public class ClientConnectionService { /** * Handles a pong response from a client. + * Searches by both clientId and userId since pong is sent to /server/{userId}/pong. * - * @param clientId The client that sent the pong + * @param id The client or user identifier that sent the pong */ - public void handlePong(String clientId) { - if (clientId == null || clientId.isBlank()) { - log.warn("Received pong from null or blank clientId"); + public void handlePong(String id) { + if (id == null || id.isBlank()) { + log.warn("Received pong from null or blank id"); return; } - ClientState state = connectedClients.get(clientId); + // First try direct lookup by clientId + String clientId = id; + ClientState state = connectedClients.get(id); + + // If not found, search by userId + if (state == null) { + for (Map.Entry entry : connectedClients.entrySet()) { + if (id.equals(entry.getValue().userId())) { + clientId = entry.getKey(); + state = entry.getValue(); + break; + } + } + } + if (state != null) { boolean wasDisconnected = !state.connected(); ClientState updatedState = state.withPongReceived(Instant.now()); connectedClients.put(clientId, updatedState); - log.debug("Pong received from client: clientId={}", clientId); // If client was disconnected and is now reconnected, retry pending messages if (wasDisconnected) { @@ -130,19 +144,29 @@ public class ClientConnectionService { messageDeliveryService.retryPendingDeliveriesForClient(clientId); } } else { - log.warn("Received pong from unknown client: clientId={}", clientId); + log.warn("Received pong from unknown id: {}", id); } } /** * Checks if a client is currently connected. + * Searches by both clientId and userId. * - * @param clientId The client identifier + * @param id The client or user identifier * @return true if the client is connected */ - public boolean isClientConnected(String clientId) { - ClientState state = connectedClients.get(clientId); - return state != null && state.connected(); + public boolean isClientConnected(String id) { + if (id == null || id.isBlank()) { + return false; + } + // First try direct lookup by clientId + ClientState state = connectedClients.get(id); + if (state != null && state.connected()) { + return true; + } + // Then search by userId + return connectedClients.values().stream() + .anyMatch(s -> s.connected() && id.equals(s.userId())); } /** @@ -209,10 +233,9 @@ public class ClientConnectionService { } } - // Send ping to connected clients - if (state.connected()) { - log.info("[ClientConnectionService] Sending ping to client: {}", clientId); - sendPing(clientId); + // Send ping to connected clients (use userId for topic) + if (state.connected() && state.userId() != null) { + sendPing(state.userId()); ClientState updatedState = state.withPingSent(now); connectedClients.put(clientId, updatedState); } @@ -220,11 +243,11 @@ public class ClientConnectionService { } /** - * Sends a ping message to a specific client. + * Sends a ping message to a specific user. * - * @param clientId The target client + * @param userId The target user ID (MongoDB ObjectId) */ - private void sendPing(String clientId) { + private void sendPing(String userId) { try { Map pingPayload = Map.of( "type", "ping", @@ -233,24 +256,16 @@ public class ClientConnectionService { String json = objectMapper.writeValueAsString(pingPayload); byte[] payload = json.getBytes(StandardCharsets.UTF_8); - log.info("[ClientConnectionService] Ping payload for client {}: {}", clientId, json); SendOptions options = SendOptions.builder() .qos(1) .retained(false) .build(); - pluginManager.sendToClient(clientId, "ping", payload, options) - .whenComplete((result, error) -> { - if (error != null) { - log.warn("[ClientConnectionService] Failed to send ping to client {}: {}", clientId, error.getMessage()); - } else { - log.info("[ClientConnectionService] Ping sent successfully to client: {}", clientId); - } - }); + pluginManager.sendToClient(userId, "ping", payload, options); } catch (Exception e) { - log.error("Error sending ping to client {}: {}", clientId, e.getMessage(), e); + log.error("Error sending ping to user {}: {}", userId, e.getMessage()); } }