MQTT
This commit is contained in:
@@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
@@ -38,82 +37,28 @@ public class MessageController {
|
|||||||
// Map to store userId -> clientId mapping for active sessions
|
// Map to store userId -> clientId mapping for active sessions
|
||||||
private final Map<String, String> userClientIdMapping = new ConcurrentHashMap<>();
|
private final Map<String, String> userClientIdMapping = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Autowired
|
private final MqttPublisher mqttPublisher;
|
||||||
private MqttPublisher mqttPublisher;
|
|
||||||
|
|
||||||
@Autowired
|
private final AppUserRepository appUserRepository;
|
||||||
private AppUserRepository appUserRepository;
|
|
||||||
|
|
||||||
@Autowired
|
private final AppUserService appUserService;
|
||||||
private AppUserService appUserService;
|
|
||||||
|
|
||||||
@Autowired
|
private final JobRepository jobRepository;
|
||||||
private JobRepository jobRepository;
|
|
||||||
|
|
||||||
@Autowired
|
private final CargoItemRepository cargoItemRepository;
|
||||||
private CargoItemRepository cargoItemRepository;
|
|
||||||
|
|
||||||
@Autowired
|
private final TaskRepository taskRepository;
|
||||||
private TaskRepository taskRepository;
|
|
||||||
|
|
||||||
@Autowired
|
private final PhotoRepository photoRepository;
|
||||||
private PhotoRepository photoRepository;
|
|
||||||
|
|
||||||
/**
|
public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository, AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository, TaskRepository taskRepository, PhotoRepository photoRepository) {
|
||||||
* Handles messages sent to /app/message and broadcasts them to all subscribers of /topic/messages
|
this.mqttPublisher = mqttPublisher;
|
||||||
*/
|
this.appUserRepository = appUserRepository;
|
||||||
public Map<String, Object> handleMessage(Map<String, Object> message) {
|
this.appUserService = appUserService;
|
||||||
log.error("=== ANY MESSAGE RECEIVED === MQTT Endpoint '/app/message' called");
|
this.jobRepository = jobRepository;
|
||||||
log.info("MQTT Endpoint '/app/message' called with data: {}", message);
|
this.cargoItemRepository = cargoItemRepository;
|
||||||
|
this.taskRepository = taskRepository;
|
||||||
// Add timestamp to the message
|
this.photoRepository = photoRepository;
|
||||||
message.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
|
||||||
message.put("processed", true);
|
|
||||||
|
|
||||||
log.info("MQTT Response for '/app/message' sent to '/topic/messages': {}", message);
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles job status updates from apps
|
|
||||||
*/
|
|
||||||
public Map<String, Object> handleJobStatusUpdate(Map<String, Object> jobUpdate) {
|
|
||||||
log.info("MQTT Endpoint '/app/job/status' called with data: {}", jobUpdate);
|
|
||||||
|
|
||||||
jobUpdate.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
|
||||||
jobUpdate.put("source", "app");
|
|
||||||
|
|
||||||
log.info("MQTT Response for '/app/job/status' sent to '/topic/job-updates': {}", jobUpdate);
|
|
||||||
return jobUpdate;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles device location updates from mobile apps
|
|
||||||
*/
|
|
||||||
public Map<String, Object> handleDeviceLocation(Map<String, Object> locationUpdate) {
|
|
||||||
log.info("MQTT Endpoint '/app/device/location' called with data: {}", locationUpdate);
|
|
||||||
|
|
||||||
locationUpdate.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
|
||||||
locationUpdate.put("processed", true);
|
|
||||||
|
|
||||||
log.info("MQTT Response for '/app/device/location' sent to '/topic/device-locations': {}", locationUpdate);
|
|
||||||
return locationUpdate;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send notification to specific user (removed MQTT publishing)
|
|
||||||
*/
|
|
||||||
public void sendNotificationToUser(String username, String message) {
|
|
||||||
log.info("Notification for user '{}': {}", username, message);
|
|
||||||
// Note: MQTT notification publishing has been removed
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send broadcast message to all connected clients (removed MQTT publishing)
|
|
||||||
*/
|
|
||||||
public void sendBroadcastMessage(String message) {
|
|
||||||
log.info("Broadcast message: {}", message);
|
|
||||||
// Note: MQTT broadcast publishing has been removed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -196,10 +141,8 @@ public class MessageController {
|
|||||||
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
|
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
|
||||||
|
|
||||||
// Log task details for debugging
|
// Log task details for debugging
|
||||||
tasks.forEach(task -> {
|
tasks.forEach(task -> log.info("Task details for job {}: type={}, order={}",
|
||||||
log.info("Task details for job {}: type={}, order={}",
|
job.getId(), task.getTaskType(), task.getTaskOrder()));
|
||||||
job.getId(), task.getTaskType(), task.getTaskOrder());
|
|
||||||
});
|
|
||||||
|
|
||||||
return new JobWithRelatedDataDTO(job, cargoItems, tasks);
|
return new JobWithRelatedDataDTO(job, cargoItems, tasks);
|
||||||
})
|
})
|
||||||
@@ -238,9 +181,9 @@ public class MessageController {
|
|||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||||
* This endpoint accepts any task type (fallback for GENERIC or unknown types).
|
* This endpoint accepts any task type (fallback for GENERIC or unknown types).
|
||||||
*/
|
*/
|
||||||
public Map<String, Object> handleTaskCompleted(Map<String, Object> payload) {
|
public void handleTaskCompleted(Map<String, Object> payload) {
|
||||||
log.info("MQTT Endpoint '/app/task/completed' called with data: {}", payload);
|
log.info("MQTT Endpoint '/app/task/completed' called with data: {}", payload);
|
||||||
return processTaskCompletion(payload, null); // null means accept any task type
|
processTaskCompletion(payload, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -248,9 +191,9 @@ public class MessageController {
|
|||||||
* Client sends to /app/task/confirm with payload { taskId, completedBy?, note? }.
|
* Client sends to /app/task/confirm with payload { taskId, completedBy?, note? }.
|
||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||||
*/
|
*/
|
||||||
public Map<String, Object> handleTaskConfirmation(Map<String, Object> payload) {
|
public void handleTaskConfirmation(Map<String, Object> payload) {
|
||||||
log.info("MQTT Endpoint '/app/task/confirm' called with data: {}", payload);
|
log.info("MQTT Endpoint '/app/task/confirm' called with data: {}", payload);
|
||||||
return processTaskCompletion(payload, "CONFIRMATION");
|
processTaskCompletion(payload, "CONFIRMATION");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -259,46 +202,16 @@ public class MessageController {
|
|||||||
* The extraData contains: { photos: base64List, count: base64List.length }
|
* The extraData contains: { photos: base64List, count: base64List.length }
|
||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||||
*/
|
*/
|
||||||
public Map<String, Object> handlePhotoTaskCompleted(Map<String, Object> payload) {
|
public void handlePhotoTaskCompleted(Map<String, Object> payload) {
|
||||||
log.info("MQTT Endpoint '/app/task/photo/completed' called");
|
log.info("MQTT Endpoint '/app/task/photo/completed' called");
|
||||||
return processPhotoTaskCompletion(payload);
|
processPhotoTaskCompletion(payload);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Report signature task completion from apps.
|
|
||||||
* Client sends to /app/task/signature/completed with payload { taskId, completedBy?, note? }.
|
|
||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
|
||||||
*/
|
|
||||||
public Map<String, Object> handleSignatureTaskCompleted(Map<String, Object> payload) {
|
|
||||||
log.info("MQTT Endpoint '/app/task/signature/completed' called with data: {}", payload);
|
|
||||||
return processTaskCompletion(payload, "SIGNATURE");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Report barcode task completion from apps.
|
|
||||||
* Client sends to /app/task/barcode/completed with payload { taskId, completedBy?, note? }.
|
|
||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
|
||||||
*/
|
|
||||||
public Map<String, Object> handleBarcodeTaskCompleted(Map<String, Object> payload) {
|
|
||||||
log.info("MQTT Endpoint '/app/task/barcode/completed' called with data: {}", payload);
|
|
||||||
return processTaskCompletion(payload, "BARCODE");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Report todolist task completion from apps.
|
|
||||||
* Client sends to /app/task/todolist/completed with payload { taskId, completedBy?, note? }.
|
|
||||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
|
||||||
*/
|
|
||||||
public Map<String, Object> handleTodolistTaskCompleted(Map<String, Object> payload) {
|
|
||||||
log.info("MQTT Endpoint '/app/task/todolist/completed' called with data: {}", payload);
|
|
||||||
return processTaskCompletion(payload, "TODOLIST");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specialized method to process photo task completion with extraData handling.
|
* Specialized method to process photo task completion with extraData handling.
|
||||||
* Saves photo data to the photos collection and processes task completion.
|
* Saves photo data to the photos collection and processes task completion.
|
||||||
*/
|
*/
|
||||||
private Map<String, Object> processPhotoTaskCompletion(Map<String, Object> payload) {
|
private void processPhotoTaskCompletion(Map<String, Object> payload) {
|
||||||
Map<String, Object> response = new java.util.HashMap<>();
|
Map<String, Object> response = new java.util.HashMap<>();
|
||||||
response.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
response.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
||||||
response.put("type", "taskCompletedAck");
|
response.put("type", "taskCompletedAck");
|
||||||
@@ -307,7 +220,7 @@ public class MessageController {
|
|||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "taskId ist erforderlich");
|
response.put("message", "taskId ist erforderlich");
|
||||||
log.info("Photo task completion failed: {}", response);
|
log.info("Photo task completion failed: {}", response);
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String taskIdStr = payload.get("taskId").toString();
|
String taskIdStr = payload.get("taskId").toString();
|
||||||
@@ -320,7 +233,7 @@ public class MessageController {
|
|||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Task nicht gefunden");
|
response.put("message", "Task nicht gefunden");
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -330,7 +243,7 @@ public class MessageController {
|
|||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Task-Typ stimmt nicht mit dem Endpunkt überein. Erwartet: PHOTO, Gefunden: " + task.getTaskType());
|
response.put("message", "Task-Typ stimmt nicht mit dem Endpunkt überein. Erwartet: PHOTO, Gefunden: " + task.getTaskType());
|
||||||
log.warn("Task type mismatch for taskId={}: expected=PHOTO, actual={}", taskIdStr, task.getTaskType());
|
log.warn("Task type mismatch for taskId={}: expected=PHOTO, actual={}", taskIdStr, task.getTaskType());
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process extraData if present
|
// Process extraData if present
|
||||||
@@ -382,16 +295,13 @@ public class MessageController {
|
|||||||
response.put("success", true);
|
response.put("success", true);
|
||||||
response.putAll(event);
|
response.putAll(event);
|
||||||
log.info("Photo task completion processed successfully for taskId={}", taskIdStr);
|
log.info("Photo task completion processed successfully for taskId={}", taskIdStr);
|
||||||
return response;
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Ungültige taskId");
|
response.put("message", "Ungültige taskId");
|
||||||
return response;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error processing photo task completion", e);
|
log.error("Error processing photo task completion", e);
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Fehler bei der Verarbeitung");
|
response.put("message", "Fehler bei der Verarbeitung");
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -399,7 +309,7 @@ public class MessageController {
|
|||||||
* Common method to process task completion for different task types.
|
* Common method to process task completion for different task types.
|
||||||
* This method contains the shared logic for all task completion endpoints.
|
* This method contains the shared logic for all task completion endpoints.
|
||||||
*/
|
*/
|
||||||
private Map<String, Object> processTaskCompletion(Map<String, Object> payload, String expectedTaskType) {
|
private void processTaskCompletion(Map<String, Object> payload, String expectedTaskType) {
|
||||||
Map<String, Object> response = new java.util.HashMap<>();
|
Map<String, Object> response = new java.util.HashMap<>();
|
||||||
response.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
response.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
||||||
response.put("type", "taskCompletedAck");
|
response.put("type", "taskCompletedAck");
|
||||||
@@ -408,7 +318,7 @@ public class MessageController {
|
|||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "taskId ist erforderlich");
|
response.put("message", "taskId ist erforderlich");
|
||||||
log.info("Task completion failed: {}", response);
|
log.info("Task completion failed: {}", response);
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String taskIdStr = payload.get("taskId").toString();
|
String taskIdStr = payload.get("taskId").toString();
|
||||||
@@ -421,7 +331,7 @@ public class MessageController {
|
|||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Task nicht gefunden");
|
response.put("message", "Task nicht gefunden");
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -431,7 +341,7 @@ public class MessageController {
|
|||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Task-Typ stimmt nicht mit dem Endpunkt überein. Erwartet: " + expectedTaskType + ", Gefunden: " + task.getTaskType());
|
response.put("message", "Task-Typ stimmt nicht mit dem Endpunkt überein. Erwartet: " + expectedTaskType + ", Gefunden: " + task.getTaskType());
|
||||||
log.warn("Task type mismatch for taskId={}: expected={}, actual={}", taskIdStr, expectedTaskType, task.getTaskType());
|
log.warn("Task type mismatch for taskId={}: expected={}, actual={}", taskIdStr, expectedTaskType, task.getTaskType());
|
||||||
return response;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
task.setCompleted(true);
|
task.setCompleted(true);
|
||||||
@@ -456,33 +366,13 @@ public class MessageController {
|
|||||||
response.put("success", true);
|
response.put("success", true);
|
||||||
response.putAll(event);
|
response.putAll(event);
|
||||||
log.info("Task completion processed successfully for taskId={}, taskType={}", taskIdStr, task.getTaskType());
|
log.info("Task completion processed successfully for taskId={}, taskType={}", taskIdStr, task.getTaskType());
|
||||||
return response;
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Ungültige taskId");
|
response.put("message", "Ungültige taskId");
|
||||||
return response;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error processing task completion", e);
|
log.error("Error processing task completion", e);
|
||||||
response.put("success", false);
|
response.put("success", false);
|
||||||
response.put("message", "Fehler bei der Verarbeitung");
|
response.put("message", "Fehler bei der Verarbeitung");
|
||||||
return response;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper method to get the user ID for a task by looking up the associated job
|
|
||||||
*/
|
|
||||||
private String getUserIdForTask(BaseTask task) {
|
|
||||||
try {
|
|
||||||
java.util.Optional<Job> jobOpt = jobRepository.findById(task.getJobId());
|
|
||||||
if (jobOpt.isPresent()) {
|
|
||||||
Job job = jobOpt.get();
|
|
||||||
return job.getAppUser();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error getting user ID for task {}: {}", task.getIdAsString(), e.getMessage(), e);
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user