diff --git a/pom.xml b/pom.xml
index 31a5c78..61256ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,11 @@
2.0.1
-
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
diff --git a/src/main/java/de/assecutor/votianlt/controller/MessageController.java b/src/main/java/de/assecutor/votianlt/controller/MessageController.java
index 7905ccf..42f6048 100644
--- a/src/main/java/de/assecutor/votianlt/controller/MessageController.java
+++ b/src/main/java/de/assecutor/votianlt/controller/MessageController.java
@@ -25,6 +25,7 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* MQTT message controller for handling real-time communication with apps.
@@ -34,6 +35,9 @@ import java.util.Map;
@Slf4j
public class MessageController {
+ // Map to store userId -> clientId mapping for active sessions
+ private final Map userClientIdMapping = new ConcurrentHashMap<>();
+
@Autowired
private MqttPublisher mqttPublisher;
@@ -97,92 +101,88 @@ public class MessageController {
}
/**
- * Send notification to specific user
+ * Send notification to specific user (removed MQTT publishing)
*/
public void sendNotificationToUser(String username, String message) {
- Map notification = Map.of(
- "message", message,
- "timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
- "type", "notification"
- );
-
- log.info("Sending notification to user '{}': {}", username, notification);
- mqttPublisher.publishAsJson("v1/users/" + username + "/notifications", notification, true);
- log.info("Notification sent to '/user/{}/queue/notifications'", username);
+ log.info("Notification for user '{}': {}", username, message);
+ // Note: MQTT notification publishing has been removed
}
/**
- * Send broadcast message to all connected clients
+ * Send broadcast message to all connected clients (removed MQTT publishing)
*/
public void sendBroadcastMessage(String message) {
- Map broadcast = Map.of(
- "message", message,
- "timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
- "type", "broadcast"
- );
-
- log.info("Sending broadcast message: {}", broadcast);
- mqttPublisher.publishAsJson("v1/broadcasts", broadcast);
- log.info("Broadcast message sent to '/topic/broadcasts'");
+ log.info("Broadcast message: {}", message);
+ // Note: MQTT broadcast publishing has been removed
}
/**
* Authentication endpoint for mobile app users via MQTT.
- * Client sends to /app/auth/login with payload { email, password }.
- * The response is sent back to the requesting user on /user/queue/auth
+ * Client sends to /server/login with payload { email, password, clientId }.
+ * The response is sent back to the requesting client on /client/{clientId}/auth
*/
- public AppLoginResponse handleAppLogin(AppLoginRequest request) {
- log.info("MQTT Endpoint '/app/auth/login' called with email: {}",
- request != null ? request.getEmail() : "null");
-
- if (request == null || request.getEmail() == null || request.getPassword() == null
- || request.getEmail().isBlank() || request.getPassword().isBlank()) {
- AppLoginResponse response = new AppLoginResponse(false, "E-Mail und Passwort sind erforderlich", null, null, null);
- log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
- false, "E-Mail und Passwort sind erforderlich");
- return response;
+ public void handleAppLogin(AppLoginRequest request) {
+ log.info("MQTT Endpoint '/server/login' called with email: {}, clientId: {}",
+ request != null ? request.getEmail() : "null",
+ request != null ? request.getClientId() : "null");
+
+ AppLoginResponse response;
+
+ if (request == null || request.getEmail() == null || request.getPassword() == null || request.getClientId() == null
+ || request.getEmail().isBlank() || request.getPassword().isBlank() || request.getClientId().isBlank()) {
+ response = new AppLoginResponse(false, "E-Mail, Passwort und Client-ID sind erforderlich", null, null, null);
+ } else {
+ AppUser user = appUserRepository.findByEmail(request.getEmail());
+ if (user == null) {
+ response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null);
+ } else {
+ boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
+ if (!ok) {
+ response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null);
+ } else {
+ response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
+ // Store clientId mapping for this user session
+ storeClientIdMapping(user.getIdAsString(), request.getClientId());
+ }
+ }
}
- AppUser user = appUserRepository.findByEmail(request.getEmail());
- if (user == null) {
- AppLoginResponse response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null);
- log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
- false, "Benutzer nicht gefunden");
- return response;
+ // Send response via MQTT to specific client
+ if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) {
+ mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false);
+ log.info("MQTT Response sent to '/client/{}/auth': success={}, message='{}'",
+ request.getClientId(), response.isSuccess(), response.getMessage());
}
-
- boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
- if (!ok) {
- AppLoginResponse response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null);
- log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
- false, "Ungültige Anmeldedaten");
- return response;
- }
-
- AppLoginResponse response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
- log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}', appUserId='{}'",
- true, "Anmeldung erfolgreich", response.getAppUserId());
- return response;
}
/**
* Endpoint to retrieve jobs assigned to a specific app user with related cargo items and tasks.
- * Client sends to /app/jobs/assigned with payload { appUserId }.
- * The response is sent back to the requesting user on /user/queue/jobs
+ * Client sends to /server/{clientId}/jobs/assigned with payload { appUserId }.
+ * The response is sent back to the requesting client on /client/{clientId}/jobs
*/
- public List handleGetAssignedJobs(Map request) {
- log.info("MQTT Endpoint '/app/jobs/assigned' called with data: {}", request);
+ public void handleGetAssignedJobs(Map request) {
+ log.info("MQTT Endpoint '/server/{clientId}/jobs/assigned' called with data: {}", request);
log.debug("Starting to process jobs request for MQTT endpoint");
if (request == null || !request.containsKey("appUserId")) {
- log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (no appUserId provided)");
- return List.of(); // Return empty list if no appUserId provided
+ log.info("Assigned jobs request missing appUserId; returning empty list");
+ return; // Return empty list if no appUserId provided
}
String appUserId = request.get("appUserId").toString();
if (appUserId == null || appUserId.isBlank()) {
- log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (appUserId is blank)");
- return List.of(); // Return empty list if appUserId is blank
+ log.info("Assigned jobs request blank appUserId; returning empty list");
+ return; // Return empty list if appUserId is blank
+ }
+
+ // Attempt to get clientId from request (injected from topic) or from stored mapping
+ String clientId = null;
+ try {
+ Object cid = request.get("clientId");
+ if (cid != null) clientId = cid.toString();
+ } catch (Exception ignored) {}
+ if (clientId == null || clientId.isBlank()) {
+ clientId = getClientIdForUserId(appUserId);
}
// Find jobs assigned to this app user
@@ -205,8 +205,14 @@ public class MessageController {
})
.toList();
- log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': {} jobs with related data found for appUserId='{}'",
- jobsWithRelatedData.size(), appUserId);
+ // Publish to the requesting client's topic if clientId is known
+ if (clientId != null && !clientId.isBlank()) {
+ String topic = "/client/" + clientId + "/jobs";
+ mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false);
+ log.info("Published {} assigned jobs for appUserId='{}' to topic '{}'", jobsWithRelatedData.size(), appUserId, topic);
+ } else {
+ log.warn("No clientId available to publish assigned jobs for appUserId='{}'. Skipping MQTT publish.", appUserId);
+ }
// Log complete JSON for debugging
log.debug("About to serialize {} jobs to JSON for logging", jobsWithRelatedData.size());
@@ -216,6 +222,7 @@ public class MessageController {
String jsonOutput = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobsWithRelatedData);
log.info("=== COMPLETE JSON RESPONSE FOR MQTT CLIENT ===");
log.info("AppUserId: {}", appUserId);
+ log.info("ClientId: {}", clientId);
log.info("Number of jobs: {}", jobsWithRelatedData.size());
log.info("JSON Data:\n{}", jsonOutput);
log.info("=== END JSON RESPONSE ===");
@@ -223,7 +230,6 @@ public class MessageController {
log.error("Failed to serialize jobs to JSON for logging: {}", e.getMessage(), e);
}
- return jobsWithRelatedData;
}
/**
@@ -370,8 +376,8 @@ public class MessageController {
event.put("event", "taskCompleted");
event.put("taskType", task.getTaskType());
- // Publish to MQTT task topic
- mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
+ // Task event publishing has been removed
+ log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted());
response.put("success", true);
response.putAll(event);
@@ -444,8 +450,8 @@ public class MessageController {
event.put("event", "taskCompleted");
event.put("taskType", task.getTaskType());
- // Publish to MQTT task topic
- mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
+ // Task event publishing has been removed
+ log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted());
response.put("success", true);
response.putAll(event);
@@ -462,4 +468,36 @@ public class MessageController {
return response;
}
}
+
+ /**
+ * Helper method to get the user ID for a task by looking up the associated job
+ */
+ private String getUserIdForTask(BaseTask task) {
+ try {
+ java.util.Optional jobOpt = jobRepository.findById(task.getJobId());
+ if (jobOpt.isPresent()) {
+ Job job = jobOpt.get();
+ return job.getAppUser();
+ }
+ return null;
+ } catch (Exception e) {
+ log.error("Error getting user ID for task {}: {}", task.getIdAsString(), e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Store the mapping between userId and clientId for active session
+ */
+ private void storeClientIdMapping(String userId, String clientId) {
+ userClientIdMapping.put(userId, clientId);
+ log.debug("Stored clientId mapping: userId={} -> clientId={}", userId, clientId);
+ }
+
+ /**
+ * Get the clientId for a given userId
+ */
+ private String getClientIdForUserId(String userId) {
+ return userClientIdMapping.get(userId);
+ }
}
\ No newline at end of file
diff --git a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java
index a98d0f4..f6bdc28 100644
--- a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java
+++ b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java
@@ -10,4 +10,5 @@ import lombok.NoArgsConstructor;
public class AppLoginRequest {
private String email;
private String password;
+ private String clientId;
}
\ No newline at end of file
diff --git a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java b/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java
index 3c65a40..6b59506 100644
--- a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java
+++ b/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java
@@ -1,6 +1,7 @@
package de.assecutor.votianlt.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Lazy;
@@ -20,11 +21,13 @@ public interface MqttPublisher {
@Slf4j
class MqttPublisherImpl implements MqttPublisher {
- private final ObjectMapper objectMapper = new ObjectMapper();
+ private final ObjectMapper objectMapper;
private final MqttV5ClientManager clientManager;
public MqttPublisherImpl(@Lazy MqttV5ClientManager clientManager) {
this.clientManager = clientManager;
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.registerModule(new JavaTimeModule());
}
@Override
@@ -39,7 +42,14 @@ class MqttPublisherImpl implements MqttPublisher {
byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
// Default QoS 2
clientManager.publish(topic, bytes, 2, retained);
- log.debug("[MQTT v5] published topic={} retained={} bytes={}", topic, retained, bytes.length);
+
+ // Log all published JSON documents
+ log.info("=== MQTT JSON PUBLISHED ===");
+ log.info("Topic: {}", topic);
+ log.info("Retained: {}", retained);
+ log.info("JSON Data: {}", json);
+ log.info("=== END MQTT PUBLISH ===");
+
} catch (Exception e) {
log.error("Failed to serialize/publish MQTT message for topic {}: {}", topic, e.getMessage(), e);
}
diff --git a/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java b/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java
index f4b0324..4454514 100644
--- a/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java
+++ b/src/main/java/de/assecutor/votianlt/mqtt/MqttV5ClientManager.java
@@ -86,13 +86,12 @@ public class MqttV5ClientManager implements SmartLifecycle {
// Subscribe to topics with QoS
String[] topics = new String[]{
- "v1/app/+/task/photo/completed",
- "v1/app/+/task/confirm",
- "v1/app/+/task/completed",
- "v1/app/+/job/status",
- "v1/app/+/device/location",
- "v1/app/+/jobs/assigned",
- "v1/app/+/auth/login"
+ "/server/+/task/photo/completed",
+ "/server/+/task/confirm",
+ "/server/+/task/completed",
+ "/server/+/job/status",
+ "/server/+/jobs/assigned",
+ "/server/login"
};
MqttQos qos = mapQos(props.getDefaultQos());
for (String topic : topics) {
@@ -131,19 +130,29 @@ public class MqttV5ClientManager implements SmartLifecycle {
private void routeInbound(String topic, Map payload) {
try {
- if (topic.matches("v1/app/.+/task/photo/completed")) {
+ if (topic.matches("/server/.+/task/photo/completed")) {
messageController.handlePhotoTaskCompleted(payload);
- } else if (topic.matches("v1/app/.+/task/confirm")) {
+ } else if (topic.matches("/server/.+/task/confirm")) {
messageController.handleTaskConfirmation(payload);
- } else if (topic.matches("v1/app/.+/task/completed")) {
+ } else if (topic.matches("/server/.+/task/completed")) {
messageController.handleTaskCompleted(payload);
- } else if (topic.matches("v1/app/.+/job/status")) {
+ } else if (topic.matches("/server/.+/job/status")) {
messageController.handleJobStatusUpdate(payload);
- } else if (topic.matches("v1/app/.+/device/location")) {
- messageController.handleDeviceLocation(payload);
- } else if (topic.matches("v1/app/.+/jobs/assigned")) {
+ } else if (topic.matches("/server/.+/jobs/assigned")) {
+ // Extract clientId from topic: /server/{clientId}/jobs/assigned
+ try {
+ String[] parts = topic.split("/");
+ if (parts.length >= 5 && "server".equals(parts[1])) {
+ String clientId = parts[2];
+ if (clientId != null && !clientId.isBlank()) {
+ payload.put("clientId", clientId);
+ }
+ }
+ } catch (Exception ignore) {
+ // ignore extraction errors
+ }
messageController.handleGetAssignedJobs(payload);
- } else if (topic.matches("v1/app/.+/auth/login")) {
+ } else if (topic.equals("/server/login")) {
var om = new ObjectMapper();
de.assecutor.votianlt.dto.AppLoginRequest req = om.convertValue(payload, de.assecutor.votianlt.dto.AppLoginRequest.class);
messageController.handleAppLogin(req);