115 lines
3.3 KiB
Dart
115 lines
3.3 KiB
Dart
import 'package:flutter/foundation.dart';
|
|
|
|
import '../models/message_envelope.dart';
|
|
|
|
/// The outcome of unwrapping an incoming message.
///
/// Carries the [payload] to hand on to consumers together with the envelope
/// metadata ([messageId], [requiresAck]) when the message was wrapped in one.
class UnwrapResult {
  /// The unwrapped payload.
  final dynamic payload;

  /// The message ID, or `null` if the message was not an envelope.
  final String? messageId;

  /// Whether this message requires acknowledgment.
  final bool requiresAck;

  /// Creates a result carrying [payload].
  ///
  /// [messageId] defaults to `null` and [requiresAck] defaults to `false`.
  ///
  /// The constructor is `const` because every field is final; this lets
  /// callers build compile-time constant results while leaving ordinary
  /// (non-const) invocations unchanged.
  const UnwrapResult({
    required this.payload,
    this.messageId,
    this.requiresAck = false,
  });
}
|
|
|
|
/// Unwraps incoming message envelopes and filters out repeated deliveries.
///
/// Extracted from WebSocketService so the logic can be tested in isolation.
/// Responsibilities:
/// - recognising maps that have the MessageEnvelope shape
/// - remembering recently seen message IDs so a repeat is dropped
/// - invoking [onAckRequired] when a dropped repeat is flagged as needing an
///   ACK (for first-time messages the flag is reported on the returned
///   [UnwrapResult] instead)
class MessageHandler {
  /// Keys that must all be present for a map to be treated as an envelope.
  static const _envelopeKeys = ['messageId', 'timestamp', 'topic', 'payload'];

  /// IDs of messages that have already been unwrapped.
  ///
  /// A set literal iterates in insertion order, so `first` is the oldest ID.
  final _processedMessageIds = <String>{};

  /// Upper bound on how many message IDs are remembered for deduplication.
  final int maxProcessedIds;

  /// Invoked with the message ID when an ACK should be sent.
  final void Function(String messageId)? onAckRequired;

  /// Creates a handler.
  ///
  /// [maxProcessedIds] defaults to 100; [onAckRequired] is optional.
  MessageHandler({
    this.maxProcessedIds = 100,
    this.onAckRequired,
  });

  /// Whether [data] has the shape of a MessageEnvelope.
  ///
  /// True only for a `Map<String, dynamic>` that contains all of the keys
  /// `messageId`, `timestamp`, `topic` and `payload`. Only key presence is
  /// checked; the values themselves are not inspected.
  bool isEnvelopeMessage(dynamic data) {
    if (data is Map<String, dynamic>) {
      return _envelopeKeys.every(data.containsKey);
    }
    return false;
  }

  /// Unwraps [data] and applies deduplication.
  ///
  /// - Not an envelope: returns [data] unchanged as the payload, with no
  ///   message ID and `requiresAck` set to false.
  /// - First sighting of the envelope's message ID: records the ID and
  ///   returns the envelope's payload, ID and ACK flag.
  /// - Already-recorded ID: returns `null`. If this envelope is flagged as
  ///   requiring an ACK, [onAckRequired] is still invoked with its ID.
  ///
  /// When recording an ID pushes the tracked count above [maxProcessedIds],
  /// the oldest tracked ID is dropped.
  UnwrapResult? unwrapEnvelope(dynamic data) {
    if (!isEnvelopeMessage(data)) {
      // Plain message: pass it through; messageId / requiresAck keep their
      // defaults (null / false).
      return UnwrapResult(payload: data);
    }

    final envelope = MessageEnvelope.fromJson(data as Map<String, dynamic>);
    final id = envelope.messageId;

    // `add` returns false when the ID is already present, i.e. a duplicate.
    final firstSighting = _processedMessageIds.add(id);
    if (!firstSighting) {
      // Duplicates are dropped, but the sender may still be waiting on an ACK.
      if (envelope.requiresAck) {
        onAckRequired?.call(id);
      }
      return null;
    }

    // Cap memory use: evict the oldest ID once the limit is exceeded (FIFO).
    if (_processedMessageIds.length > maxProcessedIds) {
      _processedMessageIds.remove(_processedMessageIds.first);
    }

    return UnwrapResult(
      payload: envelope.payload,
      messageId: id,
      requiresAck: envelope.requiresAck,
    );
  }

  /// Whether [messageId] is currently tracked as processed.
  bool wasProcessed(String messageId) {
    return _processedMessageIds.contains(messageId);
  }

  /// The number of message IDs currently tracked.
  int get processedCount {
    return _processedMessageIds.length;
  }

  /// Forgets every tracked message ID.
  ///
  /// Intended mainly for tests.
  @visibleForTesting
  void clearProcessedIds() {
    _processedMessageIds.clear();
  }
}
|