Erweiterungen
This commit is contained in:
@@ -102,9 +102,6 @@ public class MessageController {
|
|||||||
* sent back to the requesting client on /client/{clientId}/auth
|
* sent back to the requesting client on /client/{clientId}/auth
|
||||||
*/
|
*/
|
||||||
public void handleAppLogin(AppLoginRequest request) {
|
public void handleAppLogin(AppLoginRequest request) {
|
||||||
log.info("MQTT Endpoint '/server/login' called with email: {}, clientId: {}",
|
|
||||||
request != null ? request.getEmail() : "null", request != null ? request.getClientId() : "null");
|
|
||||||
|
|
||||||
AppLoginResponse response;
|
AppLoginResponse response;
|
||||||
|
|
||||||
if (request == null || request.getEmail() == null || request.getPassword() == null
|
if (request == null || request.getEmail() == null || request.getPassword() == null
|
||||||
@@ -133,8 +130,6 @@ public class MessageController {
|
|||||||
// Send response via MQTT to specific client
|
// Send response via MQTT to specific client
|
||||||
if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) {
|
if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) {
|
||||||
mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false);
|
mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false);
|
||||||
log.info("MQTT Response sent to '/client/{}/auth': success={}, message='{}'", request.getClientId(),
|
|
||||||
response.isSuccess(), response.getMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,18 +140,13 @@ public class MessageController {
|
|||||||
* /client/{clientId}/jobs
|
* /client/{clientId}/jobs
|
||||||
*/
|
*/
|
||||||
public void handleGetAssignedJobs(Map<String, Object> request) {
|
public void handleGetAssignedJobs(Map<String, Object> request) {
|
||||||
log.info("MQTT Endpoint '/server/{clientId}/jobs/assigned' called with data: {}", request);
|
|
||||||
log.debug("Starting to process jobs request for MQTT endpoint");
|
|
||||||
|
|
||||||
if (request == null || !request.containsKey("appUserId")) {
|
if (request == null || !request.containsKey("appUserId")) {
|
||||||
log.info("Assigned jobs request missing appUserId; returning empty list");
|
return;
|
||||||
return; // Return empty list if no appUserId provided
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String appUserId = request.get("appUserId").toString();
|
String appUserId = request.get("appUserId").toString();
|
||||||
if (appUserId == null || appUserId.isBlank()) {
|
if (appUserId == null || appUserId.isBlank()) {
|
||||||
log.info("Assigned jobs request blank appUserId; returning empty list");
|
return;
|
||||||
return; // Return empty list if appUserId is blank
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to get clientId from request (injected from topic) or from stored
|
// Attempt to get clientId from request (injected from topic) or from stored
|
||||||
@@ -167,7 +157,7 @@ public class MessageController {
|
|||||||
if (cid != null)
|
if (cid != null)
|
||||||
clientId = cid.toString();
|
clientId = cid.toString();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.debug("Could not extract clientId from request: {}", e.getMessage());
|
// Ignore
|
||||||
}
|
}
|
||||||
if (clientId == null || clientId.isBlank()) {
|
if (clientId == null || clientId.isBlank()) {
|
||||||
clientId = getClientIdForUserId(appUserId);
|
clientId = getClientIdForUserId(appUserId);
|
||||||
@@ -175,25 +165,11 @@ public class MessageController {
|
|||||||
|
|
||||||
// Find jobs assigned to this app user
|
// Find jobs assigned to this app user
|
||||||
List<Job> assignedJobs = jobRepository.findByAppUser(appUserId);
|
List<Job> assignedJobs = jobRepository.findByAppUser(appUserId);
|
||||||
log.info("Found {} jobs for appUserId: {}", assignedJobs.size(), appUserId);
|
|
||||||
|
|
||||||
// Debug: Log all jobs and their app_user values to diagnose assignment issues
|
|
||||||
List<Job> allJobs = jobRepository.findAll();
|
|
||||||
log.info("DEBUG: Total jobs in database: {}", allJobs.size());
|
|
||||||
for (Job job : allJobs) {
|
|
||||||
log.info("DEBUG: Job {} (number: {}) has app_user='{}', digitalProcessing={}", job.getIdAsString(),
|
|
||||||
job.getJobNumber(), job.getAppUser(), job.isDigitalProcessing());
|
|
||||||
}
|
|
||||||
|
|
||||||
// For each job, fetch related cargo items and tasks (ordered by task order)
|
// For each job, fetch related cargo items and tasks (ordered by task order)
|
||||||
List<JobWithRelatedDataDTO> jobsWithRelatedData = assignedJobs.stream().map(job -> {
|
List<JobWithRelatedDataDTO> jobsWithRelatedData = assignedJobs.stream().map(job -> {
|
||||||
List<CargoItem> cargoItems = cargoItemRepository.findByJobId(job.getId());
|
List<CargoItem> cargoItems = cargoItemRepository.findByJobId(job.getId());
|
||||||
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
|
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
|
||||||
|
|
||||||
// Log task details for debugging
|
|
||||||
tasks.forEach(task -> log.info("Task details for job {}: type={}, order={}", job.getId(),
|
|
||||||
task.getTaskType(), task.getTaskOrder()));
|
|
||||||
|
|
||||||
return new JobWithRelatedDataDTO(job, cargoItems, tasks);
|
return new JobWithRelatedDataDTO(job, cargoItems, tasks);
|
||||||
}).toList();
|
}).toList();
|
||||||
|
|
||||||
@@ -201,27 +177,7 @@ public class MessageController {
|
|||||||
if (clientId != null && !clientId.isBlank()) {
|
if (clientId != null && !clientId.isBlank()) {
|
||||||
String topic = "/client/" + clientId + "/jobs";
|
String topic = "/client/" + clientId + "/jobs";
|
||||||
mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false);
|
mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false);
|
||||||
log.info("Published {} assigned jobs for appUserId='{}' to topic '{}'", jobsWithRelatedData.size(),
|
|
||||||
appUserId, topic);
|
|
||||||
} else {
|
|
||||||
log.warn("No clientId available to publish assigned jobs for appUserId='{}'. Skipping MQTT publish.",
|
|
||||||
appUserId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log complete JSON for debugging
|
|
||||||
log.debug("About to serialize {} jobs to JSON for logging", jobsWithRelatedData.size());
|
|
||||||
try {
|
|
||||||
String jsonOutput = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobsWithRelatedData);
|
|
||||||
log.info("=== COMPLETE JSON RESPONSE FOR MQTT CLIENT ===");
|
|
||||||
log.info("AppUserId: {}", appUserId);
|
|
||||||
log.info("ClientId: {}", clientId);
|
|
||||||
log.info("Number of jobs: {}", jobsWithRelatedData.size());
|
|
||||||
log.info("JSON Data:\n{}", jsonOutput);
|
|
||||||
log.info("=== END JSON RESPONSE ===");
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Failed to serialize jobs to JSON for logging: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -252,30 +208,14 @@ public class MessageController {
|
|||||||
public void handleTaskCompleted(Map<String, Object> payload, String taskType) {
|
public void handleTaskCompleted(Map<String, Object> payload, String taskType) {
|
||||||
String key = taskType == null ? "" : taskType.trim().toUpperCase();
|
String key = taskType == null ? "" : taskType.trim().toUpperCase();
|
||||||
|
|
||||||
log.info("handleTaskCompleted called with taskType={}, data: {}", taskType, payload);
|
|
||||||
|
|
||||||
switch (key) {
|
switch (key) {
|
||||||
case "PHOTO" -> {
|
case "PHOTO" -> processPhotoTaskCompletion(payload);
|
||||||
processPhotoTaskCompletion(payload);
|
case "CONFIRMATION" -> processConfirmationTaskCompletion(payload);
|
||||||
}
|
case "SIGNATURE" -> processSignatureTaskCompletion(payload);
|
||||||
case "CONFIRMATION" -> {
|
case "TODOLIST" -> processTodoListTaskCompletion(payload);
|
||||||
processConfirmationTaskCompletion(payload);
|
case "BARCODE" -> processBarcodeTaskCompletion(payload);
|
||||||
}
|
case "COMMENT" -> processCommentTaskCompletion(payload);
|
||||||
case "SIGNATURE" -> {
|
default -> log.error("[TASK] Unknown taskType: {}", taskType);
|
||||||
processSignatureTaskCompletion(payload);
|
|
||||||
}
|
|
||||||
case "TODOLIST" -> {
|
|
||||||
processTodoListTaskCompletion(payload);
|
|
||||||
}
|
|
||||||
case "BARCODE" -> {
|
|
||||||
processBarcodeTaskCompletion(payload);
|
|
||||||
}
|
|
||||||
case "COMMENT" -> {
|
|
||||||
processCommentTaskCompletion(payload);
|
|
||||||
}
|
|
||||||
default -> {
|
|
||||||
log.info("ERROR: handleTaskCompleted called with taskType={}, data: {}", taskType, payload);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -294,7 +234,6 @@ public class MessageController {
|
|||||||
try {
|
try {
|
||||||
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
log.warn("Task not found for barcode completion. taskId={}", taskId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -304,8 +243,6 @@ public class MessageController {
|
|||||||
if (extra instanceof Map<?, ?> extraData) {
|
if (extra instanceof Map<?, ?> extraData) {
|
||||||
Object barcodesObj = extraData.get("barcodes");
|
Object barcodesObj = extraData.get("barcodes");
|
||||||
if (barcodesObj instanceof List<?> barcodesList) {
|
if (barcodesObj instanceof List<?> barcodesList) {
|
||||||
// Suppressing unchecked cast warning as extraData structure is validated from
|
|
||||||
// MQTT payload
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<String> barcodes = (List<String>) barcodesList;
|
List<String> barcodes = (List<String>) barcodesList;
|
||||||
|
|
||||||
@@ -314,33 +251,24 @@ public class MessageController {
|
|||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
Barcode barcodeEntry = new Barcode(new ObjectId(taskId.toString()), barcodeString,
|
Barcode barcodeEntry = new Barcode(new ObjectId(taskId.toString()), barcodeString,
|
||||||
completedBy);
|
completedBy);
|
||||||
|
|
||||||
barcodeRepository.save(barcodeEntry);
|
barcodeRepository.save(barcodeEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
extraDataSummary = barcodes.size() + " Barcode(s) gescannt: "
|
extraDataSummary = barcodes.size() + " Barcode(s) gescannt: "
|
||||||
+ String.join(", ", barcodes.subList(0, Math.min(3, barcodes.size())))
|
+ String.join(", ", barcodes.subList(0, Math.min(3, barcodes.size())))
|
||||||
+ (barcodes.size() > 3 ? "..." : "");
|
+ (barcodes.size() > 3 ? "..." : "");
|
||||||
log.info("Saved {} barcodes for taskId={}", barcodes.size(), taskId);
|
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Keine Barcodes gescannt";
|
extraDataSummary = "Keine Barcodes gescannt";
|
||||||
log.info("No barcodes found in extraData for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Barcode-Daten fehlerhaft";
|
extraDataSummary = "Barcode-Daten fehlerhaft";
|
||||||
log.warn("extraData.barcodes is not a List for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Keine Extra-Daten";
|
extraDataSummary = "Keine Extra-Daten";
|
||||||
log.warn("extraData is not a Map for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, mark the task as completed with history logging
|
|
||||||
completeTaskWithHistory(taskId, extraDataSummary);
|
completeTaskWithHistory(taskId, extraDataSummary);
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
log.error("Invalid taskId format for barcode completion: {}", taskId);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error("Error while processing barcode task completion (taskId={}): {}", taskId, ex.getMessage(), ex);
|
log.error("[TASK] Barcode completion error: {}", ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -349,7 +277,6 @@ public class MessageController {
|
|||||||
try {
|
try {
|
||||||
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
log.warn("Task not found for signature completion. taskId={}", taskId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -363,29 +290,21 @@ public class MessageController {
|
|||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
Signature signatureEntry = new Signature(new ObjectId(taskId.toString()), signatureSvg,
|
Signature signatureEntry = new Signature(new ObjectId(taskId.toString()), signatureSvg,
|
||||||
completedBy);
|
completedBy);
|
||||||
|
|
||||||
signatureRepository.save(signatureEntry);
|
signatureRepository.save(signatureEntry);
|
||||||
extraDataSummary = "Unterschrift erfasst (SVG, " + signatureSvg.length() + " Zeichen)";
|
extraDataSummary = "Unterschrift erfasst (SVG, " + signatureSvg.length() + " Zeichen)";
|
||||||
log.info("Saved signature for taskId={}", taskId);
|
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Leere Unterschrift";
|
extraDataSummary = "Leere Unterschrift";
|
||||||
log.info("Empty signature SVG found for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Unterschrift-Daten fehlerhaft";
|
extraDataSummary = "Unterschrift-Daten fehlerhaft";
|
||||||
log.warn("extraData.signatureSvg is not a String for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Keine Extra-Daten";
|
extraDataSummary = "Keine Extra-Daten";
|
||||||
log.warn("extraData is not a Map for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, mark the task as completed with history logging
|
|
||||||
completeTaskWithHistory(taskId, extraDataSummary);
|
completeTaskWithHistory(taskId, extraDataSummary);
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
log.error("Invalid taskId format for signature completion: {}", taskId);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error("Error while processing signature task completion (taskId={}): {}", taskId, ex.getMessage(), ex);
|
log.error("[TASK] Signature completion error: {}", ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -394,19 +313,15 @@ public class MessageController {
|
|||||||
try {
|
try {
|
||||||
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
log.warn("Task not found for photo completion. taskId={}", taskId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
ObjectId jobId = new ObjectId(task.getJobIdAsString());
|
|
||||||
|
|
||||||
String extraDataSummary = null;
|
String extraDataSummary = null;
|
||||||
Object extra = payload.get("extraData");
|
Object extra = payload.get("extraData");
|
||||||
if (extra instanceof Map<?, ?> extraData) {
|
if (extra instanceof Map<?, ?> extraData) {
|
||||||
Object photosObj = extraData.get("photos");
|
Object photosObj = extraData.get("photos");
|
||||||
if (photosObj instanceof List<?> photosList) {
|
if (photosObj instanceof List<?> photosList) {
|
||||||
// Suppressing unchecked cast warning as extraData structure is validated from
|
|
||||||
// MQTT payload
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<String> photos = (List<String>) photosList;
|
List<String> photos = (List<String>) photosList;
|
||||||
|
|
||||||
@@ -414,31 +329,22 @@ public class MessageController {
|
|||||||
for (String photoString : photos) {
|
for (String photoString : photos) {
|
||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
Photo photoEntry = new Photo(new ObjectId(taskId.toString()), photoString, completedBy);
|
Photo photoEntry = new Photo(new ObjectId(taskId.toString()), photoString, completedBy);
|
||||||
|
|
||||||
photoRepository.save(photoEntry);
|
photoRepository.save(photoEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
extraDataSummary = photos.size() + " Foto(s) aufgenommen";
|
extraDataSummary = photos.size() + " Foto(s) aufgenommen";
|
||||||
log.info("Saved {} photos for taskId={}, jobId={}", photos.size(), taskId, jobId);
|
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Keine Fotos aufgenommen";
|
extraDataSummary = "Keine Fotos aufgenommen";
|
||||||
log.info("No photos found in extraData for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Foto-Daten fehlerhaft";
|
extraDataSummary = "Foto-Daten fehlerhaft";
|
||||||
log.warn("extraData.photos is not a List for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Keine Extra-Daten";
|
extraDataSummary = "Keine Extra-Daten";
|
||||||
log.warn("extraData is not a Map for taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, mark the task as completed with history logging
|
|
||||||
completeTaskWithHistory(taskId, extraDataSummary);
|
completeTaskWithHistory(taskId, extraDataSummary);
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
log.error("Invalid taskId format for photo completion: {}", taskId);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error("Error while processing photo task completion (taskId={}): {}", taskId, ex.getMessage(), ex);
|
log.error("[TASK] Photo completion error: {}", ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -447,7 +353,6 @@ public class MessageController {
|
|||||||
try {
|
try {
|
||||||
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
var opt = taskRepository.findById(new ObjectId(taskId.toString()));
|
||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
log.warn("Task not found for comment completion. taskId={}", taskId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -457,27 +362,20 @@ public class MessageController {
|
|||||||
if (extra instanceof Map<?, ?> extraData) {
|
if (extra instanceof Map<?, ?> extraData) {
|
||||||
Object commentTextObj = extraData.get("commentText");
|
Object commentTextObj = extraData.get("commentText");
|
||||||
if (commentTextObj instanceof String commentText && !commentText.isBlank()) {
|
if (commentTextObj instanceof String commentText && !commentText.isBlank()) {
|
||||||
// Save comment to database
|
|
||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
Comment commentEntry = new Comment(new ObjectId(taskId.toString()), commentText, completedBy);
|
Comment commentEntry = new Comment(new ObjectId(taskId.toString()), commentText, completedBy);
|
||||||
commentRepository.save(commentEntry);
|
commentRepository.save(commentEntry);
|
||||||
|
|
||||||
extraDataSummary = "Kommentar: " + commentText;
|
extraDataSummary = "Kommentar: " + commentText;
|
||||||
log.info("Comment saved for taskId={}: {}", taskId, commentText);
|
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Kommentar abgegeben (leer)";
|
extraDataSummary = "Kommentar abgegeben (leer)";
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
extraDataSummary = "Kommentar abgegeben";
|
extraDataSummary = "Kommentar abgegeben";
|
||||||
log.warn("extraData is not a Map for comment task completion, taskId={}", taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, mark the task as completed with history logging
|
|
||||||
completeTaskWithHistory(taskId, extraDataSummary);
|
completeTaskWithHistory(taskId, extraDataSummary);
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
log.error("Invalid taskId format for comment completion: {}", taskId);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error("Error while processing comment task completion (taskId={}): {}", taskId, ex.getMessage(), ex);
|
log.error("[TASK] Comment completion error: {}", ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -487,7 +385,6 @@ public class MessageController {
|
|||||||
ObjectId taskId = new ObjectId(taskIdStr);
|
ObjectId taskId = new ObjectId(taskIdStr);
|
||||||
var opt = taskRepository.findById(taskId);
|
var opt = taskRepository.findById(taskId);
|
||||||
if (opt.isEmpty()) {
|
if (opt.isEmpty()) {
|
||||||
log.warn("Task not found for completion. taskId={}", taskIdStr);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
BaseTask task = opt.get();
|
BaseTask task = opt.get();
|
||||||
@@ -500,12 +397,11 @@ public class MessageController {
|
|||||||
ObjectId jobId = new ObjectId(task.getJobIdAsString());
|
ObjectId jobId = new ObjectId(task.getJobIdAsString());
|
||||||
String taskType = task.getTaskType() != null ? task.getTaskType().toString() : "Unknown";
|
String taskType = task.getTaskType() != null ? task.getTaskType().toString() : "Unknown";
|
||||||
String taskDisplayName = task.getDisplayName() != null ? task.getDisplayName() : taskType;
|
String taskDisplayName = task.getDisplayName() != null ? task.getDisplayName() : taskType;
|
||||||
|
|
||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
jobHistoryService.logTaskCompletion(jobId, taskType, taskIdStr, completedBy, taskDisplayName,
|
jobHistoryService.logTaskCompletion(jobId, taskType, taskIdStr, completedBy, taskDisplayName,
|
||||||
extraDataSummary);
|
extraDataSummary);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to log task completion history for task {}: {}", taskIdStr, e.getMessage());
|
// Ignore history logging errors
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send email notification for task completion
|
// Send email notification for task completion
|
||||||
@@ -513,56 +409,35 @@ public class MessageController {
|
|||||||
ObjectId jobId = new ObjectId(task.getJobIdAsString());
|
ObjectId jobId = new ObjectId(task.getJobIdAsString());
|
||||||
String taskType = task.getTaskType() != null ? task.getTaskType().toString() : "Unknown";
|
String taskType = task.getTaskType() != null ? task.getTaskType().toString() : "Unknown";
|
||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
||||||
|
|
||||||
emailService.sendTaskCompletionNotification(jobId, taskType, taskIdStr, completedBy);
|
emailService.sendTaskCompletionNotification(jobId, taskType, taskIdStr, completedBy);
|
||||||
|
|
||||||
// Check if all tasks are completed and handle job completion
|
|
||||||
checkAndHandleJobCompletion(jobId, completedBy);
|
checkAndHandleJobCompletion(jobId, completedBy);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to send task completion email notification for task {}: {}", taskIdStr,
|
// Ignore email notification errors
|
||||||
e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String completedBy = task.getCompletedBy() != null ? task.getCompletedBy() : "Unknown";
|
|
||||||
log.info("Task marked completed. taskId={}, completedBy={}, extraData={}", taskIdStr, completedBy,
|
|
||||||
extraDataSummary);
|
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
log.error("Invalid taskId format for completion: {}", taskIdStr);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
log.error("Error while marking task completed (taskId={}): {}", taskIdStr, ex.getMessage(), ex);
|
log.error("[TASK] Completion error: {}", ex.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkAndHandleJobCompletion(ObjectId jobId, String completedBy) {
|
private void checkAndHandleJobCompletion(ObjectId jobId, String completedBy) {
|
||||||
try {
|
try {
|
||||||
// Check if all tasks for this job are completed
|
|
||||||
var allTasks = taskRepository.findByJobIdOrderByTaskOrderAsc(jobId);
|
var allTasks = taskRepository.findByJobIdOrderByTaskOrderAsc(jobId);
|
||||||
if (allTasks.isEmpty()) {
|
if (allTasks.isEmpty()) {
|
||||||
log.debug("No tasks found for job {}", jobId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean allCompleted = allTasks.stream().allMatch(task -> task.isCompleted());
|
boolean allCompleted = allTasks.stream().allMatch(task -> task.isCompleted());
|
||||||
|
|
||||||
if (allCompleted) {
|
if (allCompleted) {
|
||||||
log.info("All tasks completed for job {}, updating job status and sending completion notification",
|
|
||||||
jobId);
|
|
||||||
|
|
||||||
// Update job status to COMPLETED
|
|
||||||
updateJobStatusToCompleted(jobId);
|
updateJobStatusToCompleted(jobId);
|
||||||
|
|
||||||
// Send completion notification
|
|
||||||
try {
|
try {
|
||||||
emailService.sendJobCompletionNotification(jobId, completedBy);
|
emailService.sendJobCompletionNotification(jobId, completedBy);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to send job completion notification for job {}: {}", jobId, e.getMessage());
|
// Ignore email notification errors
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
long completedCount = allTasks.stream().mapToLong(task -> task.isCompleted() ? 1L : 0L).sum();
|
|
||||||
log.debug("Job {} not yet complete: {}/{} tasks completed", jobId, completedCount, allTasks.size());
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to check job completion for job {}: {}", jobId, e.getMessage(), e);
|
// Ignore job completion check errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -570,26 +445,17 @@ public class MessageController {
|
|||||||
try {
|
try {
|
||||||
Optional<Job> jobOpt = jobRepository.findById(jobId);
|
Optional<Job> jobOpt = jobRepository.findById(jobId);
|
||||||
if (jobOpt.isEmpty()) {
|
if (jobOpt.isEmpty()) {
|
||||||
log.warn("Job not found for status update: {}", jobId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Job job = jobOpt.get();
|
Job job = jobOpt.get();
|
||||||
JobStatus oldStatus = job.getStatus();
|
|
||||||
|
|
||||||
// Only update if not already completed
|
|
||||||
if (job.getStatus() != JobStatus.COMPLETED) {
|
if (job.getStatus() != JobStatus.COMPLETED) {
|
||||||
job.setStatus(JobStatus.COMPLETED);
|
job.setStatus(JobStatus.COMPLETED);
|
||||||
job.setUpdatedAt(LocalDateTime.now());
|
job.setUpdatedAt(LocalDateTime.now());
|
||||||
jobRepository.save(job);
|
jobRepository.save(job);
|
||||||
|
|
||||||
log.info("Job status updated from {} to COMPLETED for job {}", oldStatus != null ? oldStatus : "null",
|
|
||||||
job.getJobNumber());
|
|
||||||
} else {
|
|
||||||
log.debug("Job {} already has COMPLETED status", job.getJobNumber());
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to update job status to COMPLETED for job {}: {}", jobId, e.getMessage(), e);
|
// Ignore job status update errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -599,7 +465,6 @@ public class MessageController {
|
|||||||
private void storeClientIdMapping(String userId, String clientId) {
|
private void storeClientIdMapping(String userId, String clientId) {
|
||||||
userClientIdMapping.put(userId, clientId);
|
userClientIdMapping.put(userId, clientId);
|
||||||
clientIdUserMapping.put(clientId, userId);
|
clientIdUserMapping.put(clientId, userId);
|
||||||
log.debug("Stored clientId mapping: userId={} <-> clientId={}", userId, clientId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -615,12 +480,8 @@ public class MessageController {
|
|||||||
*/
|
*/
|
||||||
public void handlePong(Map<String, Object> payload) {
|
public void handlePong(Map<String, Object> payload) {
|
||||||
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
|
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
|
||||||
log.debug("Received pong from client: {}", clientId);
|
|
||||||
|
|
||||||
if (clientId != null && !clientId.isBlank()) {
|
if (clientId != null && !clientId.isBlank()) {
|
||||||
clientConnectionService.handlePong(clientId);
|
clientConnectionService.handlePong(clientId);
|
||||||
} else {
|
|
||||||
log.warn("Received pong without clientId in payload");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -634,31 +495,18 @@ public class MessageController {
|
|||||||
* This clientId is stored as the receiver field in the message.
|
* This clientId is stored as the receiver field in the message.
|
||||||
*/
|
*/
|
||||||
public void handleIncomingMessage(Map<String, Object> payload) {
|
public void handleIncomingMessage(Map<String, Object> payload) {
|
||||||
log.info("MQTT Endpoint '/server/{clientId}/message' called with data: {}", payload);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Extract clientId from payload (added by MqttV5ClientManager from topic)
|
|
||||||
// The clientId IS the AppUser ID
|
|
||||||
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
|
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
|
||||||
|
|
||||||
if (clientId == null || clientId.isBlank()) {
|
if (clientId == null || clientId.isBlank()) {
|
||||||
log.warn("No clientId found in message payload, cannot process message");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add clientId as receiver to the payload
|
|
||||||
payload.put("receiver", clientId);
|
payload.put("receiver", clientId);
|
||||||
|
|
||||||
// Parse the payload
|
|
||||||
ChatMessageInboundPayload inboundPayload = ChatMessageInboundPayload.fromPayload(payload);
|
ChatMessageInboundPayload inboundPayload = ChatMessageInboundPayload.fromPayload(payload);
|
||||||
|
|
||||||
// Save the message with receiver = AppUser ID (clientId)
|
|
||||||
messageService.receiveMessageFromClient(inboundPayload);
|
messageService.receiveMessageFromClient(inboundPayload);
|
||||||
log.info("Successfully saved incoming message for AppUser '{}'", clientId);
|
|
||||||
} catch (IllegalArgumentException validationError) {
|
|
||||||
log.warn("Incoming chat message rejected: {}", validationError.getMessage());
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error handling incoming message: {}", e.getMessage(), e);
|
// Ignore message handling errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,19 +0,0 @@
|
|||||||
package de.assecutor.votianlt.event;
|
|
||||||
|
|
||||||
import de.assecutor.votianlt.model.Job;
|
|
||||||
import lombok.Getter;
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Event published when a new job is created
|
|
||||||
*/
|
|
||||||
@Getter
|
|
||||||
public class JobCreatedEvent extends ApplicationEvent {
|
|
||||||
|
|
||||||
private final Job job;
|
|
||||||
|
|
||||||
public JobCreatedEvent(Object source, Job job) {
|
|
||||||
super(source);
|
|
||||||
this.job = job;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -60,8 +60,6 @@ public class PluginMessagingConfig {
|
|||||||
*/
|
*/
|
||||||
@EventListener(ApplicationReadyEvent.class)
|
@EventListener(ApplicationReadyEvent.class)
|
||||||
public void initializePlugin(ApplicationReadyEvent event) {
|
public void initializePlugin(ApplicationReadyEvent event) {
|
||||||
log.info("[PluginMessagingConfig] Initializing messaging plugin: {}", pluginType);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MessagingPlugin plugin = createPlugin(pluginType);
|
MessagingPlugin plugin = createPlugin(pluginType);
|
||||||
PluginConfig config = createPluginConfig(pluginType);
|
PluginConfig config = createPluginConfig(pluginType);
|
||||||
@@ -74,33 +72,25 @@ public class PluginMessagingConfig {
|
|||||||
.getBean(ClientConnectionService.class);
|
.getBean(ClientConnectionService.class);
|
||||||
|
|
||||||
// Set up a listener to subscribe when connected
|
// Set up a listener to subscribe when connected
|
||||||
log.info("[PluginMessagingConfig] Adding state listener");
|
|
||||||
pluginManager.addStateListener(stateEvent -> {
|
pluginManager.addStateListener(stateEvent -> {
|
||||||
log.info("[PluginMessagingConfig] State event received: state={}, isConnected={}",
|
|
||||||
stateEvent.getState(), stateEvent.isConnected());
|
|
||||||
if (stateEvent.isConnected()) {
|
if (stateEvent.isConnected()) {
|
||||||
log.info("[PluginMessagingConfig] Plugin connected, setting up subscriptions");
|
|
||||||
try {
|
try {
|
||||||
setupSubscriptions(deliveryService, messageController, clientConnectionService);
|
setupSubscriptions(deliveryService, messageController, clientConnectionService);
|
||||||
log.info("[PluginMessagingConfig] Subscriptions setup completed");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e);
|
log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
} else {
|
} else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.DISCONNECTED) {
|
||||||
log.debug("[PluginMessagingConfig] Plugin not yet connected, waiting...");
|
log.info("[MQTT] Disconnected from broker");
|
||||||
|
} else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.ERROR) {
|
||||||
|
log.error("[MQTT] Connection error: {}", stateEvent.getErrorMessage());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
log.info("[PluginMessagingConfig] State listener added");
|
|
||||||
|
|
||||||
// Activate plugin (this will trigger connection and eventually the listener
|
// Activate plugin
|
||||||
// above)
|
|
||||||
pluginManager.activatePlugin(plugin, config);
|
pluginManager.activatePlugin(plugin, config);
|
||||||
|
|
||||||
log.info(
|
|
||||||
"[PluginMessagingConfig] Plugin activation initiated, subscriptions will be set up when connected");
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Failed to initialize plugin: {}", e.getMessage(), e);
|
log.error("[MQTT] Failed to initialize: {}", e.getMessage());
|
||||||
throw new RuntimeException("Failed to initialize messaging plugin", e);
|
throw new RuntimeException("Failed to initialize messaging plugin", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -146,22 +136,17 @@ public class PluginMessagingConfig {
|
|||||||
*/
|
*/
|
||||||
private void setupSubscriptions(MessageDeliveryService deliveryService, MessageController messageController,
|
private void setupSubscriptions(MessageDeliveryService deliveryService, MessageController messageController,
|
||||||
ClientConnectionService clientConnectionService) {
|
ClientConnectionService clientConnectionService) {
|
||||||
log.info("[PluginMessagingConfig] Setting up message subscriptions");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Register ACK handler
|
// Register ACK handler
|
||||||
pluginManager.registerAckHandler((messageId, payload) -> {
|
pluginManager.registerAckHandler((messageId, payload) -> {
|
||||||
try {
|
try {
|
||||||
String json = new String(payload, StandardCharsets.UTF_8);
|
String json = new String(payload, StandardCharsets.UTF_8);
|
||||||
log.info("[PluginMessagingConfig] Received ACK JSON: {}", json);
|
|
||||||
|
|
||||||
// ACK messages are wrapped in MessageEnvelope
|
|
||||||
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
||||||
AcknowledgmentMessage ack = objectMapper.convertValue(envelope.getPayload(),
|
AcknowledgmentMessage ack = objectMapper.convertValue(envelope.getPayload(),
|
||||||
AcknowledgmentMessage.class);
|
AcknowledgmentMessage.class);
|
||||||
deliveryService.handleAcknowledgment(ack);
|
deliveryService.handleAcknowledgment(ack);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Error handling ACK message: {}", e.getMessage(), e);
|
// Ignore ACK handling errors
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -174,10 +159,8 @@ public class PluginMessagingConfig {
|
|||||||
messageController, clientConnectionService));
|
messageController, clientConnectionService));
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("[PluginMessagingConfig] Message subscriptions initialized");
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e);
|
log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage());
|
||||||
throw new RuntimeException("Failed to setup subscriptions", e);
|
throw new RuntimeException("Failed to setup subscriptions", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -190,26 +173,23 @@ public class PluginMessagingConfig {
|
|||||||
MessageController messageController, ClientConnectionService clientConnectionService) {
|
MessageController messageController, ClientConnectionService clientConnectionService) {
|
||||||
try {
|
try {
|
||||||
String json = new String(payload, StandardCharsets.UTF_8);
|
String json = new String(payload, StandardCharsets.UTF_8);
|
||||||
log.info("[PluginMessagingConfig] Received JSON from client {}: {}", clientId, json);
|
|
||||||
|
|
||||||
// Try to parse as envelope first
|
// Try to parse as envelope first
|
||||||
try {
|
try {
|
||||||
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
||||||
// Valid envelope - check if it has required fields
|
|
||||||
if (envelope.getMessageId() != null && envelope.getTopic() != null) {
|
if (envelope.getMessageId() != null && envelope.getTopic() != null) {
|
||||||
deliveryService.handleIncomingMessage(envelope);
|
deliveryService.handleIncomingMessage(envelope);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Not a valid envelope, try legacy format
|
// Not a valid envelope, try legacy format
|
||||||
log.debug("[PluginMessagingConfig] Message is not an envelope, trying legacy format");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle legacy format (direct payload without envelope)
|
// Handle legacy format (direct payload without envelope)
|
||||||
handleLegacyMessage(clientId, json, messageController, clientConnectionService);
|
handleLegacyMessage(clientId, json, messageController, clientConnectionService);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Error handling message from client {}: {}", clientId, e.getMessage(), e);
|
// Ignore message handling errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,12 +202,10 @@ public class PluginMessagingConfig {
|
|||||||
ClientConnectionService clientConnectionService) {
|
ClientConnectionService clientConnectionService) {
|
||||||
try {
|
try {
|
||||||
Map<String, Object> payload = objectMapper.readValue(json, Map.class);
|
Map<String, Object> payload = objectMapper.readValue(json, Map.class);
|
||||||
log.info("[PluginMessagingConfig] Processing legacy message from client {}: {}", clientId, payload);
|
|
||||||
|
|
||||||
// Check if this is a login request (has email, password, clientId)
|
// Check if this is a login request (has email, password, clientId)
|
||||||
if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) {
|
if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) {
|
||||||
AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class);
|
AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class);
|
||||||
log.info("[PluginMessagingConfig] Routing legacy login request for email: {}", loginRequest.getEmail());
|
|
||||||
messageController.handleAppLogin(loginRequest);
|
messageController.handleAppLogin(loginRequest);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -236,7 +214,6 @@ public class PluginMessagingConfig {
|
|||||||
if ("pong".equals(payload.get("type"))) {
|
if ("pong".equals(payload.get("type"))) {
|
||||||
String pongClientId = clientId != null ? clientId : (String) payload.get("clientId");
|
String pongClientId = clientId != null ? clientId : (String) payload.get("clientId");
|
||||||
if (pongClientId != null) {
|
if (pongClientId != null) {
|
||||||
log.debug("[PluginMessagingConfig] Routing legacy pong from client: {}", pongClientId);
|
|
||||||
clientConnectionService.handlePong(pongClientId);
|
clientConnectionService.handlePong(pongClientId);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -245,7 +222,6 @@ public class PluginMessagingConfig {
|
|||||||
// Check if this is a task completion
|
// Check if this is a task completion
|
||||||
if (payload.containsKey("taskType") || payload.containsKey("taskId")) {
|
if (payload.containsKey("taskType") || payload.containsKey("taskId")) {
|
||||||
String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null;
|
String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null;
|
||||||
log.info("[PluginMessagingConfig] Routing legacy task_completed from client: {}", clientId);
|
|
||||||
messageController.handleTaskCompleted(payload, taskType);
|
messageController.handleTaskCompleted(payload, taskType);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -255,16 +231,12 @@ public class PluginMessagingConfig {
|
|||||||
if (clientId != null) {
|
if (clientId != null) {
|
||||||
payload.put("clientId", clientId);
|
payload.put("clientId", clientId);
|
||||||
}
|
}
|
||||||
log.info("[PluginMessagingConfig] Routing legacy jobs/assigned request from client: {}", clientId);
|
|
||||||
messageController.handleGetAssignedJobs(payload);
|
messageController.handleGetAssignedJobs(payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.warn("[PluginMessagingConfig] Unknown legacy message format from client {}: {}", clientId, json);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[PluginMessagingConfig] Error handling legacy message from client {}: {}", clientId,
|
// Ignore legacy message handling errors
|
||||||
e.getMessage(), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package de.assecutor.votianlt.messaging.delivery;
|
package de.assecutor.votianlt.messaging.delivery;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@@ -8,7 +7,6 @@ import org.springframework.stereotype.Component;
|
|||||||
* Scheduled tasks for message delivery retry and cleanup.
|
* Scheduled tasks for message delivery retry and cleanup.
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
|
||||||
public class RetryScheduler {
|
public class RetryScheduler {
|
||||||
|
|
||||||
private final MessageDeliveryService deliveryService;
|
private final MessageDeliveryService deliveryService;
|
||||||
@@ -23,10 +21,9 @@ public class RetryScheduler {
|
|||||||
@Scheduled(fixedDelayString = "${app.messaging.delivery.retry-interval-seconds:30}000")
|
@Scheduled(fixedDelayString = "${app.messaging.delivery.retry-interval-seconds:30}000")
|
||||||
public void retryPendingDeliveries() {
|
public void retryPendingDeliveries() {
|
||||||
try {
|
try {
|
||||||
log.debug("[RetryScheduler] Starting retry task");
|
|
||||||
deliveryService.retryPendingDeliveries();
|
deliveryService.retryPendingDeliveries();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[RetryScheduler] Error in retry task: {}", e.getMessage(), e);
|
// Ignore retry errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,10 +33,9 @@ public class RetryScheduler {
|
|||||||
@Scheduled(fixedDelayString = "${app.messaging.delivery.cleanup-interval-minutes:60}000")
|
@Scheduled(fixedDelayString = "${app.messaging.delivery.cleanup-interval-minutes:60}000")
|
||||||
public void cleanupOldDeliveries() {
|
public void cleanupOldDeliveries() {
|
||||||
try {
|
try {
|
||||||
log.debug("[RetryScheduler] Starting cleanup task");
|
|
||||||
deliveryService.cleanupOldDeliveries();
|
deliveryService.cleanupOldDeliveries();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[RetryScheduler] Error in cleanup task: {}", e.getMessage(), e);
|
// Ignore cleanup errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,8 +56,6 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(PluginConfig config) throws PluginException {
|
public void init(PluginConfig config) throws PluginException {
|
||||||
log.info("[MqttPlugin] Initializing MQTT plugin");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
notifyConnectionState(ConnectionState.INITIALIZING, null);
|
notifyConnectionState(ConnectionState.INITIALIZING, null);
|
||||||
|
|
||||||
@@ -71,9 +69,6 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
int connectionTimeout = config.getInt(CONFIG_CONNECTION_TIMEOUT, 60);
|
int connectionTimeout = config.getInt(CONFIG_CONNECTION_TIMEOUT, 60);
|
||||||
int keepAlive = config.getInt(CONFIG_KEEP_ALIVE, 60);
|
int keepAlive = config.getInt(CONFIG_KEEP_ALIVE, 60);
|
||||||
|
|
||||||
log.info("[MqttPlugin] Connecting to {}:{} with clientId: {} (timeout: {}s, keepAlive: {}s)", brokerHost,
|
|
||||||
brokerPort, clientId, connectionTimeout, keepAlive);
|
|
||||||
|
|
||||||
// Build MQTT client
|
// Build MQTT client
|
||||||
var clientBuilder = MqttClient.builder().useMqttVersion5().identifier(clientId).serverHost(brokerHost)
|
var clientBuilder = MqttClient.builder().useMqttVersion5().identifier(clientId).serverHost(brokerHost)
|
||||||
.serverPort(brokerPort).automaticReconnect().initialDelay(1, java.util.concurrent.TimeUnit.SECONDS)
|
.serverPort(brokerPort).automaticReconnect().initialDelay(1, java.util.concurrent.TimeUnit.SECONDS)
|
||||||
@@ -93,68 +88,42 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
// Connect asynchronously
|
// Connect asynchronously
|
||||||
notifyConnectionState(ConnectionState.CONNECTING, null);
|
notifyConnectionState(ConnectionState.CONNECTING, null);
|
||||||
|
|
||||||
log.info("[MqttPlugin] Starting async connection to {}:{}", brokerHost, brokerPort);
|
|
||||||
|
|
||||||
mqttClient.connect(connectBuilder.build())
|
mqttClient.connect(connectBuilder.build())
|
||||||
.orTimeout(connectionTimeout, java.util.concurrent.TimeUnit.SECONDS)
|
.orTimeout(connectionTimeout, java.util.concurrent.TimeUnit.SECONDS)
|
||||||
.whenComplete((connAck, throwable) -> {
|
.whenComplete((connAck, throwable) -> {
|
||||||
if (throwable != null) {
|
if (throwable != null) {
|
||||||
String errorMsg = String.format("Connection to %s:%d failed: %s", brokerHost, brokerPort,
|
String errorMsg = String.format("Connection to %s:%d failed: %s", brokerHost, brokerPort,
|
||||||
throwable.getMessage());
|
throwable.getMessage());
|
||||||
log.error("[MqttPlugin] {}", errorMsg, throwable);
|
log.error("[MQTT] Connection failed: {}", errorMsg);
|
||||||
|
|
||||||
// Check for specific error types
|
|
||||||
if (throwable instanceof java.util.concurrent.TimeoutException) {
|
|
||||||
log.error(
|
|
||||||
"[MqttPlugin] Connection timeout - broker may be unreachable or firewall blocking connection");
|
|
||||||
} else if (throwable.getCause() instanceof java.net.UnknownHostException) {
|
|
||||||
log.error("[MqttPlugin] Unknown host - DNS resolution failed for {}", brokerHost);
|
|
||||||
} else if (throwable.getCause() instanceof java.net.ConnectException) {
|
|
||||||
log.error("[MqttPlugin] Connection refused - broker may be down or port {} is blocked",
|
|
||||||
brokerPort);
|
|
||||||
}
|
|
||||||
|
|
||||||
connected = false;
|
connected = false;
|
||||||
notifyConnectionState(ConnectionState.ERROR, errorMsg);
|
notifyConnectionState(ConnectionState.ERROR, errorMsg);
|
||||||
} else {
|
} else {
|
||||||
log.info("[MqttPlugin] Connected successfully - connAck: {}", connAck);
|
log.info("[MQTT] Server connected to {}:{}", brokerHost, brokerPort);
|
||||||
connected = true;
|
connected = true;
|
||||||
setupGlobalMessageHandler();
|
setupGlobalMessageHandler();
|
||||||
log.info("[MqttPlugin] Notifying CONNECTED state");
|
|
||||||
notifyConnectionState(ConnectionState.CONNECTED, null);
|
notifyConnectionState(ConnectionState.CONNECTED, null);
|
||||||
log.info("[MqttPlugin] CONNECTED state notification sent");
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
log.info("[MqttPlugin] Initialization complete - connection in progress");
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MqttPlugin] Initialization failed: {}", e.getMessage(), e);
|
log.error("[MQTT] Initialization failed: {}", e.getMessage(), e);
|
||||||
throw new PluginException("Failed to initialize MQTT plugin", e);
|
throw new PluginException("Failed to initialize MQTT plugin", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exit() throws PluginException {
|
public void exit() throws PluginException {
|
||||||
log.info("[MqttPlugin] Shutting down MQTT plugin");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
notifyConnectionState(ConnectionState.DISCONNECTING, null);
|
notifyConnectionState(ConnectionState.DISCONNECTING, null);
|
||||||
|
|
||||||
if (mqttClient != null) {
|
if (mqttClient != null) {
|
||||||
// Check actual client connection state, not just our flag
|
|
||||||
var clientState = mqttClient.getState();
|
var clientState = mqttClient.getState();
|
||||||
if (clientState.isConnected()) {
|
if (clientState.isConnected()) {
|
||||||
try {
|
try {
|
||||||
mqttClient.disconnect().join();
|
mqttClient.disconnect().join();
|
||||||
log.info("[MqttPlugin] Disconnected successfully");
|
|
||||||
} catch (Exception disconnectEx) {
|
} catch (Exception disconnectEx) {
|
||||||
// Log but don't throw - client may already be disconnected
|
// Client may already be disconnected
|
||||||
log.warn("[MqttPlugin] Disconnect failed (client may already be disconnected): {}",
|
|
||||||
disconnectEx.getMessage());
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.info("[MqttPlugin] Client already disconnected (state: {})", clientState);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,8 +133,7 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
notifyConnectionState(ConnectionState.DISCONNECTED, null);
|
notifyConnectionState(ConnectionState.DISCONNECTED, null);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MqttPlugin] Shutdown failed: {}", e.getMessage(), e);
|
log.error("[MQTT] Shutdown failed: {}", e.getMessage());
|
||||||
// Don't throw on shutdown - just log the error
|
|
||||||
connected = false;
|
connected = false;
|
||||||
messageHandlers.clear();
|
messageHandlers.clear();
|
||||||
ackHandler = null;
|
ackHandler = null;
|
||||||
@@ -185,7 +153,8 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String topic = String.format(TOPIC_TO_CLIENT, clientId, messageType);
|
String topic = String.format(TOPIC_TO_CLIENT, clientId, messageType);
|
||||||
log.debug("[MqttPlugin] Sending to client {} (type: {}) on topic: {}", clientId, messageType, topic);
|
String json = new String(payload, StandardCharsets.UTF_8);
|
||||||
|
log.info("[MQTT OUT] {} -> {}", topic, json);
|
||||||
|
|
||||||
return sendToTopic(topic, payload, options);
|
return sendToTopic(topic, payload, options);
|
||||||
}
|
}
|
||||||
@@ -198,8 +167,6 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId);
|
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId);
|
||||||
log.debug("[MqttPlugin] Sending ACK to client {} for message {} on topic: {}", clientId, messageId, topic);
|
|
||||||
|
|
||||||
return sendToTopic(topic, payload, options);
|
return sendToTopic(topic, payload, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -214,31 +181,21 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
// Special case for login: subscribe to /server/login (without clientId)
|
// Special case for login: subscribe to /server/login (without clientId)
|
||||||
if ("login".equals(messageType)) {
|
if ("login".equals(messageType)) {
|
||||||
String loginTopic = "/server/login";
|
String loginTopic = "/server/login";
|
||||||
log.info("[MqttPlugin] Registering handler for message type '{}' with topic: {}", messageType, loginTopic);
|
|
||||||
|
|
||||||
mqttClient.subscribeWith().topicFilter(loginTopic).qos(MqttQos.EXACTLY_ONCE).send()
|
mqttClient.subscribeWith().topicFilter(loginTopic).qos(MqttQos.EXACTLY_ONCE).send()
|
||||||
.whenComplete((subAck, throwable) -> {
|
.whenComplete((subAck, throwable) -> {
|
||||||
if (throwable != null) {
|
if (throwable != null) {
|
||||||
log.error("[MqttPlugin] Subscription to {} failed: {}", loginTopic, throwable.getMessage());
|
log.error("[MQTT] Subscription to {} failed: {}", loginTopic, throwable.getMessage());
|
||||||
messageHandlers.remove(messageType);
|
messageHandlers.remove(messageType);
|
||||||
} else {
|
|
||||||
log.info("[MqttPlugin] Successfully subscribed to: {}", loginTopic);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// Standard pattern: /server/+/{messageType}
|
// Standard pattern: /server/+/{messageType}
|
||||||
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
|
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
|
||||||
log.info("[MqttPlugin] Registering handler for message type '{}' with pattern: {}", messageType,
|
|
||||||
topicPattern);
|
|
||||||
|
|
||||||
mqttClient.subscribeWith().topicFilter(topicPattern).qos(MqttQos.EXACTLY_ONCE).send()
|
mqttClient.subscribeWith().topicFilter(topicPattern).qos(MqttQos.EXACTLY_ONCE).send()
|
||||||
.whenComplete((subAck, throwable) -> {
|
.whenComplete((subAck, throwable) -> {
|
||||||
if (throwable != null) {
|
if (throwable != null) {
|
||||||
log.error("[MqttPlugin] Subscription to {} failed: {}", topicPattern,
|
log.error("[MQTT] Subscription to {} failed: {}", topicPattern, throwable.getMessage());
|
||||||
throwable.getMessage());
|
|
||||||
messageHandlers.remove(messageType);
|
messageHandlers.remove(messageType);
|
||||||
} else {
|
|
||||||
log.info("[MqttPlugin] Successfully subscribed to: {}", topicPattern);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -250,19 +207,15 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
throw new PluginException("MQTT client is not connected");
|
throw new PluginException("MQTT client is not connected");
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("[MqttPlugin] Registering ACK handler with pattern: {}", PATTERN_ACK_FROM_CLIENT);
|
|
||||||
|
|
||||||
this.ackHandler = handler;
|
this.ackHandler = handler;
|
||||||
|
|
||||||
// Subscribe to ACK topic pattern
|
// Subscribe to ACK topic pattern
|
||||||
mqttClient.subscribeWith().topicFilter(PATTERN_ACK_FROM_CLIENT).qos(MqttQos.EXACTLY_ONCE).send()
|
mqttClient.subscribeWith().topicFilter(PATTERN_ACK_FROM_CLIENT).qos(MqttQos.EXACTLY_ONCE).send()
|
||||||
.whenComplete((subAck, throwable) -> {
|
.whenComplete((subAck, throwable) -> {
|
||||||
if (throwable != null) {
|
if (throwable != null) {
|
||||||
log.error("[MqttPlugin] Subscription to {} failed: {}", PATTERN_ACK_FROM_CLIENT,
|
log.error("[MQTT] Subscription to {} failed: {}", PATTERN_ACK_FROM_CLIENT,
|
||||||
throwable.getMessage());
|
throwable.getMessage());
|
||||||
this.ackHandler = null;
|
this.ackHandler = null;
|
||||||
} else {
|
|
||||||
log.info("[MqttPlugin] Successfully subscribed to: {}", PATTERN_ACK_FROM_CLIENT);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -297,8 +250,6 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
mqttClient.publishes(com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL, publish -> {
|
mqttClient.publishes(com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL, publish -> {
|
||||||
handleIncomingMessage(publish);
|
handleIncomingMessage(publish);
|
||||||
});
|
});
|
||||||
|
|
||||||
log.info("[MqttPlugin] Global message handler configured");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -307,8 +258,10 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
private void handleIncomingMessage(Mqtt5Publish publish) {
|
private void handleIncomingMessage(Mqtt5Publish publish) {
|
||||||
String topic = publish.getTopic().toString();
|
String topic = publish.getTopic().toString();
|
||||||
byte[] payload = publish.getPayloadAsBytes();
|
byte[] payload = publish.getPayloadAsBytes();
|
||||||
|
String json = new String(payload, StandardCharsets.UTF_8);
|
||||||
|
|
||||||
log.debug("[MqttPlugin] Received message on topic: {}", topic);
|
// Log incoming message with topic and JSON
|
||||||
|
log.info("[MQTT IN] {} <- {}", topic, json);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Check if it's an ACK message (topic ends with /ack)
|
// Check if it's an ACK message (topic ends with /ack)
|
||||||
@@ -318,11 +271,9 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
// Check if it's a client message
|
// Check if it's a client message
|
||||||
else if (topic.startsWith("/server/")) {
|
else if (topic.startsWith("/server/")) {
|
||||||
handleClientMessage(topic, payload);
|
handleClientMessage(topic, payload);
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] Received message on unexpected topic: {}", topic);
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MqttPlugin] Error handling message on topic {}: {}", topic, e.getMessage(), e);
|
log.error("[MQTT] Error handling message on topic {}: {}", topic, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -332,27 +283,19 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
*/
|
*/
|
||||||
private void handleAckMessage(String topic, byte[] payload) {
|
private void handleAckMessage(String topic, byte[] payload) {
|
||||||
if (ackHandler == null) {
|
if (ackHandler == null) {
|
||||||
log.warn("[MqttPlugin] Received ACK but no handler registered: {}", topic);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract clientId from topic: /server/{clientId}/ack
|
// Extract clientId from topic: /server/{clientId}/ack
|
||||||
String[] parts = topic.split("/");
|
String[] parts = topic.split("/");
|
||||||
if (parts.length >= 4) {
|
if (parts.length >= 4) {
|
||||||
String clientId = parts[2]; // clientId is at index 2
|
|
||||||
|
|
||||||
// Extract messageId from payload
|
// Extract messageId from payload
|
||||||
String payloadStr = new String(payload, StandardCharsets.UTF_8);
|
String payloadStr = new String(payload, StandardCharsets.UTF_8);
|
||||||
String messageId = extractMessageIdFromPayload(payloadStr);
|
String messageId = extractMessageIdFromPayload(payloadStr);
|
||||||
|
|
||||||
if (messageId != null) {
|
if (messageId != null) {
|
||||||
log.debug("[MqttPlugin] Routing ACK for message: {} from client: {}", messageId, clientId);
|
|
||||||
ackHandler.onAckReceived(messageId, payload);
|
ackHandler.onAckReceived(messageId, payload);
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] Could not extract messageId from ACK payload: {}", payloadStr);
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] Invalid ACK topic format: {}", topic);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -410,10 +353,7 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
String messageType = parts[2];
|
String messageType = parts[2];
|
||||||
ClientMessageHandler handler = messageHandlers.get(messageType);
|
ClientMessageHandler handler = messageHandlers.get(messageType);
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
log.debug("[MqttPlugin] Routing login message (type: {})", messageType);
|
|
||||||
handler.onMessageReceived(null, payload);
|
handler.onMessageReceived(null, payload);
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] No handler registered for message type: {}", messageType);
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -427,13 +367,8 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
|
|
||||||
ClientMessageHandler handler = messageHandlers.get(messageType);
|
ClientMessageHandler handler = messageHandlers.get(messageType);
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
log.debug("[MqttPlugin] Routing message from client {} (type: {})", clientId, messageType);
|
|
||||||
handler.onMessageReceived(clientId, payload);
|
handler.onMessageReceived(clientId, payload);
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] No handler registered for message type: {}", messageType);
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] Invalid client message topic format: {}", topic);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -445,12 +380,9 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
var publishBuilder = Mqtt5Publish.builder().topic(topic).payload(payload).qos(mapQos(options.getQos()))
|
var publishBuilder = Mqtt5Publish.builder().topic(topic).payload(payload).qos(mapQos(options.getQos()))
|
||||||
.retain(options.isRetained());
|
.retain(options.isRetained());
|
||||||
|
|
||||||
return mqttClient.publish(publishBuilder.build()).thenApply(publishResult -> {
|
return mqttClient.publish(publishBuilder.build()).thenApply(publishResult -> null);
|
||||||
log.debug("[MqttPlugin] Message published to topic: {}", topic);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MqttPlugin] Failed to publish to topic {}: {}", topic, e.getMessage(), e);
|
log.error("[MQTT] Failed to publish to topic {}: {}", topic, e.getMessage());
|
||||||
return CompletableFuture.failedFuture(new PluginException("Failed to publish message", e));
|
return CompletableFuture.failedFuture(new PluginException("Failed to publish message", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -471,20 +403,14 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
|||||||
* Notify connection state listener.
|
* Notify connection state listener.
|
||||||
*/
|
*/
|
||||||
private void notifyConnectionState(ConnectionState state, String message) {
|
private void notifyConnectionState(ConnectionState state, String message) {
|
||||||
log.debug("[MqttPlugin] notifyConnectionState called: state={}, listener={}", state,
|
|
||||||
connectionListener != null ? "present" : "null");
|
|
||||||
if (connectionListener != null) {
|
if (connectionListener != null) {
|
||||||
ConnectionStateEvent event = ConnectionStateEvent.builder().state(state).previousState(null)
|
ConnectionStateEvent event = ConnectionStateEvent.builder().state(state).previousState(null)
|
||||||
.errorMessage(message).pluginName(PLUGIN_NAME).build();
|
.errorMessage(message).pluginName(PLUGIN_NAME).build();
|
||||||
try {
|
try {
|
||||||
log.debug("[MqttPlugin] Calling connectionListener.onConnectionStateChanged");
|
|
||||||
connectionListener.onConnectionStateChanged(event);
|
connectionListener.onConnectionStateChanged(event);
|
||||||
log.debug("[MqttPlugin] connectionListener.onConnectionStateChanged completed");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MqttPlugin] Error in connection listener: {}", e.getMessage(), e);
|
log.error("[MQTT] Error in connection listener: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.warn("[MqttPlugin] Connection listener is null, cannot notify state: {}", state);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,29 +56,13 @@ class MqttPublisherImpl implements MqttPublisher {
|
|||||||
// Use MessageDeliveryService for reliable delivery
|
// Use MessageDeliveryService for reliable delivery
|
||||||
DeliveryOptions options = DeliveryOptions.builder().requiresAck(true).retained(retained).build();
|
DeliveryOptions options = DeliveryOptions.builder().requiresAck(true).retained(retained).build();
|
||||||
|
|
||||||
deliveryService.sendToClient(clientId, messageType, payload, options).thenAccept(receipt -> {
|
deliveryService.sendToClient(clientId, messageType, payload, options).exceptionally(ex -> {
|
||||||
log.info("=== MESSAGE DELIVERY SUBMITTED ===");
|
log.error("[MQTT] Failed to deliver to {}: {}", topic, ex.getMessage());
|
||||||
log.info("Topic: {}", topic);
|
|
||||||
log.info("Message ID: {}", receipt.getMessageId());
|
|
||||||
log.info("Status: {}", receipt.getStatus());
|
|
||||||
log.info("Retained: {}", retained);
|
|
||||||
|
|
||||||
// Log payload for debugging
|
|
||||||
try {
|
|
||||||
String json = (payload instanceof String s) ? s : objectMapper.writeValueAsString(payload);
|
|
||||||
log.info("Payload: {}", json);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.debug("Could not serialize payload for logging: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("=== END MESSAGE DELIVERY ===");
|
|
||||||
}).exceptionally(ex -> {
|
|
||||||
log.error("Failed to submit message for delivery to topic {}: {}", topic, ex.getMessage(), ex);
|
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to publish message for topic {}: {}", topic, e.getMessage(), e);
|
log.error("[MQTT] Failed to publish to {}: {}", topic, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,21 +1,22 @@
|
|||||||
package de.assecutor.votianlt.pages.service;
|
package de.assecutor.votianlt.pages.service;
|
||||||
|
|
||||||
|
import de.assecutor.votianlt.dto.JobWithRelatedDataDTO;
|
||||||
import de.assecutor.votianlt.model.CargoItem;
|
import de.assecutor.votianlt.model.CargoItem;
|
||||||
import de.assecutor.votianlt.model.Job;
|
import de.assecutor.votianlt.model.Job;
|
||||||
import de.assecutor.votianlt.model.JobStatus;
|
import de.assecutor.votianlt.model.JobStatus;
|
||||||
import de.assecutor.votianlt.model.task.BaseTask;
|
import de.assecutor.votianlt.model.task.BaseTask;
|
||||||
|
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
||||||
import de.assecutor.votianlt.repository.JobRepository;
|
import de.assecutor.votianlt.repository.JobRepository;
|
||||||
import de.assecutor.votianlt.repository.TaskRepository;
|
import de.assecutor.votianlt.repository.TaskRepository;
|
||||||
import de.assecutor.votianlt.security.SecurityService;
|
import de.assecutor.votianlt.security.SecurityService;
|
||||||
import de.assecutor.votianlt.repository.CargoItemRepository;
|
import de.assecutor.votianlt.repository.CargoItemRepository;
|
||||||
|
import de.assecutor.votianlt.service.ClientConnectionService;
|
||||||
import de.assecutor.votianlt.service.JobHistoryService;
|
import de.assecutor.votianlt.service.JobHistoryService;
|
||||||
import de.assecutor.votianlt.service.EmailService;
|
import de.assecutor.votianlt.service.EmailService;
|
||||||
import de.assecutor.votianlt.event.JobCreatedEvent;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.bson.types.ObjectId;
|
import org.bson.types.ObjectId;
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
@@ -34,7 +35,8 @@ public class AddJobService {
|
|||||||
private final SecurityService securityService;
|
private final SecurityService securityService;
|
||||||
private final JobHistoryService jobHistoryService;
|
private final JobHistoryService jobHistoryService;
|
||||||
private final EmailService emailService;
|
private final EmailService emailService;
|
||||||
private final ApplicationEventPublisher eventPublisher;
|
private final ClientConnectionService clientConnectionService;
|
||||||
|
private final MqttPublisher mqttPublisher;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Speichert einen neuen Auftrag samt CargoItems und Tasks
|
* Speichert einen neuen Auftrag samt CargoItems und Tasks
|
||||||
@@ -121,13 +123,8 @@ public class AddJobService {
|
|||||||
e.getMessage());
|
e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish job created event for real-time UI updates
|
// MQTT-Benachrichtigung an Client senden, wenn online
|
||||||
try {
|
notifyClientJobCreated(savedJob);
|
||||||
eventPublisher.publishEvent(new JobCreatedEvent(this, savedJob));
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Failed to publish job created event for job {}: {}", savedJob.getIdAsString(),
|
|
||||||
e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("Auftrag erfolgreich gespeichert: {}", savedJob.getJobNumber());
|
log.info("Auftrag erfolgreich gespeichert: {}", savedJob.getJobNumber());
|
||||||
return savedJob;
|
return savedJob;
|
||||||
@@ -183,4 +180,40 @@ public class AddJobService {
|
|||||||
}
|
}
|
||||||
return "System"; // Fallback wenn kein authentifizierter Benutzer gefunden wird
|
return "System"; // Fallback wenn kein authentifizierter Benutzer gefunden wird
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sendet den neu erstellten Job per MQTT an den zugewiesenen Client, falls dieser
|
||||||
|
* online ist.
|
||||||
|
*/
|
||||||
|
private void notifyClientJobCreated(Job job) {
|
||||||
|
if (!job.isDigitalProcessing()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String appUserId = job.getAppUser();
|
||||||
|
if (appUserId == null || appUserId.isBlank()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clientConnectionService.isClientConnected(appUserId)) {
|
||||||
|
log.info("[JOB] Client {} not online, skipping job_created notification", appUserId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Lade CargoItems und Tasks für den Job
|
||||||
|
List<CargoItem> cargoItems = cargoItemRepository.findByJobId(job.getId());
|
||||||
|
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
|
||||||
|
|
||||||
|
// Erstelle DTO mit allen Daten
|
||||||
|
JobWithRelatedDataDTO jobData = new JobWithRelatedDataDTO(job, cargoItems, tasks);
|
||||||
|
|
||||||
|
String topic = "/client/" + appUserId + "/job_created";
|
||||||
|
log.info("[JOB] Sending job_created to {}: jobId={}, jobNumber={}", topic, job.getId().toHexString(),
|
||||||
|
job.getJobNumber());
|
||||||
|
mqttPublisher.publishAsJson(topic, jobData, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[JOB] Failed to send job_created notification: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1004,12 +1004,6 @@ public class MessageDetailsView extends Main implements BeforeEnterObserver {
|
|||||||
ensureScrollAnchor();
|
ensureScrollAnchor();
|
||||||
scrollToBottom();
|
scrollToBottom();
|
||||||
|
|
||||||
// Play notification sound and show browser notification for incoming client messages
|
|
||||||
if (message.getOrigin() == MessageOrigin.CLIENT) {
|
|
||||||
playNotificationSound(ui);
|
|
||||||
showBrowserNotification(ui, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("Messages re-rendered with new message");
|
log.info("Messages re-rendered with new message");
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@@ -1017,62 +1011,4 @@ public class MessageDetailsView extends Main implements BeforeEnterObserver {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Play a notification sound when a new message arrives
|
|
||||||
*/
|
|
||||||
private void playNotificationSound(UI ui) {
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"const audio = new Audio('data:audio/wav;base64,UklGRnoGAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQoGAACBhYqFbF1fdJivrJBhNjVgodDbq2EcBj+a2/LDciUFLIHO8tiJNwgZaLvt559NEAxQp+PwtmMcBjiR1/LMeSwFJHfH8N2QQAoUXrTp66hVFApGn+DyvmwhBTGH0fPTgjMGHm7A7+OZSA0PVK3m8LRjHAU7k9nzyn0tBSd5ye/glEIKEl206O2oVhQLSKHi8r5uIgU0idT01IMzByBwwvHjmEgNDlWs5PCzYhsFO5TY88p+Kwcme8jw4JVCChNdt+jvp1QVDEih4vK+bSIGNIrV9dODMggib8Lx5JdIDQ9VrObws2IbBT6U2PXKfi0IJnzH8OCVQgoVXbfp76dVFQ5IouLyvW0jCDSL1fTSgTQJJG7C8eSWSA8RVq3m8LJgGwg/lNj0yn4tCSV7x+/glUILFl237++nVhYOSKPi8rxtIwo0i9X00oE1CiNuwvDklkkREVat5u+yXxwJP5PY9Ml+Lgoge8fv4JVCDBVct+7vqFYYEUij4vG8bSQKNIvV89GBNQshbcLw5JZJERFV\u003d\u003d'); audio.play().catch(err => console.log('Audio play failed:', err));");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Show a browser notification when a new message arrives
|
|
||||||
*/
|
|
||||||
private void showBrowserNotification(UI ui, Message message) {
|
|
||||||
String senderName = resolveAppUserName();
|
|
||||||
String preview = resolveNotificationPreview(message);
|
|
||||||
String title = senderName != null ? senderName : "Neue Nachricht";
|
|
||||||
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"if (!('Notification' in window)) {" + " console.log('Browser does not support notifications');"
|
|
||||||
+ "} else if (Notification.permission === 'granted') {" + " new Notification($0, { body: $1 });"
|
|
||||||
+ "} else if (Notification.permission !== 'denied') {" + " Notification.requestPermission().then(permission => {"
|
|
||||||
+ " if (permission === 'granted') {" + " new Notification($0, { body: $1 });" + " }" + " });" + "}",
|
|
||||||
title, preview);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve the AppUser name for the current conversation
|
|
||||||
*/
|
|
||||||
private String resolveAppUserName() {
|
|
||||||
if (participantKey == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
ObjectId appUserId = new ObjectId(participantKey);
|
|
||||||
AppUser appUser = appUserService.findById(appUserId);
|
|
||||||
return appUser != null ? appUser.getBezeichnung() : null;
|
|
||||||
} catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve a short preview text for the notification
|
|
||||||
*/
|
|
||||||
private String resolveNotificationPreview(Message message) {
|
|
||||||
if (message == null) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
if (message.getContentType() == MessageContentType.IMAGE) {
|
|
||||||
return "📷 Bild";
|
|
||||||
}
|
|
||||||
String content = message.getContent();
|
|
||||||
if (content == null || content.isBlank()) {
|
|
||||||
return "(kein Inhalt)";
|
|
||||||
}
|
|
||||||
// Limit preview to 100 characters
|
|
||||||
return content.length() > 100 ? content.substring(0, 97) + "..." : content;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ import de.assecutor.votianlt.service.MessageService;
|
|||||||
import de.assecutor.votianlt.util.DateTimeFormatUtil;
|
import de.assecutor.votianlt.util.DateTimeFormatUtil;
|
||||||
import jakarta.annotation.security.RolesAllowed;
|
import jakarta.annotation.security.RolesAllowed;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.bson.types.ObjectId;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -55,7 +54,7 @@ public class MessagesView extends Main {
|
|||||||
private final AtomicBoolean loading = new AtomicBoolean(false);
|
private final AtomicBoolean loading = new AtomicBoolean(false);
|
||||||
private Registration pollRegistration;
|
private Registration pollRegistration;
|
||||||
private Registration broadcasterRegistration;
|
private Registration broadcasterRegistration;
|
||||||
private int lastMessageCount = 0;
|
private List<AppUser> cachedAppUsers;
|
||||||
|
|
||||||
public MessagesView(MessageService messageService, AppUserService appUserService,
|
public MessagesView(MessageService messageService, AppUserService appUserService,
|
||||||
MessageBroadcaster messageBroadcaster) {
|
MessageBroadcaster messageBroadcaster) {
|
||||||
@@ -152,7 +151,14 @@ public class MessagesView extends Main {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
List<AppUser> appUsers = Optional.ofNullable(appUserService.findByCurrentUser()).orElseGet(ArrayList::new);
|
List<AppUser> appUsers;
|
||||||
|
try {
|
||||||
|
appUsers = Optional.ofNullable(appUserService.findByCurrentUser()).orElseGet(ArrayList::new);
|
||||||
|
cachedAppUsers = appUsers;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("SecurityContext not available, using cached app users");
|
||||||
|
appUsers = cachedAppUsers != null ? cachedAppUsers : new ArrayList<>();
|
||||||
|
}
|
||||||
Map<String, AppUser> appUserLookup = buildAppUserLookup(appUsers);
|
Map<String, AppUser> appUserLookup = buildAppUserLookup(appUsers);
|
||||||
|
|
||||||
List<Message> allMessages = messageService.getAllMessages();
|
List<Message> allMessages = messageService.getAllMessages();
|
||||||
@@ -328,15 +334,26 @@ public class MessagesView extends Main {
|
|||||||
UI ui = attachEvent.getUI();
|
UI ui = attachEvent.getUI();
|
||||||
ui.setPollInterval(POLL_INTERVAL_MS);
|
ui.setPollInterval(POLL_INTERVAL_MS);
|
||||||
pollRegistration = ui.addPollListener(event -> loadClientSummaries());
|
pollRegistration = ui.addPollListener(event -> loadClientSummaries());
|
||||||
|
cachedAppUsers = Optional.ofNullable(appUserService.findByCurrentUser()).orElseGet(ArrayList::new);
|
||||||
// Register broadcaster for real-time notifications
|
broadcasterRegistration = messageBroadcaster.register(message -> handleIncomingMessage(ui, message));
|
||||||
broadcasterRegistration = messageBroadcaster.register(message -> {
|
requestBrowserPermissions(ui);
|
||||||
handleIncomingMessage(ui, message);
|
|
||||||
});
|
|
||||||
|
|
||||||
loadClientSummaries();
|
loadClientSummaries();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void requestBrowserPermissions(UI ui) {
|
||||||
|
ui.getPage().executeJs(
|
||||||
|
"if ('Notification' in window && Notification.permission === 'default') {"
|
||||||
|
+ " Notification.requestPermission();"
|
||||||
|
+ "}"
|
||||||
|
+ "if (!window._votianAudioCtx) {"
|
||||||
|
+ " window._votianAudioCtx = new (window.AudioContext || window.webkitAudioContext)();"
|
||||||
|
+ " document.addEventListener('click', function _resumeAudio() {"
|
||||||
|
+ " window._votianAudioCtx.resume();"
|
||||||
|
+ " document.removeEventListener('click', _resumeAudio);"
|
||||||
|
+ " }, { once: true });"
|
||||||
|
+ "}");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void onDetach(DetachEvent detachEvent) {
|
protected void onDetach(DetachEvent detachEvent) {
|
||||||
super.onDetach(detachEvent);
|
super.onDetach(detachEvent);
|
||||||
@@ -351,89 +368,54 @@ public class MessagesView extends Main {
|
|||||||
detachEvent.getUI().setPollInterval(-1);
|
detachEvent.getUI().setPollInterval(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle incoming message for notification purposes
|
|
||||||
*/
|
|
||||||
private void handleIncomingMessage(UI ui, Message message) {
|
private void handleIncomingMessage(UI ui, Message message) {
|
||||||
if (message == null || message.getOrigin() != MessageOrigin.CLIENT) {
|
if (message.getOrigin() != MessageOrigin.CLIENT) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ui.access(() -> {
|
ui.access(() -> {
|
||||||
try {
|
String senderName = resolveSenderName(message.getReceiver());
|
||||||
// Count total messages to detect new ones
|
String preview = resolvePreview(message);
|
||||||
int currentMessageCount = messageService.countAllMessages();
|
|
||||||
if (currentMessageCount > lastMessageCount) {
|
|
||||||
lastMessageCount = currentMessageCount;
|
|
||||||
|
|
||||||
// Play notification sound and show browser notification
|
// Play notification sound
|
||||||
playNotificationSound(ui);
|
ui.getPage().executeJs(
|
||||||
showBrowserNotification(ui, message);
|
"try {"
|
||||||
|
+ " const ctx = new (window.AudioContext || window.webkitAudioContext)();"
|
||||||
|
+ " ctx.resume().then(() => {"
|
||||||
|
+ " const osc = ctx.createOscillator();"
|
||||||
|
+ " const gain = ctx.createGain();"
|
||||||
|
+ " osc.connect(gain);"
|
||||||
|
+ " gain.connect(ctx.destination);"
|
||||||
|
+ " osc.frequency.value = 800;"
|
||||||
|
+ " osc.type = 'sine';"
|
||||||
|
+ " gain.gain.setValueAtTime(0.3, ctx.currentTime);"
|
||||||
|
+ " gain.gain.exponentialRampToValueAtTime(0.01, ctx.currentTime + 0.5);"
|
||||||
|
+ " osc.start(ctx.currentTime);"
|
||||||
|
+ " osc.stop(ctx.currentTime + 0.5);"
|
||||||
|
+ " });"
|
||||||
|
+ "} catch(e) { console.warn('Notification sound failed:', e); }");
|
||||||
|
|
||||||
|
// Show notification
|
||||||
|
Notification notification = Notification.show(
|
||||||
|
"Neue Nachricht von " + senderName + ": " + preview,
|
||||||
|
4000,
|
||||||
|
Notification.Position.TOP_END);
|
||||||
|
notification.addThemeVariants(NotificationVariant.LUMO_PRIMARY);
|
||||||
|
|
||||||
// Refresh the grid
|
|
||||||
loadClientSummaries();
|
loadClientSummaries();
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error handling incoming message notification", e);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private String resolveSenderName(String clientId) {
|
||||||
* Play a notification sound when a new message arrives
|
if (clientId == null || clientId.isBlank()) {
|
||||||
*/
|
return "Unbekannt";
|
||||||
private void playNotificationSound(UI ui) {
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"const audio = new Audio('data:audio/wav;base64,UklGRnoGAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQoGAACBhYqFbF1fdJivrJBhNjVgodDbq2EcBj+a2/LDciUFLIHO8tiJNwgZaLvt559NEAxQp+PwtmMcBjiR1/LMeSwFJHfH8N2QQAoUXrTp66hVFApGn+DyvmwhBTGH0fPTgjMGHm7A7+OZSA0PVK3m8LRjHAU7k9nzyn0tBSd5ye/glEIKEl206O2oVhQLSKHi8r5uIgU0idT01IMzByBwwvHjmEgNDlWs5PCzYhsFO5TY88p+Kwcme8jw4JVCChNdt+jvp1QVDEih4vK+bSIGNIrV9dODMggib8Lx5JdIDQ9VrObws2IbBT6U2PXKfi0IJnzH8OCVQgoVXbfp76dVFQ5IouLyvW0jCDSL1fTSgTQJJG7C8eSWSA8RVq3m8LJgGwg/lNj0yn4tCSV7x+/glUILFl237++nVhYOSKPi8rxtIwo0i9X00oE1CiNuwvDklkkREVat5u+yXxwJP5PY9Ml+Lgoge8fv4JVCDBVct+7vqFYYEUij4vG8bSQKNIvV89GBNQshbcLw5JZJERFV\u003d\u003d'); audio.play().catch(err => console.log('Audio play failed:', err));");
|
|
||||||
}
|
}
|
||||||
|
List<AppUser> appUsers = cachedAppUsers != null ? cachedAppUsers : List.of();
|
||||||
/**
|
return appUsers.stream()
|
||||||
* Show a browser notification when a new message arrives
|
.filter(user -> clientId.equals(user.getIdAsString())
|
||||||
*/
|
|| clientId.equals(user.getEmail())
|
||||||
private void showBrowserNotification(UI ui, Message message) {
|
|| clientId.equals(user.getAppCode()))
|
||||||
String senderName = resolveAppUserName(message.getReceiver());
|
.findFirst()
|
||||||
String preview = resolveNotificationPreview(message);
|
.map(this::buildClientName)
|
||||||
String title = senderName != null ? ("Nachricht von " + senderName) : "Neue Nachricht";
|
.orElse(clientId);
|
||||||
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"if (!('Notification' in window)) {" + " console.log('Browser does not support notifications');"
|
|
||||||
+ "} else if (Notification.permission === 'granted') {" + " new Notification($0, { body: $1 });"
|
|
||||||
+ "} else if (Notification.permission !== 'denied') {" + " Notification.requestPermission().then(permission => {"
|
|
||||||
+ " if (permission === 'granted') {" + " new Notification($0, { body: $1 });" + " }" + " });" + "}",
|
|
||||||
title, preview);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve the AppUser name by ID
|
|
||||||
*/
|
|
||||||
private String resolveAppUserName(String appUserId) {
|
|
||||||
if (appUserId == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
ObjectId id = new ObjectId(appUserId);
|
|
||||||
AppUser appUser = appUserService.findById(id);
|
|
||||||
return appUser != null ? appUser.getBezeichnung() : null;
|
|
||||||
} catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve a short preview text for the notification
|
|
||||||
*/
|
|
||||||
private String resolveNotificationPreview(Message message) {
|
|
||||||
if (message == null) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
if (message.getContentType() == MessageContentType.IMAGE) {
|
|
||||||
return "📷 Bild";
|
|
||||||
}
|
|
||||||
String content = message.getContent();
|
|
||||||
if (content == null || content.isBlank()) {
|
|
||||||
return "(kein Inhalt)";
|
|
||||||
}
|
|
||||||
// Limit preview to 100 characters
|
|
||||||
return content.length() > 100 ? content.substring(0, 97) + "..." : content;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,5 @@
|
|||||||
package de.assecutor.votianlt.pages.view;
|
package de.assecutor.votianlt.pages.view;
|
||||||
|
|
||||||
import com.vaadin.flow.component.AttachEvent;
|
|
||||||
import com.vaadin.flow.component.DetachEvent;
|
|
||||||
import com.vaadin.flow.component.UI;
|
|
||||||
import com.vaadin.flow.component.button.Button;
|
import com.vaadin.flow.component.button.Button;
|
||||||
import com.vaadin.flow.component.button.ButtonVariant;
|
import com.vaadin.flow.component.button.ButtonVariant;
|
||||||
import com.vaadin.flow.component.combobox.ComboBox;
|
import com.vaadin.flow.component.combobox.ComboBox;
|
||||||
@@ -19,20 +16,21 @@ import com.vaadin.flow.component.orderedlayout.HorizontalLayout;
|
|||||||
import com.vaadin.flow.component.orderedlayout.VerticalLayout;
|
import com.vaadin.flow.component.orderedlayout.VerticalLayout;
|
||||||
import com.vaadin.flow.component.textfield.TextField;
|
import com.vaadin.flow.component.textfield.TextField;
|
||||||
import com.vaadin.flow.server.StreamResource;
|
import com.vaadin.flow.server.StreamResource;
|
||||||
import com.vaadin.flow.shared.Registration;
|
|
||||||
import com.vaadin.flow.router.PageTitle;
|
import com.vaadin.flow.router.PageTitle;
|
||||||
import com.vaadin.flow.router.Route;
|
import com.vaadin.flow.router.Route;
|
||||||
import de.assecutor.votianlt.model.Job;
|
import de.assecutor.votianlt.model.Job;
|
||||||
import de.assecutor.votianlt.model.JobStatus;
|
import de.assecutor.votianlt.model.JobStatus;
|
||||||
|
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
||||||
import de.assecutor.votianlt.repository.JobRepository;
|
import de.assecutor.votianlt.repository.JobRepository;
|
||||||
import de.assecutor.votianlt.security.SecurityService;
|
import de.assecutor.votianlt.security.SecurityService;
|
||||||
import de.assecutor.votianlt.service.JobBroadcaster;
|
import de.assecutor.votianlt.service.ClientConnectionService;
|
||||||
import de.assecutor.votianlt.service.JobHistoryService;
|
import de.assecutor.votianlt.service.JobHistoryService;
|
||||||
import jakarta.annotation.security.RolesAllowed;
|
import jakarta.annotation.security.RolesAllowed;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@PageTitle("Aufträge")
|
@PageTitle("Aufträge")
|
||||||
@Route(value = "jobs", layout = de.assecutor.votianlt.pages.base.ui.view.MainLayout.class)
|
@Route(value = "jobs", layout = de.assecutor.votianlt.pages.base.ui.view.MainLayout.class)
|
||||||
@@ -47,17 +45,19 @@ public class ShowJobsView extends VerticalLayout {
|
|||||||
private final JobRepository jobRepository;
|
private final JobRepository jobRepository;
|
||||||
private final JobHistoryService jobHistoryService;
|
private final JobHistoryService jobHistoryService;
|
||||||
private final SecurityService securityService;
|
private final SecurityService securityService;
|
||||||
private final JobBroadcaster jobBroadcaster;
|
private final ClientConnectionService clientConnectionService;
|
||||||
|
private final MqttPublisher mqttPublisher;
|
||||||
private final Grid<Job> grid = new Grid<>(Job.class, false);
|
private final Grid<Job> grid = new Grid<>(Job.class, false);
|
||||||
private Registration broadcasterRegistration;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public ShowJobsView(JobRepository jobRepository, JobHistoryService jobHistoryService,
|
public ShowJobsView(JobRepository jobRepository, JobHistoryService jobHistoryService,
|
||||||
SecurityService securityService, JobBroadcaster jobBroadcaster) {
|
SecurityService securityService, ClientConnectionService clientConnectionService,
|
||||||
|
MqttPublisher mqttPublisher) {
|
||||||
this.jobRepository = jobRepository;
|
this.jobRepository = jobRepository;
|
||||||
this.jobHistoryService = jobHistoryService;
|
this.jobHistoryService = jobHistoryService;
|
||||||
this.securityService = securityService;
|
this.securityService = securityService;
|
||||||
this.jobBroadcaster = jobBroadcaster;
|
this.clientConnectionService = clientConnectionService;
|
||||||
|
this.mqttPublisher = mqttPublisher;
|
||||||
setSizeFull();
|
setSizeFull();
|
||||||
setPadding(true);
|
setPadding(true);
|
||||||
setSpacing(true);
|
setSpacing(true);
|
||||||
@@ -125,6 +125,18 @@ public class ShowJobsView extends VerticalLayout {
|
|||||||
return new com.vaadin.flow.component.html.Span();
|
return new com.vaadin.flow.component.html.Span();
|
||||||
}).setHeader("").setAutoWidth(true).setFlexGrow(0);
|
}).setHeader("").setAutoWidth(true).setFlexGrow(0);
|
||||||
|
|
||||||
|
// Delete column (last column, right side)
|
||||||
|
grid.addComponentColumn(job -> {
|
||||||
|
Button deleteBtn = new Button(new Icon(VaadinIcon.TRASH));
|
||||||
|
deleteBtn.addThemeVariants(ButtonVariant.LUMO_TERTIARY, ButtonVariant.LUMO_ERROR);
|
||||||
|
deleteBtn.setTooltipText("Auftrag löschen");
|
||||||
|
deleteBtn.addClickListener(e -> {
|
||||||
|
e.getSource().getElement().getNode(); // prevent row click
|
||||||
|
showDeleteJobDialog(job);
|
||||||
|
});
|
||||||
|
return deleteBtn;
|
||||||
|
}).setHeader("").setWidth("60px").setFlexGrow(0);
|
||||||
|
|
||||||
grid.setMultiSort(true);
|
grid.setMultiSort(true);
|
||||||
grid.setSizeFull();
|
grid.setSizeFull();
|
||||||
|
|
||||||
@@ -171,6 +183,57 @@ public class ShowJobsView extends VerticalLayout {
|
|||||||
dialog.open();
|
dialog.open();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void showDeleteJobDialog(Job job) {
|
||||||
|
ConfirmDialog dialog = new ConfirmDialog();
|
||||||
|
dialog.setHeader("Auftrag löschen");
|
||||||
|
dialog.setText("Möchten Sie den Auftrag " + job.getJobNumber() + " wirklich löschen? Diese Aktion kann nicht rückgängig gemacht werden.");
|
||||||
|
dialog.setCancelable(true);
|
||||||
|
dialog.setCancelText("Abbrechen");
|
||||||
|
dialog.setConfirmText("Löschen");
|
||||||
|
dialog.setConfirmButtonTheme("error primary");
|
||||||
|
dialog.addConfirmListener(e -> {
|
||||||
|
try {
|
||||||
|
// Notify client before deleting if online
|
||||||
|
notifyClientJobDeleted(job);
|
||||||
|
|
||||||
|
jobRepository.delete(job);
|
||||||
|
Notification.show("Auftrag " + job.getJobNumber() + " wurde gelöscht.", 3000,
|
||||||
|
Notification.Position.BOTTOM_END).addThemeVariants(NotificationVariant.LUMO_SUCCESS);
|
||||||
|
loadData();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
Notification.show("Fehler beim Löschen: " + ex.getMessage(), 5000, Notification.Position.BOTTOM_END)
|
||||||
|
.addThemeVariants(NotificationVariant.LUMO_ERROR);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
dialog.open();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyClientJobDeleted(Job job) {
|
||||||
|
if (!job.isDigitalProcessing()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String appUserId = job.getAppUser();
|
||||||
|
if (appUserId == null || appUserId.isBlank()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clientConnectionService.isClientConnected(appUserId)) {
|
||||||
|
log.info("[JOB] Client {} not online, skipping job_deleted notification", appUserId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> payload = Map.of(
|
||||||
|
"type", "job_deleted",
|
||||||
|
"jobId", job.getId().toHexString(),
|
||||||
|
"jobNumber", job.getJobNumber() != null ? job.getJobNumber() : "",
|
||||||
|
"deletedAt", LocalDateTime.now().toString());
|
||||||
|
|
||||||
|
String topic = "/client/" + appUserId + "/job_deleted";
|
||||||
|
log.info("[JOB] Sending job_deleted to {}: {}", topic, payload);
|
||||||
|
mqttPublisher.publishAsJson(topic, payload, false);
|
||||||
|
}
|
||||||
|
|
||||||
private void loadData() {
|
private void loadData() {
|
||||||
var start = startDate.getValue();
|
var start = startDate.getValue();
|
||||||
var end = endDate.getValue();
|
var end = endDate.getValue();
|
||||||
@@ -263,75 +326,4 @@ public class ShowJobsView extends VerticalLayout {
|
|||||||
}
|
}
|
||||||
return customerSelection.trim();
|
return customerSelection.trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onAttach(AttachEvent attachEvent) {
|
|
||||||
super.onAttach(attachEvent);
|
|
||||||
UI ui = attachEvent.getUI();
|
|
||||||
|
|
||||||
// Register broadcaster for real-time job notifications
|
|
||||||
broadcasterRegistration = jobBroadcaster.register(job -> {
|
|
||||||
handleNewJob(ui, job);
|
|
||||||
});
|
|
||||||
|
|
||||||
log.info("ShowJobsView attached and job listener registered");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onDetach(DetachEvent detachEvent) {
|
|
||||||
super.onDetach(detachEvent);
|
|
||||||
if (broadcasterRegistration != null) {
|
|
||||||
broadcasterRegistration.remove();
|
|
||||||
broadcasterRegistration = null;
|
|
||||||
}
|
|
||||||
log.info("ShowJobsView detached and job listener unregistered");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle new job notification
|
|
||||||
*/
|
|
||||||
private void handleNewJob(UI ui, Job job) {
|
|
||||||
if (job == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ui.access(() -> {
|
|
||||||
try {
|
|
||||||
// Play notification sound and show browser notification
|
|
||||||
playNotificationSound(ui);
|
|
||||||
showBrowserNotification(ui, job);
|
|
||||||
|
|
||||||
// Refresh the grid
|
|
||||||
loadData();
|
|
||||||
|
|
||||||
log.info("New job notification displayed for job {}", job.getJobNumber());
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error handling new job notification", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Play a notification sound when a new job is created
|
|
||||||
*/
|
|
||||||
private void playNotificationSound(UI ui) {
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"const audio = new Audio('data:audio/wav;base64,UklGRnoGAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQoGAACBhYqFbF1fdJivrJBhNjVgodDbq2EcBj+a2/LDciUFLIHO8tiJNwgZaLvt559NEAxQp+PwtmMcBjiR1/LMeSwFJHfH8N2QQAoUXrTp66hVFApGn+DyvmwhBTGH0fPTgjMGHm7A7+OZSA0PVK3m8LRjHAU7k9nzyn0tBSd5ye/glEIKEl206O2oVhQLSKHi8r5uIgU0idT01IMzByBwwvHjmEgNDlWs5PCzYhsFO5TY88p+Kwcme8jw4JVCChNdt+jvp1QVDEih4vK+bSIGNIrV9dODMggib8Lx5JdIDQ9VrObws2IbBT6U2PXKfi0IJnzH8OCVQgoVXbfp76dVFQ5IouLyvW0jCDSL1fTSgTQJJG7C8eSWSA8RVq3m8LJgGwg/lNj0yn4tCSV7x+/glUILFl237++nVhYOSKPi8rxtIwo0i9X00oE1CiNuwvDklkkREVat5u+yXxwJP5PY9Ml+Lgoge8fv4JVCDBVct+7vqFYYEUij4vG8bSQKNIvV89GBNQshbcLw5JZJERFV\u003d\u003d'); audio.play().catch(err => console.log('Audio play failed:', err));");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Show a browser notification when a new job is created
|
|
||||||
*/
|
|
||||||
private void showBrowserNotification(UI ui, Job job) {
|
|
||||||
String jobNumber = job.getJobNumber() != null ? job.getJobNumber() : "Neuer Auftrag";
|
|
||||||
String company = extractCompanyName(job.getCustomerSelection());
|
|
||||||
String message = company != null && !company.isBlank() ? ("Kunde: " + company) : "Neuer Auftrag erstellt";
|
|
||||||
|
|
||||||
ui.getPage().executeJs(
|
|
||||||
"if (!('Notification' in window)) {" + " console.log('Browser does not support notifications');"
|
|
||||||
+ "} else if (Notification.permission === 'granted') {" + " new Notification($0, { body: $1 });"
|
|
||||||
+ "} else if (Notification.permission !== 'denied') {" + " Notification.requestPermission().then(permission => {"
|
|
||||||
+ " if (permission === 'granted') {" + " new Notification($0, { body: $1 });" + " }" + " });" + "}",
|
|
||||||
jobNumber, message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,7 +70,6 @@ public class ClientConnectionService {
|
|||||||
*/
|
*/
|
||||||
public void registerClient(String clientId, String userId) {
|
public void registerClient(String clientId, String userId) {
|
||||||
if (clientId == null || clientId.isBlank()) {
|
if (clientId == null || clientId.isBlank()) {
|
||||||
log.warn("Cannot register client with null or blank clientId");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,12 +79,10 @@ public class ClientConnectionService {
|
|||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
ClientState state = new ClientState(clientId, userId, true, null, now, now);
|
ClientState state = new ClientState(clientId, userId, true, null, now, now);
|
||||||
connectedClients.put(clientId, state);
|
connectedClients.put(clientId, state);
|
||||||
log.info("[ClientConnectionService] Client registered: clientId={}, userId={}, totalClients={}", clientId,
|
log.info("[CLIENT] Connected: {}", clientId);
|
||||||
userId, connectedClients.size());
|
|
||||||
|
|
||||||
// If client was previously disconnected, retry pending messages
|
// If client was previously disconnected, retry pending messages
|
||||||
if (wasDisconnected) {
|
if (wasDisconnected) {
|
||||||
log.info("Client {} re-registered after disconnect - triggering pending message retry", clientId);
|
|
||||||
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -99,7 +96,7 @@ public class ClientConnectionService {
|
|||||||
public void unregisterClient(String clientId) {
|
public void unregisterClient(String clientId) {
|
||||||
ClientState removed = connectedClients.remove(clientId);
|
ClientState removed = connectedClients.remove(clientId);
|
||||||
if (removed != null) {
|
if (removed != null) {
|
||||||
log.info("Client unregistered: clientId={}, userId={}", clientId, removed.userId());
|
log.info("[CLIENT] Disconnected: {}", clientId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,7 +109,6 @@ public class ClientConnectionService {
|
|||||||
*/
|
*/
|
||||||
public void handlePong(String id) {
|
public void handlePong(String id) {
|
||||||
if (id == null || id.isBlank()) {
|
if (id == null || id.isBlank()) {
|
||||||
log.warn("Received pong from null or blank id");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,17 +128,15 @@ public class ClientConnectionService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (state != null) {
|
if (state != null) {
|
||||||
|
log.info("[PONG] Received from {}", clientId);
|
||||||
boolean wasDisconnected = !state.connected();
|
boolean wasDisconnected = !state.connected();
|
||||||
ClientState updatedState = state.withPongReceived(Instant.now());
|
ClientState updatedState = state.withPongReceived(Instant.now());
|
||||||
connectedClients.put(clientId, updatedState);
|
connectedClients.put(clientId, updatedState);
|
||||||
|
|
||||||
// If client was disconnected and is now reconnected, retry pending messages
|
// If client was disconnected and is now reconnected, retry pending messages
|
||||||
if (wasDisconnected) {
|
if (wasDisconnected) {
|
||||||
log.info("Client {} reconnected via pong - triggering pending message retry", clientId);
|
|
||||||
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.warn("Received pong from unknown id: {}", id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,21 +188,10 @@ public class ClientConnectionService {
|
|||||||
*/
|
*/
|
||||||
@Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000")
|
@Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000")
|
||||||
public void sendPingsToAllClients() {
|
public void sendPingsToAllClients() {
|
||||||
log.info("[ClientConnectionService] Ping cycle started - pluginConnected={}, connectedClients={}",
|
if (!pluginManager.isConnected() || getConnectedClientCount() == 0) {
|
||||||
pluginManager.isConnected(), getConnectedClientCount());
|
|
||||||
|
|
||||||
if (!pluginManager.isConnected()) {
|
|
||||||
log.info("[ClientConnectionService] Plugin not connected, skipping ping cycle");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int connectedCount = getConnectedClientCount();
|
|
||||||
if (connectedCount == 0) {
|
|
||||||
log.info("[ClientConnectionService] No connected clients, skipping ping cycle");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("[ClientConnectionService] Starting ping cycle for {} clients", connectedCount);
|
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
|
|
||||||
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
|
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
|
||||||
@@ -225,8 +208,7 @@ public class ClientConnectionService {
|
|||||||
// Client did not respond in time - mark as disconnected
|
// Client did not respond in time - mark as disconnected
|
||||||
ClientState disconnectedState = state.withConnected(false);
|
ClientState disconnectedState = state.withConnected(false);
|
||||||
connectedClients.put(clientId, disconnectedState);
|
connectedClients.put(clientId, disconnectedState);
|
||||||
log.warn("Client timed out, marking as disconnected: clientId={}, userId={}", clientId,
|
log.info("[CLIENT] Timeout: {}", clientId);
|
||||||
state.userId());
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -255,10 +237,11 @@ public class ClientConnectionService {
|
|||||||
|
|
||||||
SendOptions options = SendOptions.builder().qos(1).retained(false).build();
|
SendOptions options = SendOptions.builder().qos(1).retained(false).build();
|
||||||
|
|
||||||
|
log.info("[PING] Sent to {}", userId);
|
||||||
pluginManager.sendToClient(userId, "ping", payload, options);
|
pluginManager.sendToClient(userId, "ping", payload, options);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error sending ping to user {}: {}", userId, e.getMessage());
|
log.error("[PING] Error sending to {}: {}", userId, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,71 +0,0 @@
|
|||||||
package de.assecutor.votianlt.service;
|
|
||||||
|
|
||||||
import com.vaadin.flow.shared.Registration;
|
|
||||||
import de.assecutor.votianlt.event.JobCreatedEvent;
|
|
||||||
import de.assecutor.votianlt.model.Job;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.context.event.EventListener;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Broadcaster service that manages listeners for newly created jobs and
|
|
||||||
* notifies UI components in a thread-safe manner
|
|
||||||
*/
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class JobBroadcaster {
|
|
||||||
|
|
||||||
private final Executor executor = Executors.newSingleThreadExecutor();
|
|
||||||
private final LinkedHashSet<Consumer<Job>> listeners = new LinkedHashSet<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register a listener for newly created jobs
|
|
||||||
*
|
|
||||||
* @param listener
|
|
||||||
* Consumer that will be called when a new job is created
|
|
||||||
* @return Registration object that can be used to unregister the listener
|
|
||||||
*/
|
|
||||||
public synchronized Registration register(Consumer<Job> listener) {
|
|
||||||
listeners.add(listener);
|
|
||||||
log.debug("Registered job listener. Total listeners: {}", listeners.size());
|
|
||||||
|
|
||||||
return () -> {
|
|
||||||
synchronized (JobBroadcaster.this) {
|
|
||||||
listeners.remove(listener);
|
|
||||||
log.debug("Unregistered job listener. Total listeners: {}", listeners.size());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Broadcast a job creation to all registered listeners This is called
|
|
||||||
* asynchronously to avoid blocking the job creation
|
|
||||||
*/
|
|
||||||
private synchronized void broadcast(Job job) {
|
|
||||||
log.debug("Broadcasting job creation to {} listeners", listeners.size());
|
|
||||||
for (Consumer<Job> listener : listeners) {
|
|
||||||
executor.execute(() -> {
|
|
||||||
try {
|
|
||||||
listener.accept(job);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Error broadcasting job to listener", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Spring event listener that gets called when a JobCreatedEvent is published
|
|
||||||
*/
|
|
||||||
@EventListener
|
|
||||||
public void onJobCreated(JobCreatedEvent event) {
|
|
||||||
Job job = event.getJob();
|
|
||||||
log.info("JobBroadcaster received event for job {}", job.getJobNumber());
|
|
||||||
broadcast(job);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -41,7 +41,6 @@ public class MessageService {
|
|||||||
* Save a message to the database
|
* Save a message to the database
|
||||||
*/
|
*/
|
||||||
public Message saveMessage(Message message) {
|
public Message saveMessage(Message message) {
|
||||||
log.info("Saving message with origin {} for receiver {}", message.getOrigin(), message.getReceiver());
|
|
||||||
return messageRepository.save(message);
|
return messageRepository.save(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,20 +100,13 @@ public class MessageService {
|
|||||||
MessageContentType contentType = payload.contentType();
|
MessageContentType contentType = payload.contentType();
|
||||||
if (payload.hasJobContext()) {
|
if (payload.hasJobContext()) {
|
||||||
JobContext context = resolveJobContext(payload.jobId(), payload.jobNumber());
|
JobContext context = resolveJobContext(payload.jobId(), payload.jobNumber());
|
||||||
// receiver = AppUser ID (clientId)
|
|
||||||
message = new Message(payload.content(), payload.receiver(), MessageOrigin.CLIENT, contentType,
|
message = new Message(payload.content(), payload.receiver(), MessageOrigin.CLIENT, contentType,
|
||||||
context.jobId(), context.jobNumber());
|
context.jobId(), context.jobNumber());
|
||||||
} else {
|
} else {
|
||||||
// receiver = AppUser ID (clientId)
|
|
||||||
message = new Message(payload.content(), payload.receiver(), MessageOrigin.CLIENT, contentType);
|
message = new Message(payload.content(), payload.receiver(), MessageOrigin.CLIENT, contentType);
|
||||||
}
|
}
|
||||||
message = saveMessage(message);
|
message = saveMessage(message);
|
||||||
|
|
||||||
// Publish event to notify UI components about the new message
|
|
||||||
log.info("Publishing MessageReceivedEvent for message with origin {} for receiver {}", message.getOrigin(),
|
|
||||||
message.getReceiver());
|
|
||||||
eventPublisher.publishEvent(new MessageReceivedEvent(this, message));
|
eventPublisher.publishEvent(new MessageReceivedEvent(this, message));
|
||||||
|
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,9 +118,8 @@ public class MessageService {
|
|||||||
String topic = "/client/" + receiver + "/message";
|
String topic = "/client/" + receiver + "/message";
|
||||||
ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message);
|
ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message);
|
||||||
mqttPublisher.publishAsJson(topic, payload, false);
|
mqttPublisher.publishAsJson(topic, payload, false);
|
||||||
log.info("Published message to MQTT topic: {}", topic);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error publishing message to MQTT: {}", e.getMessage(), e);
|
log.error("[MQTT] Error publishing message: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -225,9 +216,6 @@ public class MessageService {
|
|||||||
Message message = messageOpt.get();
|
Message message = messageOpt.get();
|
||||||
message.markAsRead();
|
message.markAsRead();
|
||||||
messageRepository.save(message);
|
messageRepository.save(message);
|
||||||
log.info("Marked message {} as read", messageId);
|
|
||||||
} else {
|
|
||||||
log.warn("Message {} not found", messageId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -238,13 +226,6 @@ public class MessageService {
|
|||||||
return messageRepository.countByReceiverAndIsReadFalse(receiver);
|
return messageRepository.countByReceiverAndIsReadFalse(receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Count all messages in the system
|
|
||||||
*/
|
|
||||||
public int countAllMessages() {
|
|
||||||
return (int) messageRepository.count();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a message by ID
|
* Get a message by ID
|
||||||
*/
|
*/
|
||||||
@@ -257,7 +238,6 @@ public class MessageService {
|
|||||||
*/
|
*/
|
||||||
public void deleteMessage(ObjectId messageId) {
|
public void deleteMessage(ObjectId messageId) {
|
||||||
messageRepository.deleteById(messageId);
|
messageRepository.deleteById(messageId);
|
||||||
log.info("Deleted message {}", messageId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<Job> findJobByIdentifier(String identifier) {
|
public Optional<Job> findJobByIdentifier(String identifier) {
|
||||||
|
|||||||
Reference in New Issue
Block a user