From b58c3e2398d96b4b3645a8a07a3849cd227e5a29 Mon Sep 17 00:00:00 2001 From: Sven Carstensen Date: Sun, 14 Sep 2025 17:11:36 +0200 Subject: [PATCH] MQTT --- pom.xml | 6 +- .../controller/MessageController.java | 170 +++++++++++------- .../votianlt/dto/AppLoginRequest.java | 1 + .../votianlt/mqtt/MqttPublisher.java | 14 +- .../votianlt/mqtt/MqttV5ClientManager.java | 39 ++-- 5 files changed, 146 insertions(+), 84 deletions(-) diff --git a/pom.xml b/pom.xml index 31a5c78..61256ac 100644 --- a/pom.xml +++ b/pom.xml @@ -109,7 +109,11 @@ 2.0.1 - + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + diff --git a/src/main/java/de/assecutor/votianlt/controller/MessageController.java b/src/main/java/de/assecutor/votianlt/controller/MessageController.java index 7905ccf..42f6048 100644 --- a/src/main/java/de/assecutor/votianlt/controller/MessageController.java +++ b/src/main/java/de/assecutor/votianlt/controller/MessageController.java @@ -25,6 +25,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * MQTT message controller for handling real-time communication with apps. @@ -34,6 +35,9 @@ import java.util.Map; @Slf4j public class MessageController { + // Map to store userId -> clientId mapping for active sessions + private final Map userClientIdMapping = new ConcurrentHashMap<>(); + @Autowired private MqttPublisher mqttPublisher; @@ -97,92 +101,88 @@ public class MessageController { } /** - * Send notification to specific user + * Send notification to specific user (removed MQTT publishing) */ public void sendNotificationToUser(String username, String message) { - Map notification = Map.of( - "message", message, - "timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), - "type", "notification" - ); - - log.info("Sending notification to user '{}': {}", username, notification); - mqttPublisher.publishAsJson("v1/users/" + username + "/notifications", notification, true); - log.info("Notification sent to '/user/{}/queue/notifications'", username); + log.info("Notification for user '{}': {}", username, message); + // Note: MQTT notification publishing has been removed } /** - * Send broadcast message to all connected clients + * Send broadcast message to all connected clients (removed MQTT publishing) */ public void sendBroadcastMessage(String message) { - Map broadcast = Map.of( - "message", message, - "timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), - "type", "broadcast" - ); - - log.info("Sending broadcast message: {}", broadcast); - mqttPublisher.publishAsJson("v1/broadcasts", broadcast); - log.info("Broadcast message sent to '/topic/broadcasts'"); + log.info("Broadcast message: {}", message); + // Note: MQTT broadcast publishing has been removed } /** * Authentication endpoint for mobile app users via MQTT. - * Client sends to /app/auth/login with payload { email, password }. - * The response is sent back to the requesting user on /user/queue/auth + * Client sends to /server/login with payload { email, password, clientId }. + * The response is sent back to the requesting client on /client/{clientId}/auth */ - public AppLoginResponse handleAppLogin(AppLoginRequest request) { - log.info("MQTT Endpoint '/app/auth/login' called with email: {}", - request != null ? request.getEmail() : "null"); - - if (request == null || request.getEmail() == null || request.getPassword() == null - || request.getEmail().isBlank() || request.getPassword().isBlank()) { - AppLoginResponse response = new AppLoginResponse(false, "E-Mail und Passwort sind erforderlich", null, null, null); - log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'", - false, "E-Mail und Passwort sind erforderlich"); - return response; + public void handleAppLogin(AppLoginRequest request) { + log.info("MQTT Endpoint '/server/login' called with email: {}, clientId: {}", + request != null ? request.getEmail() : "null", + request != null ? request.getClientId() : "null"); + + AppLoginResponse response; + + if (request == null || request.getEmail() == null || request.getPassword() == null || request.getClientId() == null + || request.getEmail().isBlank() || request.getPassword().isBlank() || request.getClientId().isBlank()) { + response = new AppLoginResponse(false, "E-Mail, Passwort und Client-ID sind erforderlich", null, null, null); + } else { + AppUser user = appUserRepository.findByEmail(request.getEmail()); + if (user == null) { + response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null); + } else { + boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword()); + if (!ok) { + response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null); + } else { + response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString()); + // Store clientId mapping for this user session + storeClientIdMapping(user.getIdAsString(), request.getClientId()); + } + } } - AppUser user = appUserRepository.findByEmail(request.getEmail()); - if (user == null) { - AppLoginResponse response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null); - log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'", - false, "Benutzer nicht gefunden"); - return response; + // Send response via MQTT to specific client + if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) { + mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false); + log.info("MQTT Response sent to '/client/{}/auth': success={}, message='{}'", + request.getClientId(), response.isSuccess(), response.getMessage()); } - - boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword()); - if (!ok) { - AppLoginResponse response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null); - log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'", - false, "Ungültige Anmeldedaten"); - return response; - } - - AppLoginResponse response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString()); - log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}', appUserId='{}'", - true, "Anmeldung erfolgreich", response.getAppUserId()); - return response; } /** * Endpoint to retrieve jobs assigned to a specific app user with related cargo items and tasks. - * Client sends to /app/jobs/assigned with payload { appUserId }. - * The response is sent back to the requesting user on /user/queue/jobs + * Client sends to /server/{clientId}/jobs/assigned with payload { appUserId }. + * The response is sent back to the requesting client on /client/{clientId}/jobs */ - public List handleGetAssignedJobs(Map request) { - log.info("MQTT Endpoint '/app/jobs/assigned' called with data: {}", request); + public void handleGetAssignedJobs(Map request) { + log.info("MQTT Endpoint '/server/{clientId}/jobs/assigned' called with data: {}", request); log.debug("Starting to process jobs request for MQTT endpoint"); if (request == null || !request.containsKey("appUserId")) { - log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (no appUserId provided)"); - return List.of(); // Return empty list if no appUserId provided + log.info("Assigned jobs request missing appUserId; returning empty list"); + return; // Return empty list if no appUserId provided } String appUserId = request.get("appUserId").toString(); if (appUserId == null || appUserId.isBlank()) { - log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (appUserId is blank)"); - return List.of(); // Return empty list if appUserId is blank + log.info("Assigned jobs request blank appUserId; returning empty list"); + return; // Return empty list if appUserId is blank + } + + // Attempt to get clientId from request (injected from topic) or from stored mapping + String clientId = null; + try { + Object cid = request.get("clientId"); + if (cid != null) clientId = cid.toString(); + } catch (Exception ignored) {} + if (clientId == null || clientId.isBlank()) { + clientId = getClientIdForUserId(appUserId); } // Find jobs assigned to this app user @@ -205,8 +205,14 @@ public class MessageController { }) .toList(); - log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': {} jobs with related data found for appUserId='{}'", - jobsWithRelatedData.size(), appUserId); + // Publish to the requesting client's topic if clientId is known + if (clientId != null && !clientId.isBlank()) { + String topic = "/client/" + clientId + "/jobs"; + mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false); + log.info("Published {} assigned jobs for appUserId='{}' to topic '{}'", jobsWithRelatedData.size(), appUserId, topic); + } else { + log.warn("No clientId available to publish assigned jobs for appUserId='{}'. Skipping MQTT publish.", appUserId); + } // Log complete JSON for debugging log.debug("About to serialize {} jobs to JSON for logging", jobsWithRelatedData.size()); @@ -216,6 +222,7 @@ public class MessageController { String jsonOutput = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobsWithRelatedData); log.info("=== COMPLETE JSON RESPONSE FOR MQTT CLIENT ==="); log.info("AppUserId: {}", appUserId); + log.info("ClientId: {}", clientId); log.info("Number of jobs: {}", jobsWithRelatedData.size()); log.info("JSON Data:\n{}", jsonOutput); log.info("=== END JSON RESPONSE ==="); @@ -223,7 +230,6 @@ public class MessageController { log.error("Failed to serialize jobs to JSON for logging: {}", e.getMessage(), e); } - return jobsWithRelatedData; } /** @@ -370,8 +376,8 @@ public class MessageController { event.put("event", "taskCompleted"); event.put("taskType", task.getTaskType()); - // Publish to MQTT task topic - mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event); + // Task event publishing has been removed + log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted()); response.put("success", true); response.putAll(event); @@ -444,8 +450,8 @@ public class MessageController { event.put("event", "taskCompleted"); event.put("taskType", task.getTaskType()); - // Publish to MQTT task topic - mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event); + // Task event publishing has been removed + log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted()); response.put("success", true); response.putAll(event); @@ -462,4 +468,36 @@ public class MessageController { return response; } } + + /** + * Helper method to get the user ID for a task by looking up the associated job + */ + private String getUserIdForTask(BaseTask task) { + try { + java.util.Optional jobOpt = jobRepository.findById(task.getJobId()); + if (jobOpt.isPresent()) { + Job job = jobOpt.get(); + return job.getAppUser(); + } + return null; + } catch (Exception e) { + log.error("Error getting user ID for task {}: {}", task.getIdAsString(), e.getMessage(), e); + return null; + } + } + + /** + * Store the mapping between userId and clientId for active session + */ + private void storeClientIdMapping(String userId, String clientId) { + userClientIdMapping.put(userId, clientId); + log.debug("Stored clientId mapping: userId={} -> clientId={}", userId, clientId); + } + + /** + * Get the clientId for a given userId + */ + private String getClientIdForUserId(String userId) { + return userClientIdMapping.get(userId); + } } \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java index a98d0f4..f6bdc28 100644 --- a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java +++ b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java @@ -10,4 +10,5 @@ import lombok.NoArgsConstructor; public class AppLoginRequest { private String email; private String password; + private String clientId; } \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java b/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java index 3c65a40..6b59506 100644 --- a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java +++ b/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java @@ -1,6 +1,7 @@ package de.assecutor.votianlt.mqtt; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.context.annotation.Lazy; @@ -20,11 +21,13 @@ public interface MqttPublisher { @Slf4j class MqttPublisherImpl implements MqttPublisher { - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper; private final MqttV5ClientManager clientManager; public MqttPublisherImpl(@Lazy MqttV5ClientManager clientManager) { this.clientManager = clientManager; + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); } @Override @@ -39,7 +42,14 @@ class MqttPublisherImpl implements MqttPublisher { byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8); // Default QoS 2 clientManager.publish(topic, bytes, 2, retained); - log.debug("[MQTT v5] published topic={} retained={} bytes={}", topic, retained, bytes.length); + + // Log all published JSON documents + log.info("=== MQTT JSON PUBLISHED ==="); + log.info("Topic: {}", topic); + log.info("Retained: {}", retained); + log.info("JSON Data: {}", json); + log.info("=== END MQTT PUBLISH ==="); + } catch (Exception e) { log.error("Failed to serialize/publish MQTT message for topic {}: {}", topic, e.getMessage(), e); } diff --git a/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java b/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java index f4b0324..4454514 100644 --- a/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java +++ b/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java @@ -86,13 +86,12 @@ public class MqttV5ClientManager implements SmartLifecycle { // Subscribe to topics with QoS String[] topics = new String[]{ - "v1/app/+/task/photo/completed", - "v1/app/+/task/confirm", - "v1/app/+/task/completed", - "v1/app/+/job/status", - "v1/app/+/device/location", - "v1/app/+/jobs/assigned", - "v1/app/+/auth/login" + "/server/+/task/photo/completed", + "/server/+/task/confirm", + "/server/+/task/completed", + "/server/+/job/status", + "/server/+/jobs/assigned", + "/server/login" }; MqttQos qos = mapQos(props.getDefaultQos()); for (String topic : topics) { @@ -131,19 +130,29 @@ public class MqttV5ClientManager implements SmartLifecycle { private void routeInbound(String topic, Map payload) { try { - if (topic.matches("v1/app/.+/task/photo/completed")) { + if (topic.matches("/server/.+/task/photo/completed")) { messageController.handlePhotoTaskCompleted(payload); - } else if (topic.matches("v1/app/.+/task/confirm")) { + } else if (topic.matches("/server/.+/task/confirm")) { messageController.handleTaskConfirmation(payload); - } else if (topic.matches("v1/app/.+/task/completed")) { + } else if (topic.matches("/server/.+/task/completed")) { messageController.handleTaskCompleted(payload); - } else if (topic.matches("v1/app/.+/job/status")) { + } else if (topic.matches("/server/.+/job/status")) { messageController.handleJobStatusUpdate(payload); - } else if (topic.matches("v1/app/.+/device/location")) { - messageController.handleDeviceLocation(payload); - } else if (topic.matches("v1/app/.+/jobs/assigned")) { + } else if (topic.matches("/server/.+/jobs/assigned")) { + // Extract clientId from topic: /server/{clientId}/jobs/assigned + try { + String[] parts = topic.split("/"); + if (parts.length >= 5 && "server".equals(parts[1])) { + String clientId = parts[2]; + if (clientId != null && !clientId.isBlank()) { + payload.put("clientId", clientId); + } + } + } catch (Exception ignore) { + // ignore extraction errors + } messageController.handleGetAssignedJobs(payload); - } else if (topic.matches("v1/app/.+/auth/login")) { + } else if (topic.equals("/server/login")) { var om = new ObjectMapper(); de.assecutor.votianlt.dto.AppLoginRequest req = om.convertValue(payload, de.assecutor.votianlt.dto.AppLoginRequest.class); messageController.handleAppLogin(req);