MQTT
This commit is contained in:
6
pom.xml
6
pom.xml
@@ -109,7 +109,11 @@
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Jackson JSR310 module for Java 8 date/time support -->
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* MQTT message controller for handling real-time communication with apps.
|
||||
@@ -34,6 +35,9 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
public class MessageController {
|
||||
|
||||
// Map to store userId -> clientId mapping for active sessions
|
||||
private final Map<String, String> userClientIdMapping = new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
@@ -97,92 +101,88 @@ public class MessageController {
|
||||
}
|
||||
|
||||
/**
|
||||
* Send notification to specific user
|
||||
* Send notification to specific user (removed MQTT publishing)
|
||||
*/
|
||||
public void sendNotificationToUser(String username, String message) {
|
||||
Map<String, Object> notification = Map.of(
|
||||
"message", message,
|
||||
"timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
|
||||
"type", "notification"
|
||||
);
|
||||
|
||||
log.info("Sending notification to user '{}': {}", username, notification);
|
||||
mqttPublisher.publishAsJson("v1/users/" + username + "/notifications", notification, true);
|
||||
log.info("Notification sent to '/user/{}/queue/notifications'", username);
|
||||
log.info("Notification for user '{}': {}", username, message);
|
||||
// Note: MQTT notification publishing has been removed
|
||||
}
|
||||
|
||||
/**
|
||||
* Send broadcast message to all connected clients
|
||||
* Send broadcast message to all connected clients (removed MQTT publishing)
|
||||
*/
|
||||
public void sendBroadcastMessage(String message) {
|
||||
Map<String, Object> broadcast = Map.of(
|
||||
"message", message,
|
||||
"timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
|
||||
"type", "broadcast"
|
||||
);
|
||||
|
||||
log.info("Sending broadcast message: {}", broadcast);
|
||||
mqttPublisher.publishAsJson("v1/broadcasts", broadcast);
|
||||
log.info("Broadcast message sent to '/topic/broadcasts'");
|
||||
log.info("Broadcast message: {}", message);
|
||||
// Note: MQTT broadcast publishing has been removed
|
||||
}
|
||||
|
||||
/**
|
||||
* Authentication endpoint for mobile app users via MQTT.
|
||||
* Client sends to /app/auth/login with payload { email, password }.
|
||||
* The response is sent back to the requesting user on /user/queue/auth
|
||||
* Client sends to /server/login with payload { email, password, clientId }.
|
||||
* The response is sent back to the requesting client on /client/{clientId}/auth
|
||||
*/
|
||||
public AppLoginResponse handleAppLogin(AppLoginRequest request) {
|
||||
log.info("MQTT Endpoint '/app/auth/login' called with email: {}",
|
||||
request != null ? request.getEmail() : "null");
|
||||
public void handleAppLogin(AppLoginRequest request) {
|
||||
log.info("MQTT Endpoint '/server/login' called with email: {}, clientId: {}",
|
||||
request != null ? request.getEmail() : "null",
|
||||
request != null ? request.getClientId() : "null");
|
||||
|
||||
if (request == null || request.getEmail() == null || request.getPassword() == null
|
||||
|| request.getEmail().isBlank() || request.getPassword().isBlank()) {
|
||||
AppLoginResponse response = new AppLoginResponse(false, "E-Mail und Passwort sind erforderlich", null, null, null);
|
||||
log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
|
||||
false, "E-Mail und Passwort sind erforderlich");
|
||||
return response;
|
||||
AppLoginResponse response;
|
||||
|
||||
if (request == null || request.getEmail() == null || request.getPassword() == null || request.getClientId() == null
|
||||
|| request.getEmail().isBlank() || request.getPassword().isBlank() || request.getClientId().isBlank()) {
|
||||
response = new AppLoginResponse(false, "E-Mail, Passwort und Client-ID sind erforderlich", null, null, null);
|
||||
} else {
|
||||
AppUser user = appUserRepository.findByEmail(request.getEmail());
|
||||
if (user == null) {
|
||||
response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null);
|
||||
} else {
|
||||
boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
|
||||
if (!ok) {
|
||||
response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null);
|
||||
} else {
|
||||
response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
|
||||
// Store clientId mapping for this user session
|
||||
storeClientIdMapping(user.getIdAsString(), request.getClientId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AppUser user = appUserRepository.findByEmail(request.getEmail());
|
||||
if (user == null) {
|
||||
AppLoginResponse response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null);
|
||||
log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
|
||||
false, "Benutzer nicht gefunden");
|
||||
return response;
|
||||
// Send response via MQTT to specific client
|
||||
if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) {
|
||||
mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false);
|
||||
log.info("MQTT Response sent to '/client/{}/auth': success={}, message='{}'",
|
||||
request.getClientId(), response.isSuccess(), response.getMessage());
|
||||
}
|
||||
|
||||
boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
|
||||
if (!ok) {
|
||||
AppLoginResponse response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null);
|
||||
log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}'",
|
||||
false, "Ungültige Anmeldedaten");
|
||||
return response;
|
||||
}
|
||||
|
||||
AppLoginResponse response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
|
||||
log.info("MQTT Response for '/app/auth/login' sent to '/user/queue/auth': success={}, message='{}', appUserId='{}'",
|
||||
true, "Anmeldung erfolgreich", response.getAppUserId());
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Endpoint to retrieve jobs assigned to a specific app user with related cargo items and tasks.
|
||||
* Client sends to /app/jobs/assigned with payload { appUserId }.
|
||||
* The response is sent back to the requesting user on /user/queue/jobs
|
||||
* Client sends to /server/{clientId}/jobs/assigned with payload { appUserId }.
|
||||
* The response is sent back to the requesting client on /client/{clientId}/jobs
|
||||
*/
|
||||
public List<JobWithRelatedDataDTO> handleGetAssignedJobs(Map<String, Object> request) {
|
||||
log.info("MQTT Endpoint '/app/jobs/assigned' called with data: {}", request);
|
||||
public void handleGetAssignedJobs(Map<String, Object> request) {
|
||||
log.info("MQTT Endpoint '/server/{clientId}/jobs/assigned' called with data: {}", request);
|
||||
log.debug("Starting to process jobs request for MQTT endpoint");
|
||||
|
||||
if (request == null || !request.containsKey("appUserId")) {
|
||||
log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (no appUserId provided)");
|
||||
return List.of(); // Return empty list if no appUserId provided
|
||||
log.info("Assigned jobs request missing appUserId; returning empty list");
|
||||
return; // Return empty list if no appUserId provided
|
||||
}
|
||||
|
||||
String appUserId = request.get("appUserId").toString();
|
||||
if (appUserId == null || appUserId.isBlank()) {
|
||||
log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': empty list (appUserId is blank)");
|
||||
return List.of(); // Return empty list if appUserId is blank
|
||||
log.info("Assigned jobs request blank appUserId; returning empty list");
|
||||
return; // Return empty list if appUserId is blank
|
||||
}
|
||||
|
||||
// Attempt to get clientId from request (injected from topic) or from stored mapping
|
||||
String clientId = null;
|
||||
try {
|
||||
Object cid = request.get("clientId");
|
||||
if (cid != null) clientId = cid.toString();
|
||||
} catch (Exception ignored) {}
|
||||
if (clientId == null || clientId.isBlank()) {
|
||||
clientId = getClientIdForUserId(appUserId);
|
||||
}
|
||||
|
||||
// Find jobs assigned to this app user
|
||||
@@ -205,8 +205,14 @@ public class MessageController {
|
||||
})
|
||||
.toList();
|
||||
|
||||
log.info("MQTT Response for '/app/jobs/assigned' sent to '/user/queue/jobs': {} jobs with related data found for appUserId='{}'",
|
||||
jobsWithRelatedData.size(), appUserId);
|
||||
// Publish to the requesting client's topic if clientId is known
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
String topic = "/client/" + clientId + "/jobs";
|
||||
mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false);
|
||||
log.info("Published {} assigned jobs for appUserId='{}' to topic '{}'", jobsWithRelatedData.size(), appUserId, topic);
|
||||
} else {
|
||||
log.warn("No clientId available to publish assigned jobs for appUserId='{}'. Skipping MQTT publish.", appUserId);
|
||||
}
|
||||
|
||||
// Log complete JSON for debugging
|
||||
log.debug("About to serialize {} jobs to JSON for logging", jobsWithRelatedData.size());
|
||||
@@ -216,6 +222,7 @@ public class MessageController {
|
||||
String jsonOutput = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobsWithRelatedData);
|
||||
log.info("=== COMPLETE JSON RESPONSE FOR MQTT CLIENT ===");
|
||||
log.info("AppUserId: {}", appUserId);
|
||||
log.info("ClientId: {}", clientId);
|
||||
log.info("Number of jobs: {}", jobsWithRelatedData.size());
|
||||
log.info("JSON Data:\n{}", jsonOutput);
|
||||
log.info("=== END JSON RESPONSE ===");
|
||||
@@ -223,7 +230,6 @@ public class MessageController {
|
||||
log.error("Failed to serialize jobs to JSON for logging: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
return jobsWithRelatedData;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -370,8 +376,8 @@ public class MessageController {
|
||||
event.put("event", "taskCompleted");
|
||||
event.put("taskType", task.getTaskType());
|
||||
|
||||
// Publish to MQTT task topic
|
||||
mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
|
||||
// Task event publishing has been removed
|
||||
log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted());
|
||||
|
||||
response.put("success", true);
|
||||
response.putAll(event);
|
||||
@@ -444,8 +450,8 @@ public class MessageController {
|
||||
event.put("event", "taskCompleted");
|
||||
event.put("taskType", task.getTaskType());
|
||||
|
||||
// Publish to MQTT task topic
|
||||
mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
|
||||
// Task event publishing has been removed
|
||||
log.info("Task completed: taskId={}, taskType={}, completed={}", task.getIdAsString(), task.getTaskType(), task.isCompleted());
|
||||
|
||||
response.put("success", true);
|
||||
response.putAll(event);
|
||||
@@ -462,4 +468,36 @@ public class MessageController {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to get the user ID for a task by looking up the associated job
|
||||
*/
|
||||
private String getUserIdForTask(BaseTask task) {
|
||||
try {
|
||||
java.util.Optional<Job> jobOpt = jobRepository.findById(task.getJobId());
|
||||
if (jobOpt.isPresent()) {
|
||||
Job job = jobOpt.get();
|
||||
return job.getAppUser();
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.error("Error getting user ID for task {}: {}", task.getIdAsString(), e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the mapping between userId and clientId for active session
|
||||
*/
|
||||
private void storeClientIdMapping(String userId, String clientId) {
|
||||
userClientIdMapping.put(userId, clientId);
|
||||
log.debug("Stored clientId mapping: userId={} -> clientId={}", userId, clientId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the clientId for a given userId
|
||||
*/
|
||||
private String getClientIdForUserId(String userId) {
|
||||
return userClientIdMapping.get(userId);
|
||||
}
|
||||
}
|
||||
@@ -10,4 +10,5 @@ import lombok.NoArgsConstructor;
|
||||
public class AppLoginRequest {
|
||||
private String email;
|
||||
private String password;
|
||||
private String clientId;
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
@@ -20,11 +21,13 @@ public interface MqttPublisher {
|
||||
@Slf4j
|
||||
class MqttPublisherImpl implements MqttPublisher {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final ObjectMapper objectMapper;
|
||||
private final MqttV5ClientManager clientManager;
|
||||
|
||||
public MqttPublisherImpl(@Lazy MqttV5ClientManager clientManager) {
|
||||
this.clientManager = clientManager;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -39,7 +42,14 @@ class MqttPublisherImpl implements MqttPublisher {
|
||||
byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
|
||||
// Default QoS 2
|
||||
clientManager.publish(topic, bytes, 2, retained);
|
||||
log.debug("[MQTT v5] published topic={} retained={} bytes={}", topic, retained, bytes.length);
|
||||
|
||||
// Log all published JSON documents
|
||||
log.info("=== MQTT JSON PUBLISHED ===");
|
||||
log.info("Topic: {}", topic);
|
||||
log.info("Retained: {}", retained);
|
||||
log.info("JSON Data: {}", json);
|
||||
log.info("=== END MQTT PUBLISH ===");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to serialize/publish MQTT message for topic {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
|
||||
@@ -86,13 +86,12 @@ public class MqttV5ClientManager implements SmartLifecycle {
|
||||
|
||||
// Subscribe to topics with QoS
|
||||
String[] topics = new String[]{
|
||||
"v1/app/+/task/photo/completed",
|
||||
"v1/app/+/task/confirm",
|
||||
"v1/app/+/task/completed",
|
||||
"v1/app/+/job/status",
|
||||
"v1/app/+/device/location",
|
||||
"v1/app/+/jobs/assigned",
|
||||
"v1/app/+/auth/login"
|
||||
"/server/+/task/photo/completed",
|
||||
"/server/+/task/confirm",
|
||||
"/server/+/task/completed",
|
||||
"/server/+/job/status",
|
||||
"/server/+/jobs/assigned",
|
||||
"/server/login"
|
||||
};
|
||||
MqttQos qos = mapQos(props.getDefaultQos());
|
||||
for (String topic : topics) {
|
||||
@@ -131,19 +130,29 @@ public class MqttV5ClientManager implements SmartLifecycle {
|
||||
|
||||
private void routeInbound(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
if (topic.matches("v1/app/.+/task/photo/completed")) {
|
||||
if (topic.matches("/server/.+/task/photo/completed")) {
|
||||
messageController.handlePhotoTaskCompleted(payload);
|
||||
} else if (topic.matches("v1/app/.+/task/confirm")) {
|
||||
} else if (topic.matches("/server/.+/task/confirm")) {
|
||||
messageController.handleTaskConfirmation(payload);
|
||||
} else if (topic.matches("v1/app/.+/task/completed")) {
|
||||
} else if (topic.matches("/server/.+/task/completed")) {
|
||||
messageController.handleTaskCompleted(payload);
|
||||
} else if (topic.matches("v1/app/.+/job/status")) {
|
||||
} else if (topic.matches("/server/.+/job/status")) {
|
||||
messageController.handleJobStatusUpdate(payload);
|
||||
} else if (topic.matches("v1/app/.+/device/location")) {
|
||||
messageController.handleDeviceLocation(payload);
|
||||
} else if (topic.matches("v1/app/.+/jobs/assigned")) {
|
||||
} else if (topic.matches("/server/.+/jobs/assigned")) {
|
||||
// Extract clientId from topic: /server/{clientId}/jobs/assigned
|
||||
try {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 5 && "server".equals(parts[1])) {
|
||||
String clientId = parts[2];
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
}
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
// ignore extraction errors
|
||||
}
|
||||
messageController.handleGetAssignedJobs(payload);
|
||||
} else if (topic.matches("v1/app/.+/auth/login")) {
|
||||
} else if (topic.equals("/server/login")) {
|
||||
var om = new ObjectMapper();
|
||||
de.assecutor.votianlt.dto.AppLoginRequest req = om.convertValue(payload, de.assecutor.votianlt.dto.AppLoginRequest.class);
|
||||
messageController.handleAppLogin(req);
|
||||
|
||||
Reference in New Issue
Block a user