Commit ce6abe0a by tanghuan

Merge branch 'feature-2604-cancel-upload' into feature-2604

2 parents 34bf5d1b 8621cc6f
......@@ -29,6 +29,7 @@ import 'package:appframe/data/repositories/message/share_handler.dart';
import 'package:appframe/data/repositories/message/share_to_wx_handler.dart';
import 'package:appframe/data/repositories/message/storage_handler.dart';
import 'package:appframe/data/repositories/message/title_bar_handler.dart';
import 'package:appframe/data/repositories/message/upload_cancel_handler.dart';
import 'package:appframe/data/repositories/message/upload_file.dart';
import 'package:appframe/data/repositories/message/upload_start_handler.dart';
import 'package:appframe/data/repositories/message/vibrate_short_handler.dart';
......@@ -180,7 +181,8 @@ Future<void> setupLocator() async {
/// 上传文件
getIt.registerLazySingleton<MessageHandler>(() => UploadFileHandler(), instanceName: 'uploadFile');
getIt.registerLazySingleton<MessageHandler>(() => UploadStartHandler(), instanceName: 'uploadStart');
getIt.registerFactory<MessageHandler>(() => UploadStartHandler(), instanceName: 'uploadStart');
getIt.registerLazySingleton<MessageHandler>(() => UploadCancelHandler(), instanceName: 'uploadCancel');
/// 下载文件
getIt.registerLazySingleton<MessageHandler>(() => DownloadFileHandler(), instanceName: 'downloadFile');
......
import 'package:appframe/data/repositories/message/upload_start_handler.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:flutter/cupertino.dart';
class UploadCancelHandler extends MessageHandler {
String? _targetUnique;
Map<String, UploadStartHandler>? _activeUploads;
void setTargetUnique(String? targetUnique) {
_targetUnique = targetUnique;
}
void setActiveUploads(Map<String, UploadStartHandler> activeUploads) {
_activeUploads = activeUploads;
}
@override
Future<dynamic> handleMessage(params) async {
try {
final uploads = _activeUploads;
if (uploads == null || uploads.isEmpty) {
return true;
}
if (_targetUnique != null && uploads.containsKey(_targetUnique)) {
uploads[_targetUnique]?.cancel();
} else {
// H5 未传 unique 时,取消所有活跃上传
for (var handler in uploads.values) {
handler.cancel();
}
}
} catch (e) {
debugPrint(e.toString());
} finally {
_targetUnique = null;
_activeUploads = null;
}
return true;
}
}
......@@ -12,6 +12,7 @@ import 'package:appframe/utils/file_type_util.dart';
import 'package:appframe/utils/image_util.dart';
import 'package:appframe/utils/video_util.dart';
import 'package:dio/dio.dart';
import 'package:ffmpeg_kit_flutter_new/ffmpeg_kit.dart';
import 'package:flutter/foundation.dart';
import 'package:path/path.dart' as path;
import 'package:path_provider/path_provider.dart';
......@@ -32,6 +33,12 @@ class UploadStartHandler extends MessageHandler {
late int _cmdTotalByte;
late int _cmdSentByte;
/// 上传用的 Dio 实例,取消时可直接关闭
Dio? _uploadDio;
/// 是否已取消
bool _isCancelled = false;
@override
void setCubit(WebCubit cubit) {
_webCubit = cubit;
......@@ -46,8 +53,17 @@ class UploadStartHandler extends MessageHandler {
_webCubit = null;
}
/// 取消当前上传任务
void cancel() {
_isCancelled = true;
_uploadDio?.close(force: true);
// FFmpegKit.cancel();
debugPrint('====================>上传任务已取消');
}
@override
Future<dynamic> handleMessage(params) async {
_isCancelled = false;
try {
if (params is! Map<String, dynamic>) {
throw ParamErrorException();
......@@ -72,6 +88,7 @@ class UploadStartHandler extends MessageHandler {
_cmdUploadId = const Uuid().v4();
final startTime = DateTime.now();
debugPrint('====================> 上传文件路径 $_cmdUnique $tempFilePath');
final result = await _handle(tempFilePath, busi, subBusi);
final endTime = DateTime.now();
debugPrint('====================>上传耗时:${endTime.millisecondsSinceEpoch - startTime.millisecondsSinceEpoch} 毫秒');
......@@ -94,11 +111,14 @@ class UploadStartHandler extends MessageHandler {
// 确保发送了 uploadEnd 指令
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: '网络错误,上传失败');
} catch (e) {
debugPrint('====================>上传失败:$e');
// 确保发送了 uploadEnd 指令
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: e.toString());
// Dio close 产生的 DioException 也会走到这里
final msg = (e is DioException && e.type == DioExceptionType.cancel) ? '上传已取消' : e.toString();
debugPrint('====================>上传失败:$msg');
_webCubit?.sendUploadEnd(_cmdUnique, _cmdUploadId, '', errMsg: msg);
} finally {
_unfollowCubit();
_uploadDio?.close(force: true);
_uploadDio = null;
}
}
......@@ -143,7 +163,7 @@ class UploadStartHandler extends MessageHandler {
outputPath,
onProgress: (progress) {
// progress 范围 0 ~ 100
debugPrint('转码进度: $progress%');
debugPrint('转码进度: $_cmdUnique $progress%');
/// 发送转码进度
_webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0);
......@@ -156,7 +176,7 @@ class UploadStartHandler extends MessageHandler {
'low',
onProgress: (progress) {
// progress 范围 0 ~ 100
debugPrint('压缩进度: $progress%');
debugPrint('压缩进度: $_cmdUnique $progress%');
/// 发送压缩进度
_webCubit?.sendUploadProgress(_cmdUnique, _cmdUploadId, 1, progress, 0, 0, 0, 0);
......@@ -189,6 +209,11 @@ class UploadStartHandler extends MessageHandler {
}
}
// 压缩完成后检查是否已取消
if (_isCancelled) {
throw Exception('上传已取消');
}
// 限制压缩后仍然大于300M的文件上传
if (fileSize > 1024 * 1024 * 300) {
throw Exception('上传的文件过大');
......@@ -251,7 +276,11 @@ class UploadStartHandler extends MessageHandler {
///
/// 6 上传(带进度反馈)
///
final dio = Dio()
if (_isCancelled) {
throw Exception('上传已取消');
}
// 将 dio 存为实例变量,以便 cancel() 可以直接关闭
_uploadDio = Dio()
..options = BaseOptions(
baseUrl: '',
connectTimeout: Duration(milliseconds: 30000),
......@@ -265,7 +294,11 @@ class UploadStartHandler extends MessageHandler {
// 创建分片上传任务工厂列表(延迟执行,控制并发)
final uploadTaskFactories = <Future<Map<String, dynamic>> Function()>[];
for (int i = 0; i < totalChunks; i++) {
if (_isCancelled) {
throw Exception('上传已取消');
}
// 获取签名的上传地址 URL
if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket);
......@@ -286,29 +319,37 @@ class UploadStartHandler extends MessageHandler {
randomAccessFile.setPositionSync(start);
await randomAccessFile.readInto(chunk, 0, actualChunkSize);
uploadTaskFactories.add(() => _uploadChunkWithProgress(
dio,
signUrls[i],
i,
chunk,
onChunkComplete: () {
_cmdUploadedChunks++;
_cmdSentByte = _cmdSentByte + actualChunkSize;
/// 发送 uploadProgress 指令,传递上传进度
_webCubit?.sendUploadProgress(
_cmdUnique,
_cmdUploadId,
2,
((_cmdUploadedChunks / _cmdTotalChunks) * 100).floor(),
_cmdTotalChunks,
_cmdUploadedChunks,
_cmdTotalByte,
_cmdSentByte,
);
},
));
uploadTaskFactories.add(() async {
if (_isCancelled) {
throw Exception('上传已取消');
}
return _uploadChunkWithProgress(
_uploadDio!,
signUrls[i],
i,
chunk,
onChunkComplete: () {
_cmdUploadedChunks++;
_cmdSentByte = _cmdSentByte + actualChunkSize;
/// 发送 uploadProgress 指令,传递上传进度
_webCubit?.sendUploadProgress(
_cmdUnique,
_cmdUploadId,
2,
((_cmdUploadedChunks / _cmdTotalChunks) * 100).floor(),
_cmdTotalChunks,
_cmdUploadedChunks,
_cmdTotalByte,
_cmdSentByte,
);
},
);
});
}
if (_isCancelled) {
throw Exception('上传已取消');
}
var resultList = await _runWithConcurrency(uploadTaskFactories);
......@@ -324,6 +365,9 @@ class UploadStartHandler extends MessageHandler {
///
/// 7 合并
///
if (_isCancelled) {
throw Exception('上传已取消');
}
var startTime2 = DateTime.now();
String location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
var endTime2 = DateTime.now();
......@@ -333,10 +377,12 @@ class UploadStartHandler extends MessageHandler {
/// 8 针对视频生成封面
///
if (mimeType?.startsWith('video/') ?? false) {
await _genHwVideoCover(dio, objectKey);
if (_isCancelled) {
throw Exception('上传已取消');
}
await _genHwVideoCover(_uploadDio!, objectKey);
}
dio.close(force: true);
bxeApiService.close();
return {'url': _addPreUrl(location)};
......@@ -404,13 +450,16 @@ class UploadStartHandler extends MessageHandler {
int maxRetries = 3,
}) async {
for (int attempt = 0; attempt <= maxRetries; attempt++) {
if (_isCancelled) {
throw ChunkUploadFailedException('上传已取消');
}
try {
var starTime = DateTime.now();
final resp = await _uploadChunk(dio, signUrl, chunk, chunkIndex);
var endTime = DateTime.now();
if (resp.statusCode == 200) {
debugPrint(
'====================> 分片$chunkIndex${attempt + 1}次, $endTime 上传耗时:${endTime.millisecondsSinceEpoch - starTime.millisecondsSinceEpoch} 毫秒');
'====================> $_cmdUnique 分片$chunkIndex${attempt + 1}次, $endTime 上传耗时:${endTime.millisecondsSinceEpoch - starTime.millisecondsSinceEpoch} 毫秒');
final etags = resp.headers['etag'] as List<String>;
// 分片上传成功,触发回调
......@@ -422,7 +471,10 @@ class UploadStartHandler extends MessageHandler {
throw ChunkUploadFailedException('Chunk upload failed');
}
} catch (e) {
debugPrint('====================> 分片$chunkIndex${attempt + 1}次, 上传失败:${e.toString()}');
if (_isCancelled) {
throw ChunkUploadFailedException('上传已取消');
}
debugPrint('====================> $_cmdUnique 分片$chunkIndex${attempt + 1}次, 上传失败:${e.toString()}');
if (attempt == maxRetries) {
// throw Exception('Chunk $chunkIndex upload failed after $maxRetries attempts: $e');
throw ChunkUploadFailedException('Chunk upload failed');
......@@ -605,6 +657,10 @@ class UploadStartHandler extends MessageHandler {
Future<void> worker() async {
while (!cancelled) {
if (_isCancelled) {
cancelled = true;
break;
}
final i = currentIndex++;
if (i >= taskFactories.length) break;
try {
......
......@@ -4,6 +4,7 @@ import 'package:appframe/bloc/web_cubit.dart';
import 'package:appframe/config/locator.dart';
import 'package:appframe/data/models/message/h5_message.dart';
import 'package:appframe/data/models/message/h5_resp.dart';
import 'package:appframe/data/repositories/message/upload_cancel_handler.dart';
import 'package:appframe/data/repositories/message/upload_start_handler.dart';
// 消息处理器抽象类
......@@ -15,21 +16,22 @@ abstract class MessageHandler {
void setMessage(String message) {}
}
// 消息分发器
/// 消息分发器
class MessageDispatcher {
final Map<String, MessageHandler> _handlers = {};
final Map<String, UploadStartHandler> _activeUploads = {};
// 注册
/// 注册
void registerHandler(String command, MessageHandler handler) {
_handlers[command] = handler;
}
// 取消注册,暂时不会用到
/// 取消注册,暂时不会用到
void unregisterHandler(String command) {
_handlers.remove(command);
}
// 分发处理
/// 分发处理
Future<void> dispatch(String message, Function callback, {WebCubit? webCubit}) async {
final Map<String, dynamic> data = json.decode(message);
H5Message h5Message = H5Message.fromJson(data);
......@@ -38,7 +40,10 @@ class MessageDispatcher {
if (handler == null) {
try {
handler = getIt.get<MessageHandler>(instanceName: h5Message.cmd);
registerHandler(h5Message.cmd, handler);
// uploadStart 每次新建实例,不按 cmd 缓存
if (h5Message.cmd != 'uploadStart') {
registerHandler(h5Message.cmd, handler);
}
} catch (e) {
// 处理 getIt 找不到实例的情况
H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, '没有对应Cmd');
......@@ -47,6 +52,7 @@ class MessageDispatcher {
}
}
String? uploadUnique;
try {
// 设置传递的cubit,进行业务操作
if (h5Message.cmd == 'scanCode' ||
......@@ -63,9 +69,21 @@ class MessageDispatcher {
handler.setMessage(message);
}
// 针对 uploadStart 指令
// 针对 uploadStart 指令:创建新实例并追踪
if (h5Message.cmd == "uploadStart" && handler is UploadStartHandler) {
handler.setCmdUnique(h5Message.unique);
_activeUploads[h5Message.unique] = handler;
uploadUnique = h5Message.unique;
} else
// 针对 uploadCancel
if (h5Message.cmd == "uploadCancel" && handler is UploadCancelHandler) {
// 从 params 读取要取消的 unique,兼容 H5 不传的情况
String? targetUnique;
if (h5Message.params is Map<String, dynamic>) {
targetUnique = h5Message.params['unique'] as String?;
}
handler.setTargetUnique(targetUnique);
handler.setActiveUploads(_activeUploads);
}
final result = await handler.handleMessage(h5Message.params);
......@@ -79,6 +97,10 @@ class MessageDispatcher {
// 执行异常
H5Resp h5Resp = H5Resp(h5Message.unique, h5Message.cmd, {}, e.toString());
callback(h5Resp.toJson());
} finally {
if (uploadUnique != null) {
_activeUploads.remove(uploadUnique);
}
}
}
}
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!