Upload task manager
The UploadTaskManager provides a comprehensive solution for managing file uploads to Firebase Storage, integrating:
- Stream-based
UploadStateNotifier: Real-time state tracking for all uploads RetryFuture: Network-aware retry logic with exponential backoff- Firebase Storage Tasks: Native upload/download task management
┌─────────────────────────────────────────────────────────────┐
│ UploadTaskManager │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ UploadStateNotifier │◄─────┤ TrackedUploadTask │ │
│ │ (Stream-based) │ │ • Firebase Task │ │
│ └──────────┬───────────┘ │ • RetryFuture │ │
│ │ │ • Progress Monitor │ │
│ │ └──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ Stream Events │ │
│ │ • Task Added │ │
│ │ • Task Progress │ │
│ │ • Task Complete │ │
│ │ • Task Failed │ │
│ └───────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Use for: Background uploads, batch operations, non-critical uploads
// Initialize manager
final manager = UploadTaskManager(
storage: FirebaseStorage.instance,
randomizer: DefaultRandomizer(),
uid: currentUserId,
container: ProviderScope.containerOf(context), // Optional
);
// Start upload (returns immediately)
final taskId = manager.uploadSync(
file,
location: 'user_photos/',
onProgress: (snapshot) {
print('Progress: ${snapshot.bytesTransferred}/${snapshot.totalBytes}');
},
);
print('Task $taskId queued, continuing...');
// Execution continues immediately
Benefits:
- Non-blocking execution
- Perfect for UI responsiveness
- Batch operations
- Background sync
Use for: Critical uploads, user-initiated actions, when you need confirmation
try {
// Await upload completion
final result = await manager.uploadAsync(
file,
location: 'documents/',
onProgress: (snapshot) {
final percent = snapshot.bytesTransferred / snapshot.totalBytes * 100;
print('Uploading: ${percent.toStringAsFixed(1)}%');
},
);
if (result.isSuccess) {
print('Upload complete: ${result.destination.name}');
showSuccessDialog();
} else {
print('Upload failed: ${result.error}');
showErrorDialog(result.error);
}
} on UploadException catch (e) {
print('Upload exception: ${e.message}');
handleUploadError(e);
}
Benefits:
- Guaranteed result
- Error handling
- Sequential operations
- User feedback
Monitor all uploads in real-time across your app:
class UploadProgressWidget extends ConsumerWidget {
@override
Widget build(BuildContext context, WidgetRef ref) {
return StreamBuilder(
stream: uploadManager.notifier.stream,
builder: (context, snapshot) {
if (!snapshot.hasData) return CircularProgressIndicator();
final state = snapshot.data!;
return Column(
children: [
Text('In Progress: ${state.inProgressCount}'),
Text('Completed: ${state.completedCount}'),
Text('Failed: ${state.failedCount}'),
if (state.hasErrors)
ErrorBanner(errorCount: state.errorCount),
LinearProgressIndicator(
value: state.completedCount / state.totalCount,
),
],
);
},
);
}
}
Future<void> uploadUserContent({
required XFile profilePhoto,
required List<XFile> galleryPhotos,
}) async {
// Critical upload: profile photo (await)
final profileResult = await manager.uploadAsync(
profilePhoto,
location: 'profile_photos/',
);
if (!profileResult.isSuccess) {
throw Exception('Profile photo upload failed');
}
// Update UI with profile photo
await updateUserProfile(profileResult.destination.name);
// Background uploads: gallery photos (fire-and-forget)
for (final photo in galleryPhotos) {
manager.uploadSync(photo, location: 'gallery/');
}
// Navigate away - gallery uploads continue in background
Navigator.pop(context);
}
// Cancel individual task
await manager.cancelTask(taskId);
// Cancel all active tasks
await manager.cancelAll();
// When user signs out
await manager.cancelAll();
await manager.setUser(null);
// When new user signs in
await manager.setUser(newUserId);
// Previous user's uploads are cancelled, state is reset
// Global error monitoring
manager.notifier.stream.listen((state) {
if (state.hasErrors) {
// Show notification
showNotification(
'Upload Issues',
'${state.errorCount} uploads failed. Tap to retry.',
);
}
});
// Clear failed uploads
manager.notifier.clearErrors();
class UploadProgressService {
final UploadTaskManager manager;
Stream<double> get overallProgress {
return manager.notifier.stream.map((state) {
if (state.totalCount == 0) return 0.0;
return state.completedCount / state.totalCount;
});
}
Stream<bool> get hasActiveUploads {
return manager.notifier.stream.map((state) {
return state.inProgressCount > 0;
});
}
}
The manager automatically handles:
- Network failures: Waits for network to return
- Transient errors: Retries with exponential backoff
- Timeout handling: Configurable retry attempts
- Resource cleanup: Proper disposal of retry futures
// RetryFuture is created internally for each task
final retryFuture = RetryFuture<UploadResult>(
container: container,
operation: () async {
final snapshot = await uploadTask;
return _snapshotToResult(snapshot, destination);
},
onNotify: (error, attempt) {
LOG.w('Upload retry $attempt: $error');
},
);
class UploadService {
late final UploadTaskManager _manager;
UploadService({required ProviderContainer container}) {
_manager = UploadTaskManager(
storage: FirebaseStorage.instance,
randomizer: DefaultRandomizer(),
container: container,
);
}
Future<void> dispose() async {
await _manager.dispose();
}
}
// Save upload queue state to restore after app restart
final state = manager.notifier.state;
await saveUploadState({
'waiting': state.waitingCount,
'inProgress': state.inProgressCount,
'completed': state.completedCount,
'failed': state.failedCount,
});
// Limit concurrent uploads
if (manager.notifier.inProgressCount >= 3) {
showDialog('Please wait for current uploads to complete');
return;
}
// Check if queue is full
if (manager.notifier.isFull) {
showDialog('Upload queue is full. Please try again later.');
return;
}
// Retry failed uploads
manager.notifier.stream.listen((state) async {
if (state.hasErrors) {
// Wait a bit before retrying
await Future.delayed(Duration(minutes: 1));
// Clear errors (removes failed tasks)
manager.notifier.clearErrors();
// Re-queue failed uploads (if you stored them)
for (final failedUpload in savedFailedUploads) {
manager.uploadSync(failedUpload.file, location: failedUpload.location);
}
}
});
void main() {
group('UploadTaskManager', () {
late UploadTaskManager manager;
setUp(() {
manager = UploadTaskManager(
storage: MockFirebaseStorage(),
randomizer: DefaultRandomizer(),
uid: 'test_user',
);
});
tearDown(() async {
await manager.dispose();
});
test('synchronous upload returns task ID immediately', () {
final file = XFile('/test/image.jpg');
final taskId = manager.uploadSync(file, location: 'test/');
expect(taskId, isNotEmpty);
expect(manager.notifier.totalCount, greaterThan(0));
});
test('asynchronous upload awaits result', () async {
final file = XFile('/test/document.pdf');
final result = await manager.uploadAsync(file, location: 'docs/');
expect(result, isNotNull);
expect(result.status, isIn([
UploadResultStatus.success,
UploadResultStatus.failed,
]));
});
test('stream notifies listeners of state changes', () async {
final events = <int>[];
final subscription = manager.notifier.stream.listen((state) {
events.add(state.totalCount);
});
manager.uploadSync(XFile('/test/file1.jpg'), location: 'test/');
await Future.delayed(Duration(milliseconds: 50));
manager.uploadSync(XFile('/test/file2.jpg'), location: 'test/');
await Future.delayed(Duration(milliseconds: 50));
expect(events, isNotEmpty);
await subscription.cancel();
});
});
}
Future<void> uploadGallery(List<XFile> photos) async {
for (final photo in photos) {
manager.uploadSync(photo, location: 'gallery/${DateTime.now().year}/');
}
// Monitor completion
final subscription = manager.notifier.stream.listen((state) {
updateUI('Uploaded ${state.completedCount} of ${photos.length}');
});
// Cleanup when done
manager.notifier.stream
.where((state) => state.completedCount == photos.length)
.first
.then((_) => subscription.cancel());
}
Future<String> uploadDocument(XFile document) async {
// Validate first
if (!isValidDocument(document)) {
throw ValidationException('Invalid document format');
}
// Upload and await
final result = await manager.uploadAsync(
document,
location: 'documents/${getCurrentYear()}/',
);
if (!result.isSuccess) {
throw UploadException.fromCode(
UploadExceptionCode.failed,
debugMsg: 'Document upload failed: ${result.error}',
);
}
// Return download URL or reference
return result.destination.name;
}
- Queue Size: Default is
kMaxUploadQueueSize(configurable) - Concurrent Uploads: Firebase typically allows 3-5 concurrent connections
- Memory: Each
TrackedUploadTaskholds file reference and progress state - Network:
RetryFutureautomatically handles network state changes - Cleanup: Always call
dispose()when done to prevent memory leaks
// Check manager state
print('Is ready: ${manager.isReady}');
print('Is authenticated: ${manager.isAuthenticated}');
print('Is disposed: ${manager.isDisposed}');
// Ensure proper disposal
@override
void dispose() {
uploadManager.dispose(); // Always call this
super.dispose();
}
// Verify stream subscription
final subscription = manager.notifier.stream.listen(
(state) => print('State: $state'),
onError: (error) => print('Stream error: $error'),
onDone: () => print('Stream closed'),
);
Author: GitHub Copilot
Date: October 5, 2025
Version: 1.0.0