diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 4f31da7..82fa38c 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -8,9 +8,11 @@ "Bash(rm:*)", "Bash(lsof:*)", "Bash(xargs kill:*)", - "Bash(cat:*)" + "Bash(cat:*)", + "Bash(mongosh:*)", + "Bash(mongo:*)" ], "deny": [], "ask": [] } -} \ No newline at end of file +} diff --git a/CLAUDE.md b/CLAUDE.md index b1e98a9..9a71044 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,10 +7,12 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - **Start development server**: `./mvnw` (runs Spring Boot with Vaadin dev mode) - **Build for production**: `./mvnw -Pproduction package` - **Clean build**: `./mvnw clean compile` +- **Format code**: `./mvnw spotless:apply` (applies Eclipse formatter for Java, Prettier for TypeScript) +- **Check formatting**: `./mvnw spotless:check` ## Architecture Overview -This is a **Vaadin Spring Boot** application for job/task management with real-time mobile app communication via MQTT. The system manages logistics jobs with tasks that mobile app users complete. +This is a **Vaadin Spring Boot** application for job/task management with real-time mobile app communication via a pluggable messaging transport layer. The system manages logistics jobs with tasks that mobile app users complete. ### Core Architecture Layers @@ -20,9 +22,15 @@ This is a **Vaadin Spring Boot** application for job/task management with real-t **Backend Services**: - `src/main/java/de/assecutor/votianlt/service/` - Business logic -- `src/main/java/de/assecutor/votianlt/controller/` - MQTT message handling +- `src/main/java/de/assecutor/votianlt/controller/` - Message handling (routes inbound messages to processors) - `src/main/java/de/assecutor/votianlt/repository/` - MongoDB data access +**Messaging Layer** (`src/main/java/de/assecutor/votianlt/messaging/`): +- `plugin/` - Transport plugin interface and implementations (MQTT, extensible for WebSocket, gRPC) +- `delivery/` - Reliable message delivery with acknowledgment tracking, retries, and expiry +- `model/` - Message envelopes, delivery status, pending deliveries +- `config/` - Messaging configuration and wiring + **Models**: - `src/main/java/de/assecutor/votianlt/model/` - Domain entities - Task hierarchy: `BaseTask` with subtypes (`PhotoTask`, `BarcodeTask`, `SignatureTask`, etc.) @@ -36,10 +44,15 @@ This is a **Vaadin Spring Boot** application for job/task management with real-t - `AppUser` - Mobile app users (task executors) - `AppUser.owner` field links to `User` for notifications -**MQTT Communication**: -- `MqttV5ClientManager` handles bidirectional communication with mobile apps -- `MessageController` routes inbound MQTT messages and processes task completions -- Topics: `/server/{clientId}/task_completed`, `/server/login`, etc. +**Messaging Plugin Architecture**: +- `MessagingPlugin` interface abstracts transport protocols (currently MQTT via HiveMQ) +- `MessageDeliveryService` provides guaranteed delivery with acknowledgment tracking +- `AcknowledgmentHandler` processes ACKs and updates delivery status +- Plugins are responsible for topic/channel structure; delivery layer uses `clientId` and `messageType` + +**Client Connection Monitoring**: +- `ClientConnectionService` tracks connected mobile clients via ping/pong mechanism +- Server sends ping to `/client/{clientId}/ping`, client responds on `/server/{clientId}/pong` **History Tracking**: `JobHistoryService` logs all job/task changes with detailed audit trail displayed in `JobHistoryView`. @@ -51,6 +64,7 @@ This is a **Vaadin Spring Boot** application for job/task management with real-t - `jobs` - Main job entities with status tracking - `tasks` - Polymorphic task storage (discriminated by `taskType`) - `job_history` - Audit trail for all job changes +- `pending_deliveries` - Message delivery tracking for retries - `photos`, `barcodes`, `signatures` - Task completion data - `users` - Web interface users - `app_user` - Mobile app users @@ -58,15 +72,16 @@ This is a **Vaadin Spring Boot** application for job/task management with real-t ## Configuration -**Database**: MongoDB at `192.168.180.25:27017/votianlt` -**MQTT**: HiveMQ client connects to `mqtt-2.assecutor.de` with credentials `app`/`apppwd` -**Email**: SMTP via `mailhub.assecutor.org:587` using Spring Boot mail auto-configuration +**Database**: MongoDB (configurable via `spring.data.mongodb.uri`) +**Messaging**: Plugin-based, currently MQTT via HiveMQ (`app.messaging.plugin.*` properties) +**Email**: SMTP via Spring Boot mail auto-configuration ## Development Environment **Java 21** with **Spring Boot 3.4.3** and **Vaadin 24.7.0** **Security**: Spring Security with role-based access (`USER` role required) -**Profiles**: `production` profile available for optimized builds +**Formatting**: Spotless Maven plugin with Eclipse formatter (Java) and Prettier (TypeScript) +**Profiles**: `production` profile for optimized builds, `integration-test` profile for failsafe plugin ## Key Integration Points @@ -80,4 +95,9 @@ When modifying job status flow: 2. Modify `EmailService.updateJobStatusToCompleted()` logic 3. Consider email notification templates -MQTT message routing follows pattern: extract `taskType` from payload, route to appropriate processor method in `MessageController`. \ No newline at end of file +When adding new messaging transports: +1. Implement `MessagingPlugin` interface +2. Register in `PluginMessagingConfig` +3. Add configuration properties under `app.messaging.plugin..*` + +Message routing follows pattern: `MessageController` receives messages via `MessageDeliveryService`, extracts `taskType`/`messageType` from payload, routes to appropriate processor method. diff --git a/README.md b/README.md index 6243ed5..f481d84 100644 --- a/README.md +++ b/README.md @@ -1,9 +1 @@ -# Votianlt README - -To start the application in development mode, run: `./mvnw` - -To build the application in production mode, run: `./mvnw -Pproduction package` - -## Getting Started - -The [Getting Started](https://vaadin.com/docs/latest/getting-started) guide will quickly familiarize you with your new Votianlt implementation. You'll learn how to set up your development environment, understand the project structure, and find resources to help you add muscles to your skeleton — transforming it into a fully-featured application. +docker buildx build --platform linux/amd64 -t appcreationgmbh/votianlt:0.8.0 --push . \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3689245..804180c 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ de.assecutor.votianlt votianlt - 0.5.2 + 0.8.0 jar diff --git a/src/main/bundles/prod.bundle b/src/main/bundles/prod.bundle index ab47147..8656b0a 100644 Binary files a/src/main/bundles/prod.bundle and b/src/main/bundles/prod.bundle differ diff --git a/src/main/java/de/assecutor/votianlt/controller/MessageController.java b/src/main/java/de/assecutor/votianlt/controller/MessageController.java index d88c17c..1c593cf 100644 --- a/src/main/java/de/assecutor/votianlt/controller/MessageController.java +++ b/src/main/java/de/assecutor/votianlt/controller/MessageController.java @@ -25,6 +25,7 @@ import de.assecutor.votianlt.model.Comment; import de.assecutor.votianlt.service.JobHistoryService; import de.assecutor.votianlt.service.EmailService; import de.assecutor.votianlt.service.MessageService; +import de.assecutor.votianlt.service.ClientConnectionService; import de.assecutor.votianlt.model.JobStatus; import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.ObjectMapper; @@ -72,12 +73,14 @@ public class MessageController { private final MessageService messageService; private final UserService userService; private final ObjectMapper objectMapper; + private final ClientConnectionService clientConnectionService; public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository, AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository, TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository, SignatureRepository signatureRepository, CommentRepository commentRepository, JobHistoryService jobHistoryService, - EmailService emailService, MessageService messageService, UserService userService, ObjectMapper objectMapper) { + EmailService emailService, MessageService messageService, UserService userService, ObjectMapper objectMapper, + ClientConnectionService clientConnectionService) { this.mqttPublisher = mqttPublisher; this.appUserRepository = appUserRepository; this.appUserService = appUserService; @@ -93,6 +96,7 @@ public class MessageController { this.messageService = messageService; this.userService = userService; this.objectMapper = objectMapper; + this.clientConnectionService = clientConnectionService; } /** @@ -123,6 +127,8 @@ public class MessageController { response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString()); // Store clientId mapping for this user session storeClientIdMapping(user.getIdAsString(), request.getClientId()); + // Register client for ping/pong monitoring + clientConnectionService.registerClient(request.getClientId(), user.getIdAsString()); } } } @@ -172,7 +178,15 @@ public class MessageController { // Find jobs assigned to this app user List assignedJobs = jobRepository.findByAppUser(appUserId); - log.debug("Found {} jobs for appUserId: {}", assignedJobs.size(), appUserId); + log.info("Found {} jobs for appUserId: {}", assignedJobs.size(), appUserId); + + // Debug: Log all jobs and their app_user values to diagnose assignment issues + List allJobs = jobRepository.findAll(); + log.info("DEBUG: Total jobs in database: {}", allJobs.size()); + for (Job job : allJobs) { + log.info("DEBUG: Job {} (number: {}) has app_user='{}', digitalProcessing={}", + job.getIdAsString(), job.getJobNumber(), job.getAppUser(), job.isDigitalProcessing()); + } // For each job, fetch related cargo items and tasks (ordered by task order) List jobsWithRelatedData = assignedJobs.stream().map(job -> { @@ -608,6 +622,22 @@ public class MessageController { return clientIdUserMapping.get(clientId); } + /** + * Handle pong response from a client. + * Client sends to /server/{clientId}/pong with payload { timestamp }. + * Used for connection monitoring. + */ + public void handlePong(Map payload) { + String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null; + log.debug("Received pong from client: {}", clientId); + + if (clientId != null && !clientId.isBlank()) { + clientConnectionService.handlePong(clientId); + } else { + log.warn("Received pong without clientId in payload"); + } + } + /** * Handle incoming message from a client via MQTT. * Client sends to /server/{clientId}/message with payload: diff --git a/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java b/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java index 7f5f907..8283d72 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java +++ b/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java @@ -1,10 +1,13 @@ package de.assecutor.votianlt.messaging.config; +import de.assecutor.votianlt.controller.MessageController; +import de.assecutor.votianlt.dto.AppLoginRequest; import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService; import de.assecutor.votianlt.messaging.model.AcknowledgmentMessage; import de.assecutor.votianlt.messaging.model.MessageEnvelope; import de.assecutor.votianlt.messaging.plugin.*; import de.assecutor.votianlt.messaging.plugin.mqtt.MqttMessagingPlugin; +import de.assecutor.votianlt.service.ClientConnectionService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -14,6 +17,7 @@ import org.springframework.context.event.EventListener; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.StandardCharsets; +import java.util.Map; /** * Configuration for the plugin-based messaging system. @@ -61,8 +65,10 @@ public class PluginMessagingConfig { MessagingPlugin plugin = createPlugin(pluginType); PluginConfig config = createPluginConfig(pluginType); - // Get MessageDeliveryService from context (after all beans are created) + // Get beans from context (after all beans are created) MessageDeliveryService deliveryService = event.getApplicationContext().getBean(MessageDeliveryService.class); + MessageController messageController = event.getApplicationContext().getBean(MessageController.class); + ClientConnectionService clientConnectionService = event.getApplicationContext().getBean(ClientConnectionService.class); // Set up a listener to subscribe when connected log.info("[PluginMessagingConfig] Adding state listener"); @@ -72,7 +78,7 @@ public class PluginMessagingConfig { if (stateEvent.isConnected()) { log.info("[PluginMessagingConfig] Plugin connected, setting up subscriptions"); try { - setupSubscriptions(deliveryService); + setupSubscriptions(deliveryService, messageController, clientConnectionService); log.info("[PluginMessagingConfig] Subscriptions setup completed"); } catch (Exception e) { log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e); @@ -133,7 +139,9 @@ public class PluginMessagingConfig { /** * Setup message subscriptions using the new plugin API. */ - private void setupSubscriptions(MessageDeliveryService deliveryService) { + private void setupSubscriptions(MessageDeliveryService deliveryService, + MessageController messageController, + ClientConnectionService clientConnectionService) { log.info("[PluginMessagingConfig] Setting up message subscriptions"); try { @@ -157,12 +165,13 @@ public class PluginMessagingConfig { "task_completed", "jobs/assigned", "message", - "login" + "login", + "pong" }; for (String messageType : messageTypes) { pluginManager.registerMessageHandler(messageType, (clientId, payload) -> - handleEnvelopedMessage(clientId, payload, deliveryService)); + handleEnvelopedMessage(clientId, payload, deliveryService, messageController, clientConnectionService)); } log.info("[PluginMessagingConfig] Message subscriptions initialized"); @@ -175,8 +184,10 @@ public class PluginMessagingConfig { /** * Handle incoming enveloped message. + * Supports both new envelope format and legacy format for backwards compatibility. */ - private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService) { + private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService, + MessageController messageController, ClientConnectionService clientConnectionService) { try { String json = new String(payload, StandardCharsets.UTF_8); log.info("[PluginMessagingConfig] Received JSON from client {}: {}", clientId, json); @@ -184,13 +195,75 @@ public class PluginMessagingConfig { // Try to parse as envelope first try { MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class); - deliveryService.handleIncomingMessage(envelope); + // Valid envelope - check if it has required fields + if (envelope.getMessageId() != null && envelope.getTopic() != null) { + deliveryService.handleIncomingMessage(envelope); + return; + } } catch (Exception e) { - // If not an envelope, it might be a legacy message - log and skip - log.warn("[PluginMessagingConfig] Received non-enveloped message from client {}, skipping. JSON: {}", clientId, json); + // Not a valid envelope, try legacy format + log.debug("[PluginMessagingConfig] Message is not an envelope, trying legacy format"); } + + // Handle legacy format (direct payload without envelope) + handleLegacyMessage(clientId, json, messageController, clientConnectionService); + } catch (Exception e) { - log.error("[PluginMessagingConfig] Error handling enveloped message from client {}: {}", clientId, e.getMessage(), e); + log.error("[PluginMessagingConfig] Error handling message from client {}: {}", clientId, e.getMessage(), e); + } + } + + /** + * Handle legacy message format (without envelope wrapper). + * This supports older clients that don't use the envelope format. + */ + @SuppressWarnings("unchecked") + private void handleLegacyMessage(String clientId, String json, + MessageController messageController, ClientConnectionService clientConnectionService) { + try { + Map payload = objectMapper.readValue(json, Map.class); + log.info("[PluginMessagingConfig] Processing legacy message from client {}: {}", clientId, payload); + + // Check if this is a login request (has email, password, clientId) + if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) { + AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class); + log.info("[PluginMessagingConfig] Routing legacy login request for email: {}", loginRequest.getEmail()); + messageController.handleAppLogin(loginRequest); + return; + } + + // Check if this is a pong response + if ("pong".equals(payload.get("type"))) { + String pongClientId = clientId != null ? clientId : (String) payload.get("clientId"); + if (pongClientId != null) { + log.debug("[PluginMessagingConfig] Routing legacy pong from client: {}", pongClientId); + clientConnectionService.handlePong(pongClientId); + } + return; + } + + // Check if this is a task completion + if (payload.containsKey("taskType") || payload.containsKey("taskId")) { + String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null; + log.info("[PluginMessagingConfig] Routing legacy task_completed from client: {}", clientId); + messageController.handleTaskCompleted(payload, taskType); + return; + } + + // Check if this is a jobs/assigned request + if (payload.containsKey("appUserId")) { + if (clientId != null) { + payload.put("clientId", clientId); + } + log.info("[PluginMessagingConfig] Routing legacy jobs/assigned request from client: {}", clientId); + messageController.handleGetAssignedJobs(payload); + return; + } + + log.warn("[PluginMessagingConfig] Unknown legacy message format from client {}: {}", clientId, json); + + } catch (Exception e) { + log.error("[PluginMessagingConfig] Error handling legacy message from client {}: {}", clientId, e.getMessage(), e); } } } diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java index dae79e6..0a02383 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java +++ b/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java @@ -51,6 +51,8 @@ public class AcknowledgmentHandler { handleLogin(payloadMap); } else if (topic.matches("/server/.+/message")) { handleIncomingMessage(topic, payloadMap); + } else if (topic.matches("/server/.+/pong")) { + handlePong(topic, payloadMap); } else { log.debug("[AckHandler] No route for topic {}", topic); } @@ -124,5 +126,24 @@ public class AcknowledgmentHandler { log.error("[AckHandler] Error handling incoming message: {}", e.getMessage(), e); } } + + /** + * Handle pong response from client for connection monitoring + */ + private void handlePong(String topic, Map payload) { + try { + // Extract clientId from topic: /server/{clientId}/pong + String[] parts = topic.split("/"); + String clientId = parts.length > 2 ? parts[2] : null; + if (clientId != null && !clientId.isBlank()) { + payload.put("clientId", clientId); + } else { + log.warn("[AckHandler] Couldn't extract clientId from topic {} for pong", topic); + } + messageController.handlePong(payload); + } catch (Exception e) { + log.error("[AckHandler] Error handling pong: {}", e.getMessage(), e); + } + } } diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java index 5d0c127..043f45e 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java +++ b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java @@ -97,6 +97,14 @@ public interface MessageDeliveryService { */ void retryPendingDeliveries(); + /** + * Retry pending deliveries for a specific client. + * Called when a client reconnects. + * + * @param clientId The client identifier + */ + void retryPendingDeliveriesForClient(String clientId); + /** * Clean up expired and completed deliveries. * Called by scheduled task. diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java index c3a00cf..ff587af 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java +++ b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java @@ -6,6 +6,7 @@ import de.assecutor.votianlt.messaging.plugin.PluginManager; import de.assecutor.votianlt.messaging.plugin.SendOptions; import de.assecutor.votianlt.repository.MessageEnvelopeRepository; import de.assecutor.votianlt.repository.PendingDeliveryRepository; +import de.assecutor.votianlt.service.ClientConnectionService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -45,6 +46,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { private final AcknowledgmentHandler acknowledgmentHandler; private final DeliveryConfig config; private final ObjectMapper objectMapper; + private final ClientConnectionService clientConnectionService; // In-memory tracking of messages awaiting acknowledgment private final Map pendingAckMessages = new ConcurrentHashMap<>(); @@ -77,13 +79,15 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { MessageEnvelopeRepository envelopeRepository, AcknowledgmentHandler acknowledgmentHandler, DeliveryConfig config, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + ClientConnectionService clientConnectionService) { this.pluginManager = pluginManager; this.pendingDeliveryRepository = pendingDeliveryRepository; this.envelopeRepository = envelopeRepository; this.acknowledgmentHandler = acknowledgmentHandler; this.config = config; this.objectMapper = objectMapper; + this.clientConnectionService = clientConnectionService; } @PostConstruct @@ -133,6 +137,13 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { List toRemove = new ArrayList<>(); for (PendingAckMessage pending : pendingAckMessages.values()) { + // Only retry if client is connected + if (!clientConnectionService.isClientConnected(pending.clientId)) { + log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected", + pending.messageId, pending.clientId); + continue; + } + pending.retryCount++; if (pending.retryCount > ackMaxRetries) { @@ -351,6 +362,59 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { } } + @Override + public void retryPendingDeliveriesForClient(String clientId) { + if (clientId == null || clientId.isBlank()) { + return; + } + + log.info("[MessageDelivery] Client {} reconnected - retrying pending messages", clientId); + + // Retry in-memory pending ACK messages for this client + int inMemoryCount = 0; + for (PendingAckMessage pending : pendingAckMessages.values()) { + if (clientId.equals(pending.clientId)) { + inMemoryCount++; + try { + SendOptions sendOptions = SendOptions.reliable(); + pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions) + .thenAccept(v -> log.info("[MessageDelivery] Reconnect retry sent for message {} to client {}", + pending.messageId, clientId)) + .exceptionally(ex -> { + log.error("[MessageDelivery] Reconnect retry failed for message {}: {}", + pending.messageId, ex.getMessage()); + return null; + }); + } catch (Exception e) { + log.error("[MessageDelivery] Error during reconnect retry for message {}: {}", + pending.messageId, e.getMessage(), e); + } + } + } + + // Retry database pending deliveries for this client + try { + List pendingDeliveries = pendingDeliveryRepository + .findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT)); + + int dbCount = 0; + for (PendingDelivery pending : pendingDeliveries) { + String topic = pending.getTopic(); + if (topic != null && topic.startsWith(clientId + "/")) { + dbCount++; + retryDelivery(pending); + } + } + + log.info("[MessageDelivery] Triggered retry for client {}: {} in-memory, {} from database", + clientId, inMemoryCount, dbCount); + + } catch (Exception e) { + log.error("[MessageDelivery] Error retrying database deliveries for client {}: {}", + clientId, e.getMessage(), e); + } + } + @Override public void cleanupOldDeliveries() { try { @@ -460,6 +524,13 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService { String clientId = parts[0]; String messageType = parts[1]; + // Only retry if client is connected + if (!clientConnectionService.isClientConnected(clientId)) { + log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected", + pending.getMessageId(), clientId); + return; + } + // Send via plugin manager SendOptions options = SendOptions.reliable(); pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options) diff --git a/src/main/java/de/assecutor/votianlt/repository/JobRepository.java b/src/main/java/de/assecutor/votianlt/repository/JobRepository.java index dd9a549..9535c45 100644 --- a/src/main/java/de/assecutor/votianlt/repository/JobRepository.java +++ b/src/main/java/de/assecutor/votianlt/repository/JobRepository.java @@ -86,8 +86,12 @@ public interface JobRepository extends MongoRepository { long countByIsDraftTrue(); /** - * Findet alle Aufträge, die einem bestimmten App-Nutzer zugewiesen sind + * Findet alle nicht abgeschlossenen Aufträge, die einem bestimmten App-Nutzer zugewiesen sind. + * Excludes jobs with status COMPLETED or CANCELLED. + * Uses explicit query because @Field("app_user") annotation is not always + * respected by Spring Data MongoDB query derivation. */ + @Query("{'app_user': ?0, 'status': {'$nin': ['COMPLETED', 'CANCELLED']}}") List findByAppUser(String appUser); /** diff --git a/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java b/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java index 3e18b04..7bed118 100644 --- a/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java +++ b/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java @@ -41,6 +41,11 @@ public interface PendingDeliveryRepository extends MongoRepository findByStatusInAndExpiresAtBefore(List statuses, LocalDateTime dateTime); + /** + * Find deliveries with specific statuses + */ + List findByStatusIn(List statuses); + /** * Find all deliveries for a specific client */ diff --git a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java new file mode 100644 index 0000000..2444378 --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java @@ -0,0 +1,276 @@ +package de.assecutor.votianlt.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService; +import de.assecutor.votianlt.messaging.plugin.PluginManager; +import de.assecutor.votianlt.messaging.plugin.SendOptions; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Service for managing client connections via Ping/Pong mechanism. + * Tracks connected clients and periodically checks their connectivity. + */ +@Service +@Slf4j +public class ClientConnectionService { + + /** + * Represents the connection state of a client. + */ + public record ClientState( + String clientId, + String userId, + boolean connected, + Instant lastPingSent, + Instant lastPongReceived, + Instant connectedAt + ) { + public ClientState withPingSent(Instant pingSent) { + return new ClientState(clientId, userId, connected, pingSent, lastPongReceived, connectedAt); + } + + public ClientState withPongReceived(Instant pongReceived) { + return new ClientState(clientId, userId, true, lastPingSent, pongReceived, connectedAt); + } + + public ClientState withConnected(boolean isConnected) { + return new ClientState(clientId, userId, isConnected, lastPingSent, lastPongReceived, connectedAt); + } + } + + private final Map connectedClients = new ConcurrentHashMap<>(); + private final PluginManager pluginManager; + private final ObjectMapper objectMapper; + private final MessageDeliveryService messageDeliveryService; + + @Value("${app.client.ping.interval-seconds:15}") + private int pingIntervalSeconds; + + @Value("${app.client.ping.timeout-seconds:5}") + private int pingTimeoutSeconds; + + public ClientConnectionService(PluginManager pluginManager, ObjectMapper objectMapper, + @Lazy MessageDeliveryService messageDeliveryService) { + this.pluginManager = pluginManager; + this.objectMapper = objectMapper; + this.messageDeliveryService = messageDeliveryService; + } + + /** + * Registers a client as connected after successful login. + * + * @param clientId The unique client identifier + * @param userId The user ID associated with this client + */ + public void registerClient(String clientId, String userId) { + if (clientId == null || clientId.isBlank()) { + log.warn("Cannot register client with null or blank clientId"); + return; + } + + ClientState previousState = connectedClients.get(clientId); + boolean wasDisconnected = previousState != null && !previousState.connected(); + + Instant now = Instant.now(); + ClientState state = new ClientState(clientId, userId, true, null, now, now); + connectedClients.put(clientId, state); + log.info("[ClientConnectionService] Client registered: clientId={}, userId={}, totalClients={}", + clientId, userId, connectedClients.size()); + + // If client was previously disconnected, retry pending messages + if (wasDisconnected) { + log.info("Client {} re-registered after disconnect - triggering pending message retry", clientId); + messageDeliveryService.retryPendingDeliveriesForClient(clientId); + } + } + + /** + * Unregisters a client (e.g., on explicit logout). + * + * @param clientId The client identifier to unregister + */ + public void unregisterClient(String clientId) { + ClientState removed = connectedClients.remove(clientId); + if (removed != null) { + log.info("Client unregistered: clientId={}, userId={}", clientId, removed.userId()); + } + } + + /** + * Handles a pong response from a client. + * + * @param clientId The client that sent the pong + */ + public void handlePong(String clientId) { + if (clientId == null || clientId.isBlank()) { + log.warn("Received pong from null or blank clientId"); + return; + } + + ClientState state = connectedClients.get(clientId); + if (state != null) { + boolean wasDisconnected = !state.connected(); + ClientState updatedState = state.withPongReceived(Instant.now()); + connectedClients.put(clientId, updatedState); + log.debug("Pong received from client: clientId={}", clientId); + + // If client was disconnected and is now reconnected, retry pending messages + if (wasDisconnected) { + log.info("Client {} reconnected via pong - triggering pending message retry", clientId); + messageDeliveryService.retryPendingDeliveriesForClient(clientId); + } + } else { + log.warn("Received pong from unknown client: clientId={}", clientId); + } + } + + /** + * Checks if a client is currently connected. + * + * @param clientId The client identifier + * @return true if the client is connected + */ + public boolean isClientConnected(String clientId) { + ClientState state = connectedClients.get(clientId); + return state != null && state.connected(); + } + + /** + * Gets all connected client IDs. + * + * @return Set of connected client IDs + */ + public Set getConnectedClientIds() { + return connectedClients.entrySet().stream() + .filter(e -> e.getValue().connected()) + .map(Map.Entry::getKey) + .collect(java.util.stream.Collectors.toSet()); + } + + /** + * Gets the connection state for a specific client. + * + * @param clientId The client identifier + * @return ClientState or null if not found + */ + public ClientState getClientState(String clientId) { + return connectedClients.get(clientId); + } + + /** + * Scheduled task to send pings to all connected clients. + * Runs based on the configured interval (app.client.ping.interval-seconds). + */ + @Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000") + public void sendPingsToAllClients() { + log.info("[ClientConnectionService] Ping cycle started - pluginConnected={}, connectedClients={}", + pluginManager.isConnected(), connectedClients.size()); + + if (!pluginManager.isConnected()) { + log.info("[ClientConnectionService] Plugin not connected, skipping ping cycle"); + return; + } + + if (connectedClients.isEmpty()) { + log.info("[ClientConnectionService] No connected clients, skipping ping cycle"); + return; + } + + log.info("[ClientConnectionService] Starting ping cycle for {} clients", connectedClients.size()); + Instant now = Instant.now(); + + for (Map.Entry entry : connectedClients.entrySet()) { + String clientId = entry.getKey(); + ClientState state = entry.getValue(); + + // Check if previous ping timed out + if (state.lastPingSent() != null && state.connected()) { + Instant expectedPongBy = state.lastPingSent().plusSeconds(pingTimeoutSeconds); + boolean pongReceivedAfterPing = state.lastPongReceived() != null + && state.lastPongReceived().isAfter(state.lastPingSent()); + + if (now.isAfter(expectedPongBy) && !pongReceivedAfterPing) { + // Client did not respond in time - mark as disconnected + ClientState disconnectedState = state.withConnected(false); + connectedClients.put(clientId, disconnectedState); + log.warn("Client timed out, marking as disconnected: clientId={}, userId={}", + clientId, state.userId()); + continue; + } + } + + // Send ping to connected clients + if (state.connected()) { + log.info("[ClientConnectionService] Sending ping to client: {}", clientId); + sendPing(clientId); + ClientState updatedState = state.withPingSent(now); + connectedClients.put(clientId, updatedState); + } + } + } + + /** + * Sends a ping message to a specific client. + * + * @param clientId The target client + */ + private void sendPing(String clientId) { + try { + Map pingPayload = Map.of( + "type", "ping", + "timestamp", Instant.now().toEpochMilli() + ); + + String json = objectMapper.writeValueAsString(pingPayload); + byte[] payload = json.getBytes(StandardCharsets.UTF_8); + log.info("[ClientConnectionService] Ping payload for client {}: {}", clientId, json); + + SendOptions options = SendOptions.builder() + .qos(1) + .retained(false) + .build(); + + pluginManager.sendToClient(clientId, "ping", payload, options) + .whenComplete((result, error) -> { + if (error != null) { + log.warn("[ClientConnectionService] Failed to send ping to client {}: {}", clientId, error.getMessage()); + } else { + log.info("[ClientConnectionService] Ping sent successfully to client: {}", clientId); + } + }); + + } catch (Exception e) { + log.error("Error sending ping to client {}: {}", clientId, e.getMessage(), e); + } + } + + /** + * Gets the number of currently connected clients. + * + * @return Number of connected clients + */ + public int getConnectedClientCount() { + return (int) connectedClients.values().stream() + .filter(ClientState::connected) + .count(); + } + + /** + * Gets the total number of registered clients (connected and disconnected). + * + * @return Total number of registered clients + */ + public int getTotalClientCount() { + return connectedClients.size(); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 822139b..a943b92 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -20,6 +20,8 @@ spring.mustache.check-template-location=false # Launch the default browser when starting the application in development mode vaadin.launch-browser=true +# Disable Vaadin Copilot to avoid NullPointerException in dev mode +vaadin.copilot.enabled=false # To improve the performance during development. # For more information https://vaadin.com/docs/latest/flow/integrations/spring/configuration#special-configuration-parameters vaadin.allowed-packages=com.vaadin,org.vaadin,de.assecutor.votianlt @@ -78,5 +80,24 @@ app.messaging.plugin.mqtt.username=app app.messaging.plugin.mqtt.password=apppwd app.messaging.plugin.mqtt.client.id=votianlt-server +# Client Connection Monitoring (Ping/Pong) +# Server sends ping to: /client/{clientId}/ping +# Client responds to: /server/{clientId}/pong +# +# Ping JSON (Server -> Client): +# { +# "type": "ping", +# "timestamp": 1702835000000 +# } +# +# Pong JSON (Client -> Server): +# { +# "type": "pong", +# "timestamp": 1702835000000 +# } +# +app.client.ping.interval-seconds=15 +app.client.ping.timeout-seconds=5 + # Application Version - automatically set from pom.xml during build app.version=@project.version@ \ No newline at end of file