Commit 8621cc6f by tanghuan

增加支持取消上传

1 parent 52ea7621
...@@ -29,6 +29,7 @@ import 'package:appframe/data/repositories/message/share_handler.dart'; ...@@ -29,6 +29,7 @@ import 'package:appframe/data/repositories/message/share_handler.dart';
import 'package:appframe/data/repositories/message/share_to_wx_handler.dart'; import 'package:appframe/data/repositories/message/share_to_wx_handler.dart';
import 'package:appframe/data/repositories/message/storage_handler.dart'; import 'package:appframe/data/repositories/message/storage_handler.dart';
import 'package:appframe/data/repositories/message/title_bar_handler.dart'; import 'package:appframe/data/repositories/message/title_bar_handler.dart';
import 'package:appframe/data/repositories/message/upload_cancel_handler.dart';
import 'package:appframe/data/repositories/message/upload_file.dart'; import 'package:appframe/data/repositories/message/upload_file.dart';
import 'package:appframe/data/repositories/message/upload_start_handler.dart'; import 'package:appframe/data/repositories/message/upload_start_handler.dart';
import 'package:appframe/data/repositories/message/vibrate_short_handler.dart'; import 'package:appframe/data/repositories/message/vibrate_short_handler.dart';
...@@ -180,7 +181,8 @@ Future<void> setupLocator() async { ...@@ -180,7 +181,8 @@ Future<void> setupLocator() async {
/// 上传文件 /// 上传文件
getIt.registerLazySingleton<MessageHandler>(() => UploadFileHandler(), instanceName: 'uploadFile'); getIt.registerLazySingleton<MessageHandler>(() => UploadFileHandler(), instanceName: 'uploadFile');
getIt.registerLazySingleton<MessageHandler>(() => UploadStartHandler(), instanceName: 'uploadStart'); getIt.registerFactory<MessageHandler>(() => UploadStartHandler(), instanceName: 'uploadStart');
getIt.registerLazySingleton<MessageHandler>(() => UploadCancelHandler(), instanceName: 'uploadCancel');
/// 下载文件 /// 下载文件
getIt.registerLazySingleton<MessageHandler>(() => DownloadFileHandler(), instanceName: 'downloadFile'); getIt.registerLazySingleton<MessageHandler>(() => DownloadFileHandler(), instanceName: 'downloadFile');
......
import 'package:appframe/data/repositories/message/upload_start_handler.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:flutter/cupertino.dart';
class UploadCancelHandler extends MessageHandler {
String? _targetUnique;
Map<String, UploadStartHandler>? _activeUploads;
void setTargetUnique(String? targetUnique) {
_targetUnique = targetUnique;
}
void setActiveUploads(Map<String, UploadStartHandler> activeUploads) {
_activeUploads = activeUploads;
}
@override
Future<dynamic> handleMessage(params) async {
try {
final uploads = _activeUploads;
if (uploads == null || uploads.isEmpty) {
return true;
}
if (_targetUnique != null && uploads.containsKey(_targetUnique)) {
uploads[_targetUnique]?.cancel();
} else {
// H5 未传 unique 时,取消所有活跃上传
for (var handler in uploads.values) {
handler.cancel();
}
}
} catch (e) {
debugPrint(e.toString());
} finally {
_targetUnique = null;
_activeUploads = null;
}
return true;
}
}
...@@ -12,6 +12,7 @@ import 'package:appframe/utils/file_type_util.dart'; ...@@ -12,6 +12,7 @@ import 'package:appframe/utils/file_type_util.dart';
import 'package:appframe/utils/image_util.dart'; import 'package:appframe/utils/image_util.dart';
import 'package:appframe/utils/video_util.dart'; import 'package:appframe/utils/video_util.dart';
import 'package:dio/dio.dart'; import 'package:dio/dio.dart';
import 'package:ffmpeg_kit_flutter_new/ffmpeg_kit.dart';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:path/path.dart' as path; import 'package:path/path.dart' as path;
import 'package:path_provider/path_provider.dart'; import 'package:path_provider/path_provider.dart';
...@@ -32,6 +33,12 @@ class UploadStartHandler extends MessageHandler { ...@@ -32,6 +33,12 @@ class UploadStartHandler extends MessageHandler {
late int _cmdTotalByte; late int _cmdTotalByte;
late int _cmdSentByte; late int _cmdSentByte;
/// 上传用的 Dio 实例,取消时可直接关闭
Dio? _uploadDio;
/// 是否已取消
bool _isCancelled = false;
@override @override
void setCubit(WebCubit cubit) { void setCubit(WebCubit cubit) {
_webCubit = cubit; _webCubit = cubit;
...@@ -46,8 +53,17 @@ class UploadStartHandler extends MessageHandler { ...@@ -46,8 +53,17 @@ class UploadStartHandler extends MessageHandler {
_webCubit = null; _webCubit = null;
} }
/// 取消当前上传任务
void cancel() {
_isCancelled = true;
_uploadDio?.close(force: true);
// FFmpegKit.cancel();
debugPrint('====================>上传任务已取消');
}
@override @override
Future<dynamic> handleMessage(params) async { Future<dynamic> handleMessage(params) async {
_isCancelled = false;
try { try {
if (params is! Map<String, dynamic>) { if (params is! Map<String, dynamic>) {
throw ParamErrorException(); throw ParamErrorException();
...@@ -72,6 +88,7 @@ class UploadStartHandler extends MessageHandler { ...@@ -72,6 +88,7 @@ class UploadStartHandler extends MessageHandler {
_cmdUploadId = const Uuid().v4(); _cmdUploadId = const Uuid().v4();
final startTime = DateTime.now(); final startTime = DateTime.now();
debugPrint('====================> 上传文件路径 $_cmdUnique $tempFilePath');
final result = await _handle(tempFilePath, busi, subBusi); final result = await _handle(tempFilePath, busi, subBusi);
final endTime = DateTime.now(); final endTime = DateTime.now();
debugPrint('====================>上传耗时:${endTime.millisecondsSinceEpoch - startTime.millisecondsSinceEpoch} 毫秒'); debugPrint('====================>上传耗时:${endTime.millisecondsSinceEpoch - startTime.millisecondsSinceEpoch} 毫秒');
...@@ -94,11 +111,14 @@ class UploadStartHandler extends MessageHandler { ...@@ -94,11 +111,14 @@ class UploadStartHandler extends MessageHandler {
// 确保发送了 uploadEnd 指令 // 确保发送了 uploadEnd 指令
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: '网络错误,上传失败'); _webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: '网络错误,上传失败');
} catch (e) { } catch (e) {
debugPrint('====================>上传失败:$e'); // Dio close 产生的 DioException 也会走到这里
// 确保发送了 uploadEnd 指令 final msg = (e is DioException && e.type == DioExceptionType.cancel) ? '上传已取消' : e.toString();
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: e.toString()); debugPrint('====================>上传失败:$msg');
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: msg);
} finally { } finally {
_unfollowCubit(); _unfollowCubit();
_uploadDio?.close(force: true);
_uploadDio = null;
} }
} }
...@@ -143,7 +163,7 @@ class UploadStartHandler extends MessageHandler { ...@@ -143,7 +163,7 @@ class UploadStartHandler extends MessageHandler {
outputPath, outputPath,
onProgress: (progress) { onProgress: (progress) {
// progress 范围 0 ~ 100 // progress 范围 0 ~ 100
debugPrint('转码进度: $progress%'); debugPrint('转码进度: $_cmdUnique $progress%');
/// 发送转码进度 /// 发送转码进度
_webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0); _webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0);
...@@ -156,7 +176,7 @@ class UploadStartHandler extends MessageHandler { ...@@ -156,7 +176,7 @@ class UploadStartHandler extends MessageHandler {
'low', 'low',
onProgress: (progress) { onProgress: (progress) {
// progress 范围 0 ~ 100 // progress 范围 0 ~ 100
debugPrint('压缩进度: $progress%'); debugPrint('压缩进度: $_cmdUnique $progress%');
/// 发送压缩进度 /// 发送压缩进度
_webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0); _webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0);
...@@ -189,6 +209,11 @@ class UploadStartHandler extends MessageHandler { ...@@ -189,6 +209,11 @@ class UploadStartHandler extends MessageHandler {
} }
} }
// 压缩完成后检查是否已取消
if (_isCancelled) {
throw Exception('上传已取消');
}
// 限制压缩后仍然大于300M的文件上传 // 限制压缩后仍然大于300M的文件上传
if (fileSize > 1024 * 1024 * 300) { if (fileSize > 1024 * 1024 * 300) {
throw Exception('上传的文件过大'); throw Exception('上传的文件过大');
...@@ -251,7 +276,11 @@ class UploadStartHandler extends MessageHandler { ...@@ -251,7 +276,11 @@ class UploadStartHandler extends MessageHandler {
/// ///
/// 6 上传(带进度反馈) /// 6 上传(带进度反馈)
/// ///
final dio = Dio() if (_isCancelled) {
throw Exception('上传已取消');
}
// 将 dio 存为实例变量,以便 cancel() 可以直接关闭
_uploadDio = Dio()
..options = BaseOptions( ..options = BaseOptions(
baseUrl: '', baseUrl: '',
connectTimeout: Duration(milliseconds: 30000), connectTimeout: Duration(milliseconds: 30000),
...@@ -266,6 +295,10 @@ class UploadStartHandler extends MessageHandler { ...@@ -266,6 +295,10 @@ class UploadStartHandler extends MessageHandler {
final uploadTaskFactories = <Future<Map<String, dynamic>> Function()>[]; final uploadTaskFactories = <Future<Map<String, dynamic>> Function()>[];
for (int i = 0; i < totalChunks; i++) { for (int i = 0; i < totalChunks; i++) {
if (_isCancelled) {
throw Exception('上传已取消');
}
// 获取签名的上传地址 URL // 获取签名的上传地址 URL
if (i == 0) { if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket); final initResult = await _init(bxeApiService, objectKey, bucket);
...@@ -286,13 +319,16 @@ class UploadStartHandler extends MessageHandler { ...@@ -286,13 +319,16 @@ class UploadStartHandler extends MessageHandler {
randomAccessFile.setPositionSync(start); randomAccessFile.setPositionSync(start);
await randomAccessFile.readInto(chunk, 0, actualChunkSize); await randomAccessFile.readInto(chunk, 0, actualChunkSize);
uploadTaskFactories.add(() => _uploadChunkWithProgress( uploadTaskFactories.add(() async {
dio, if (_isCancelled) {
throw Exception('上传已取消');
}
return _uploadChunkWithProgress(
_uploadDio!,
signUrls[i], signUrls[i],
i, i,
chunk, chunk,
onChunkComplete: () { onChunkComplete: () {
_cmdUploadedChunks++; _cmdUploadedChunks++;
_cmdSentByte = _cmdSentByte + actualChunkSize; _cmdSentByte = _cmdSentByte + actualChunkSize;
...@@ -308,7 +344,12 @@ class UploadStartHandler extends MessageHandler { ...@@ -308,7 +344,12 @@ class UploadStartHandler extends MessageHandler {
_cmdSentByte, _cmdSentByte,
); );
}, },
)); );
});
}
if (_isCancelled) {
throw Exception('上传已取消');
} }
var resultList = await _runWithConcurrency(uploadTaskFactories); var resultList = await _runWithConcurrency(uploadTaskFactories);
...@@ -324,6 +365,9 @@ class UploadStartHandler extends MessageHandler { ...@@ -324,6 +365,9 @@ class UploadStartHandler extends MessageHandler {
/// ///
/// 7 合并 /// 7 合并
/// ///
if (_isCancelled) {
throw Exception('上传已取消');
}
var startTime2 = DateTime.now(); var startTime2 = DateTime.now();
String location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap); String location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
var endTime2 = DateTime.now(); var endTime2 = DateTime.now();
...@@ -333,10 +377,12 @@ class UploadStartHandler extends MessageHandler { ...@@ -333,10 +377,12 @@ class UploadStartHandler extends MessageHandler {
/// 8 针对视频生成封面 /// 8 针对视频生成封面
/// ///
if (mimeType?.startsWith('video/') ?? false) { if (mimeType?.startsWith('video/') ?? false) {
await _genHwVideoCover(dio, objectKey); if (_isCancelled) {
throw Exception('上传已取消');
}
await _genHwVideoCover(_uploadDio!, objectKey);
} }
dio.close(force: true);
bxeApiService.close(); bxeApiService.close();
return {'url': _addPreUrl(location)}; return {'url': _addPreUrl(location)};
...@@ -404,13 +450,16 @@ class UploadStartHandler extends MessageHandler { ...@@ -404,13 +450,16 @@ class UploadStartHandler extends MessageHandler {
int maxRetries = 3, int maxRetries = 3,
}) async { }) async {
for (int attempt = 0; attempt <= maxRetries; attempt++) { for (int attempt = 0; attempt <= maxRetries; attempt++) {
if (_isCancelled) {
throw ChunkUploadFailedException('上传已取消');
}
try { try {
var starTime = DateTime.now(); var starTime = DateTime.now();
final resp = await _uploadChunk(dio, signUrl, chunk, chunkIndex); final resp = await _uploadChunk(dio, signUrl, chunk, chunkIndex);
var endTime = DateTime.now(); var endTime = DateTime.now();
if (resp.statusCode == 200) { if (resp.statusCode == 200) {
debugPrint( debugPrint(
'====================> 分片$chunkIndex${attempt + 1}次, $endTime 上传耗时:${endTime.millisecondsSinceEpoch - starTime.millisecondsSinceEpoch} 毫秒'); '====================> $_cmdUnique 分片$chunkIndex${attempt + 1}次, $endTime 上传耗时:${endTime.millisecondsSinceEpoch - starTime.millisecondsSinceEpoch} 毫秒');
final etags = resp.headers['etag'] as List<String>; final etags = resp.headers['etag'] as List<String>;
// 分片上传成功,触发回调 // 分片上传成功,触发回调
...@@ -422,7 +471,10 @@ class UploadStartHandler extends MessageHandler { ...@@ -422,7 +471,10 @@ class UploadStartHandler extends MessageHandler {
throw ChunkUploadFailedException('Chunk upload failed'); throw ChunkUploadFailedException('Chunk upload failed');
} }
} catch (e) { } catch (e) {
debugPrint('====================> 分片$chunkIndex${attempt + 1}次, 上传失败:${e.toString()}'); if (_isCancelled) {
throw ChunkUploadFailedException('上传已取消');
}
debugPrint('====================> $_cmdUnique 分片$chunkIndex${attempt + 1}次, 上传失败:${e.toString()}');
if (attempt == maxRetries) { if (attempt == maxRetries) {
// throw Exception('Chunk $chunkIndex upload failed after $maxRetries attempts: $e'); // throw Exception('Chunk $chunkIndex upload failed after $maxRetries attempts: $e');
throw ChunkUploadFailedException('Chunk upload failed'); throw ChunkUploadFailedException('Chunk upload failed');
...@@ -605,6 +657,10 @@ class UploadStartHandler extends MessageHandler { ...@@ -605,6 +657,10 @@ class UploadStartHandler extends MessageHandler {
Future<void> worker() async { Future<void> worker() async {
while (!cancelled) { while (!cancelled) {
if (_isCancelled) {
cancelled = true;
break;
}
final i = currentIndex++; final i = currentIndex++;
if (i >= taskFactories.length) break; if (i >= taskFactories.length) break;
try { try {
......
...@@ -4,6 +4,7 @@ import 'package:appframe/bloc/web_cubit.dart'; ...@@ -4,6 +4,7 @@ import 'package:appframe/bloc/web_cubit.dart';
import 'package:appframe/config/locator.dart'; import 'package:appframe/config/locator.dart';
import 'package:appframe/data/models/message/h5_message.dart'; import 'package:appframe/data/models/message/h5_message.dart';
import 'package:appframe/data/models/message/h5_resp.dart'; import 'package:appframe/data/models/message/h5_resp.dart';
import 'package:appframe/data/repositories/message/upload_cancel_handler.dart';
import 'package:appframe/data/repositories/message/upload_start_handler.dart'; import 'package:appframe/data/repositories/message/upload_start_handler.dart';
// 消息处理器抽象类 // 消息处理器抽象类
...@@ -15,21 +16,22 @@ abstract class MessageHandler { ...@@ -15,21 +16,22 @@ abstract class MessageHandler {
void setMessage(String message) {} void setMessage(String message) {}
} }
// 消息分发器 /// 消息分发器
class MessageDispatcher { class MessageDispatcher {
final Map<String, MessageHandler> _handlers = {}; final Map<String, MessageHandler> _handlers = {};
final Map<String, UploadStartHandler> _activeUploads = {};
// 注册 /// 注册
void registerHandler(String command, MessageHandler handler) { void registerHandler(String command, MessageHandler handler) {
_handlers[command] = handler; _handlers[command] = handler;
} }
// 取消注册,暂时不会用到 /// 取消注册,暂时不会用到
void unregisterHandler(String command) { void unregisterHandler(String command) {
_handlers.remove(command); _handlers.remove(command);
} }
// 分发处理 /// 分发处理
Future<void> dispatch(String message, Function callback, {WebCubit? webCubit}) async { Future<void> dispatch(String message, Function callback, {WebCubit? webCubit}) async {
final Map<String, dynamic> data = json.decode(message); final Map<String, dynamic> data = json.decode(message);
H5Message h5Message = H5Message.fromJson(data); H5Message h5Message = H5Message.fromJson(data);
...@@ -38,7 +40,10 @@ class MessageDispatcher { ...@@ -38,7 +40,10 @@ class MessageDispatcher {
if (handler == null) { if (handler == null) {
try { try {
handler = getIt.get<MessageHandler>(instanceName: h5Message.cmd); handler = getIt.get<MessageHandler>(instanceName: h5Message.cmd);
// uploadStart 每次新建实例,不按 cmd 缓存
if (h5Message.cmd != 'uploadStart') {
registerHandler(h5Message.cmd, handler); registerHandler(h5Message.cmd, handler);
}
} catch (e) { } catch (e) {
// 处理 getIt 找不到实例的情况 // 处理 getIt 找不到实例的情况
H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, '没有对应Cmd'); H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, '没有对应Cmd');
...@@ -47,6 +52,7 @@ class MessageDispatcher { ...@@ -47,6 +52,7 @@ class MessageDispatcher {
} }
} }
String? uploadUnique;
try { try {
// 设置传递的cubit,进行业务操作 // 设置传递的cubit,进行业务操作
if (h5Message.cmd == 'scanCode' || if (h5Message.cmd == 'scanCode' ||
...@@ -63,9 +69,21 @@ class MessageDispatcher { ...@@ -63,9 +69,21 @@ class MessageDispatcher {
handler.setMessage(message); handler.setMessage(message);
} }
// 针对 uploadStart 指令 // 针对 uploadStart 指令:创建新实例并追踪
if (h5Message.cmd == "uploadStart" && handler is UploadStartHandler) { if (h5Message.cmd == "uploadStart" && handler is UploadStartHandler) {
handler.setCmdUnique(h5Message.unique); handler.setCmdUnique(h5Message.unique);
_activeUploads[h5Message.unique] = handler;
uploadUnique = h5Message.unique;
} else
// 针对 uploadCancel
if (h5Message.cmd == "uploadCancel" && handler is UploadCancelHandler) {
// 从 params 读取要取消的 unique,兼容 H5 不传的情况
String? targetUnique;
if (h5Message.params is Map<String, dynamic>) {
targetUnique = h5Message.params['unique'] as String?;
}
handler.setTargetUnique(targetUnique);
handler.setActiveUploads(_activeUploads);
} }
final result = await handler.handleMessage(h5Message.params); final result = await handler.handleMessage(h5Message.params);
...@@ -79,6 +97,10 @@ class MessageDispatcher { ...@@ -79,6 +97,10 @@ class MessageDispatcher {
// 执行异常 // 执行异常
H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, e.toString()); H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, e.toString());
callback(h5Resp.toJson()); callback(h5Resp.toJson());
} finally {
if (uploadUnique != null) {
_activeUploads.remove(uploadUnique);
}
} }
} }
} }
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!