Commit 6b31c590 by tanghuan

测试

1 parent 86fb6411
......@@ -23,6 +23,7 @@ import 'package:appframe/data/repositories/message/scan_code_handler.dart';
import 'package:appframe/data/repositories/message/set_title_handler.dart';
import 'package:appframe/data/repositories/message/storage_handler.dart';
import 'package:appframe/data/repositories/message/upload_file.dart';
import 'package:appframe/data/repositories/message/upload_file2.dart';
import 'package:appframe/data/repositories/message/vibrate_short_handler.dart';
import 'package:appframe/data/repositories/message/video_info_handler.dart';
import 'package:appframe/data/repositories/message/wifi_info_handler.dart';
......@@ -155,7 +156,8 @@ Future<void> setupLocator() async {
getIt.registerLazySingleton<MessageHandler>(() => ScanCodeHandler(), instanceName: 'scanCode');
/// 上传文件
getIt.registerLazySingleton<MessageHandler>(() => UploadFileHandler(), instanceName: 'uploadFile');
// getIt.registerLazySingleton<MessageHandler>(() => UploadFileHandler(), instanceName: 'uploadFile');
getIt.registerLazySingleton<MessageHandler>(() => UploadFile2Handler(), instanceName: 'uploadFile');
/// 下载文件
getIt.registerLazySingleton<MessageHandler>(() => DownloadFileHandler(), instanceName: 'downloadFile');
......
......@@ -6,10 +6,13 @@ import 'package:appframe/services/dispatcher.dart';
import 'package:appframe/utils/file_type_util.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:logger/logger.dart';
import 'package:path/path.dart' as path;
import 'package:uuid/uuid.dart';
class UploadFileHandler extends MessageHandler {
static final upLogger = Logger(level: Level.debug);
@override
Future handleMessage(params) async {
if (params is! Map<String, dynamic>) {
......@@ -29,10 +32,19 @@ class UploadFileHandler extends MessageHandler {
if (subBusi == null || subBusi.isEmpty) {
throw Exception('参数错误');
}
final startTimestamp = DateTime.now().millisecondsSinceEpoch;
upLogger.d('开始上传文件');
final result = await compute(_handleUpload, {'filePath': tempFilePath, 'busi': busi, 'subBusi': subBusi});
// final result = await _handleUpload({'filePath': tempFilePath});
final endTimestamp = DateTime.now().millisecondsSinceEpoch;
upLogger.d('上传完成,耗时:${endTimestamp - startTimestamp} 毫秒');
result['startTimestamp'] = startTimestamp;
result['sendTimestamp'] = endTimestamp;
return result;
}
......@@ -47,12 +59,6 @@ class UploadFileHandler extends MessageHandler {
String busi = fileParams['busi'] as String;
String subBusi = fileParams['subBusi'] as String;
print('参数-------');
print('filePath:$filePath ');
print('busi:$busi ');
print('subBusi:$subBusi ');
print('参数-------');
if (filePath.startsWith(Constant.localServerUrl)) {
filePath = filePath.replaceFirst(Constant.localServerUrl, '');
}
......@@ -67,18 +73,23 @@ class UploadFileHandler extends MessageHandler {
final obsApiService = ApiService(defaultHeaders: {'Content-Type': '', 'Accept': ''});
String logicPrefix = _getLoginPrefix(busi, subBusi);
print('logicPrefix: $logicPrefix');
//并行上传分段
final uploadResult = await _uploadInParallel(bxeApiService, obsApiService, logicPrefix, filePath);
upLogger.d("开始处理并行上传");
final uploadResult =
await _uploadInParallel(bxeApiService, obsApiService, logicPrefix, filePath, maxConcurrency: 30);
upLogger.d("并行上传完成");
// 上传结果
String objectKey = uploadResult['objectKey'] as String;
String bucket = uploadResult['bucket'] as String;
String uploadId = uploadResult['uploadId'] as String;
Map<int, String> tagsMap = uploadResult['tagsMap'] as Map<int, String>;
//请求合并文件
upLogger.d("开始处理合并文件");
String location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
print('location: $location');
upLogger.d("合并文件完成");
//关闭Dio
bxeApiService.close();
......@@ -106,10 +117,15 @@ class UploadFileHandler extends MessageHandler {
throw Exception('上传的文件过大');
}
upLogger.d('文件大小:$fileSize');
//分段大小2M
final chunkSize = Constant.obsUploadChunkSize;
upLogger.d('分段大小:$chunkSize');
//分段总数
final totalChunks = (fileSize / chunkSize).ceil();
upLogger.d('分段总数:$totalChunks');
final randomAccessFile = file.openSync();
......@@ -127,14 +143,16 @@ class UploadFileHandler extends MessageHandler {
// String objectKey = 'd2/test/file.csv';
var uuid = Uuid();
String objectKey = '$logicPrefix/${uuid.v4()}${path.extension(file.path)}';
print('objectKey: $objectKey');
String uploadId = '';
Map<int, String> tagsMap = {};
final futures = <Future>[];
for (int i = 0; i < totalChunks; i++) {
upLogger.d('开始处理分段:$i');
// 控制并发数量
if (futures.length >= maxConcurrency) {
upLogger.d('超过最大并发数量,等待');
await Future.wait(futures);
futures.clear();
}
......@@ -146,6 +164,7 @@ class UploadFileHandler extends MessageHandler {
randomAccessFile.setPositionSync(start);
await randomAccessFile.readInto(chunk, 0, actualChunkSize);
final startTime = DateTime.now().millisecondsSinceEpoch;
String chunkSignUrl;
if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket);
......@@ -155,7 +174,8 @@ class UploadFileHandler extends MessageHandler {
final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
chunkSignUrl = nextResult['signed_url'] as String;
}
print('chunkSignUrl: $chunkSignUrl');
final endTime = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$i,签名耗时:${endTime - startTime} 毫秒');
// await _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
final future = _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
......@@ -165,6 +185,7 @@ class UploadFileHandler extends MessageHandler {
// 等待剩余的上传完成
if (futures.isNotEmpty) {
await Future.wait(futures);
futures.clear();
}
randomAccessFile.closeSync();
......@@ -200,12 +221,17 @@ class UploadFileHandler extends MessageHandler {
Map<int, String> tagsMap, {
int maxRetries = 3,
}) async {
final start = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$chunkIndex,开始时间:$start毫秒');
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
final resp = await _uploadChunk(obsApiService, signUrl, chunk);
final resp = await _uploadChunk(obsApiService, signUrl, chunk, chunkIndex);
if (resp.statusCode == 200) {
final etags = resp.headers['etag'] as List<String>;
tagsMap[chunkIndex + 1] = etags[0];
final end = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$chunkIndex,上传段耗时:${end - start}毫秒');
return; // 上传成功
} else {
throw Exception('Chunk $chunkIndex upload failed: ${resp.statusCode}');
......@@ -221,14 +247,18 @@ class UploadFileHandler extends MessageHandler {
}
/// 上传段
static Future<Response> _uploadChunk(ApiService obsApiService, String signUrl, Uint8List chunk) async {
static Future<Response> _uploadChunk(
ApiService obsApiService, String signUrl, Uint8List chunk, int chunkIndex) async {
final start = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$chunkIndex,处理开始时间 $start');
var url = signUrl.replaceFirst('AWSAccessKeyId=', 'AccessKeyId=').replaceFirst(':443', '');
try {
Response response = await obsApiService.put(url, chunk);
final end = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$chunkIndex,处理完成时间:${end - start}毫秒');
return response;
} catch (e) {
print('Chunk upload failed: $e');
throw Exception('Chunk upload failed: $e');
}
}
......
import 'dart:io';
import 'package:appframe/config/constant.dart';
import 'package:appframe/services/api_service.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:appframe/utils/file_type_util.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:logger/logger.dart';
import 'package:path/path.dart' as path;
import 'package:uuid/uuid.dart';
class UploadFile2Handler extends MessageHandler {
static final upLogger = Logger(level: Level.debug);
@override
Future handleMessage(params) async {
if (params is! Map<String, dynamic>) {
throw Exception('参数错误');
}
final String? tempFilePath = params['tempFilePath'] as String?;
if (tempFilePath == null || tempFilePath.isEmpty) {
throw Exception('参数错误');
}
final String? busi = params['busi'] as String?;
if (busi == null || busi.isEmpty) {
throw Exception('参数错误');
}
final String? subBusi = params['subBusi'] as String?;
if (subBusi == null || subBusi.isEmpty) {
throw Exception('参数错误');
}
final startTime = DateTime.now();
upLogger.d('开始上传文件,$startTime');
// final result = await compute(_handleUpload, {'filePath': tempFilePath, 'busi': busi, 'subBusi': subBusi});
final result = await _handleUpload(tempFilePath, busi, subBusi);
final endTime = DateTime.now();
upLogger.d('完成上传文件,$endTime');
upLogger.d('上传完成,耗时:${endTime.millisecondsSinceEpoch - startTime.millisecondsSinceEpoch} 毫秒');
result['startTimestamp'] = startTime.toString();
result['endTimestamp'] = endTime.toString();
return result;
}
static const _bxeBaseUrl = 'https://iotapp-dev.banxiaoer.com/iotapp';
static const _signatureNewUrl = '/api/v1/obs/multipart/signaturenew';
static const _signatureNextUrl = '/api/v1/obs/multipart/signaturenext';
static const _completeUrl = '/api/v1/obs/multipart/complete';
/// 在Isolate中执行
// static Future<Map<String, dynamic>> _handleUpload(Map<String, dynamic> fileParams) async {
// String filePath = fileParams['filePath'] as String;
// String busi = fileParams['busi'] as String;
// String subBusi = fileParams['subBusi'] as String;
static Future<Map<String, dynamic>> _handleUpload(String filePath, String busi, String subBusi) async {
if (filePath.startsWith(Constant.localServerUrl)) {
filePath = filePath.replaceFirst(Constant.localServerUrl, '');
}
if (filePath.startsWith(Constant.localServerTemp)) {
filePath = filePath.replaceFirst(Constant.localServerTemp, '');
}
final bxeApiService = ApiService(baseUrl: _bxeBaseUrl);
// 由于服务端签名时未设置Content-Type,这里必须设置为空,否则会报签名错误
// 由于封装有默认值,所以不能不设置
final obsApiService = ApiService(defaultHeaders: {'Content-Type': '', 'Accept': ''});
String logicPrefix = _getLoginPrefix(busi, subBusi);
//并行上传分段
var startTime = DateTime.now();
upLogger.d("开始异步处理 $startTime");
final uploadResult =
await _uploadInParallel(bxeApiService, obsApiService, logicPrefix, filePath, maxConcurrency: 30);
var endTime = DateTime.now();
upLogger.d("完成异步处理 $endTime ,耗时 ${endTime.millisecondsSinceEpoch - startTime.millisecondsSinceEpoch} 毫秒");
// 上传结果
String objectKey = uploadResult['objectKey'] as String;
String bucket = uploadResult['bucket'] as String;
String uploadId = uploadResult['uploadId'] as String;
Map<int, String> tagsMap = uploadResult['tagsMap'] as Map<int, String>;
//请求合并文件
var startTime2 = DateTime.now();
upLogger.d("开始处理合并文件 $startTime2");
String location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
var endTime2 = DateTime.now();
upLogger.d("合并文件完成 $endTime2 ,耗时 ${endTime2.millisecondsSinceEpoch - startTime2.millisecondsSinceEpoch} 毫秒 ");
//关闭Dio
bxeApiService.close();
obsApiService.close();
return {'url': _addPreUrl(location)};
}
/// 并行上传
static Future<Map<String, dynamic>> _uploadInParallel(
ApiService bxeApiService,
ApiService obsApiService,
String logicPrefix,
String filePath, {
int maxConcurrency = 5,
}) async {
//判断文件
File file = File(filePath);
if (!file.existsSync()) {
throw Exception('文件不存在');
}
//暂时仅支持200M的文件上传
final fileSize = file.lengthSync();
if (fileSize > 1024 * 1024 * 200) {
throw Exception('上传的文件过大');
}
upLogger.d('文件大小:$fileSize');
//分段大小2M
final chunkSize = Constant.obsUploadChunkSize;
upLogger.d('分段大小:$chunkSize');
//分段总数
final totalChunks = (fileSize / chunkSize).ceil();
upLogger.d('分段总数:$totalChunks');
final randomAccessFile = file.openSync();
//bucket 存储桶名称 : bxe-files | bxe-pics | bxe-videos
String bucket;
if (await FileTypeUtil.isImage(file)) {
bucket = 'bxe-pics';
} else if (await FileTypeUtil.isVideo(file)) {
bucket = 'bxe-videos';
} else {
bucket = 'bxe-files';
}
//生成唯一文件名,
// String objectKey = 'd2/test/file.csv';
var uuid = Uuid();
String objectKey = '$logicPrefix/${uuid.v4()}${path.extension(file.path)}';
String uploadId = '';
Map<int, String> tagsMap = {};
final futures = <Future>[];
for (int i = 0; i < totalChunks; i++) {
upLogger.d('开始处理分段:$i');
// 控制并发数量
if (futures.length >= maxConcurrency) {
upLogger.d('超过最大并发数量,等待');
var resultList = await Future.wait(futures);
upLogger.d('等待完成');
for (var result in resultList) {
if (result is Map<String, dynamic>) {
tagsMap[result['idx'] as int] = result['etag'] as String;
}
}
futures.clear();
}
final start = i * chunkSize;
final actualChunkSize = (i + 1) * chunkSize > fileSize ? fileSize - start : chunkSize;
final chunk = Uint8List(actualChunkSize);
randomAccessFile.setPositionSync(start);
await randomAccessFile.readInto(chunk, 0, actualChunkSize);
final startTime = DateTime.now().millisecondsSinceEpoch;
String chunkSignUrl;
if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket);
uploadId = initResult['upload_id'] as String;
chunkSignUrl = initResult['signed_url'] as String;
} else {
final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
chunkSignUrl = nextResult['signed_url'] as String;
}
final endTime = DateTime.now().millisecondsSinceEpoch;
upLogger.d('分段$i,签名耗时:${endTime - startTime} 毫秒');
// await _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
// final future = _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
// final future = _uploadChunkWithRetry({'signUrl': chunkSignUrl, 'chunkIndex': i, 'chunk': chunk});
final future = compute(_uploadChunkWithRetry, {'signUrl': chunkSignUrl, 'chunkIndex': i, 'chunk': chunk});
futures.add(future);
}
// 等待剩余的上传完成
if (futures.isNotEmpty) {
var resultList = await Future.wait(futures);
for (var result in resultList) {
if (result is Map<String, dynamic>) {
tagsMap[result['idx'] as int] = result['etag'] as String;
}
}
}
randomAccessFile.closeSync();
return {'objectKey': objectKey, 'bucket': bucket, 'uploadId': uploadId, 'tagsMap': tagsMap};
}
/// 初始化,请求后端获取签名信息和上传任务ID
static Future<Map<String, dynamic>> _init(ApiService bxeApiService, String objectKey, String bucket) async {
var endpoint = '$_signatureNewUrl?objectKey=$objectKey&bucket=$bucket';
final resp = await bxeApiService.get(endpoint);
return resp.data;
}
/// 每次上传前,请求后端获取签名信息
static Future<Map<String, dynamic>> _next(
ApiService bxeApiService,
String objectKey,
String bucket,
String uploadId,
int partNum,
) async {
var endpoint = '$_signatureNextUrl?objectKey=$objectKey&bucket=$bucket&uploadId=$uploadId&partNum=$partNum';
final resp = await bxeApiService.get(endpoint);
return resp.data;
}
/// 上传段,按照最大重试次数进行上传重试
// static Future<void> _uploadChunkWithRetry(
// ApiService obsApiService,
// String signUrl,
// int chunkIndex,
// Uint8List chunk,
// Map<int, String> tagsMap, {
// int maxRetries = 3,
// }) async {
static Future<Map<String, dynamic>> _uploadChunkWithRetry(Map<String, dynamic> uploadParams) async {
var signUrl = uploadParams['signUrl'] as String;
var chunk = uploadParams['chunk'] as Uint8List;
var chunkIndex = uploadParams['chunkIndex'] as int;
// var tagsMap = uploadParams['tagsMap'] as Map<int, String>;
var maxRetries = 30;
final start = DateTime.now();
upLogger.d('new Isolate--- 分段$chunkIndex,开始时间:$start');
for (int attempt = 0; attempt <= maxRetries; attempt++) {
upLogger.d('new Isolate--- 分段$chunkIndex,第 ${attempt + 1} 次,开始时间:${DateTime.now()}');
try {
final obsApiService = ApiService(defaultHeaders: {'Content-Type': '', 'Accept': ''});
final resp = await _uploadChunk(obsApiService, signUrl, chunk, chunkIndex);
if (resp.statusCode == 200) {
final etags = resp.headers['etag'] as List<String>;
// tagsMap[chunkIndex + 1] = etags[0];
final end = DateTime.now();
upLogger.d(
'new Isolate--- 分段$chunkIndex,完成时间: $end,上传段耗时:${end.millisecondsSinceEpoch - start.millisecondsSinceEpoch}毫秒');
return Future.value({'idx': chunkIndex + 1, 'etag': etags[0]}); // 上传成功
} else {
throw Exception('Chunk $chunkIndex upload failed: ${resp.statusCode}');
}
} catch (e) {
if (attempt == maxRetries) {
throw Exception('Chunk $chunkIndex upload failed after $maxRetries attempts: $e');
}
// 等待后重试
await Future.delayed(Duration(seconds: 2 * attempt));
}
}
throw Exception('上传失败');
}
/// 上传段
static Future<Response> _uploadChunk(
ApiService obsApiService, String signUrl, Uint8List chunk, int chunkIndex) async {
var url = signUrl.replaceFirst('AWSAccessKeyId=', 'AccessKeyId=').replaceFirst(':443', '');
try {
Response response = await obsApiService.put(url, chunk);
return response;
} catch (e) {
throw Exception('Chunk upload failed: $e');
}
}
/// 请求合并文件
static Future<String> _merge(
ApiService bxeApiService,
String objectKey,
String bucket,
String uploadId,
Map<int, String> tagsMap,
) async {
final parts = [];
for (int i = 1; i <= tagsMap.length; i++) {
parts.add({'partNumber': i, 'etag': tagsMap[i]});
}
final response = await bxeApiService.post(_completeUrl, {
'objectKey': objectKey,
'bucket': bucket,
'uploadId': uploadId,
'parts': parts,
});
if (response.statusCode != 200) {
throw Exception('合并文件失败');
}
return response.data["location"];
}
static String _getLoginPrefix(String busi, String subBusi) {
var now = DateTime.now();
var year = now.year;
var month = now.month;
var day = now.day;
return 'd2/pridel/user/$year$month$day/bxe/${busi}_$subBusi';
}
static String _addPreUrl(String location) {
// /bxe-pics/d2/pridel/user/20251017/bxe/bxe_homework/f4ea233d-9e1b-4a3f-bc8f-b64e776f42a6.jpg
if (location.startsWith('/bxe-files')) {
return 'https://files-obs.banxiaoer.com${location.substring(10)}';
} else if (location.startsWith('/bxe-pics')) {
return 'https://pics-obs.banxiaoer.com${location.substring(9)}';
} else if (location.startsWith('/bxe-videos')) {
return 'https://videos-obs.banxiaoer.com${location.substring(11)}';
} else {
return location;
}
}
}
......@@ -8,7 +8,7 @@ import 'app.dart';
void main() async {
WidgetsFlutterBinding.ensureInitialized();
await setupLocator();
await registerMqtt();
// await registerMqtt();
await initDatabase();
runApp(const App());
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!