// upload_file.dart 11.7 KB
import 'dart:io';

import 'package:appframe/config/constant.dart';
import 'package:appframe/services/api_service.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:appframe/utils/file_type_util.dart';
import 'package:appframe/utils/video_util.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:path/path.dart' as path;
import 'package:path_provider/path_provider.dart';
import 'package:uuid/uuid.dart';

/// Message handler that uploads a local file to object storage through the
/// backend's multipart-upload endpoints.
///
/// The flow (see [_handle]) is: validate the file, transcode/compress videos,
/// request one pre-signed URL per part, PUT every part, then ask the backend
/// to complete (merge) the upload.
class UploadFileHandler extends MessageHandler {
  static const _bxeBaseUrl = 'https://iotapp-dev.banxiaoer.com/iotapp';
  static const _signatureNewUrl = '/api/v1/obs/multipart/signaturenew';
  static const _signatureNextUrl = '/api/v1/obs/multipart/signaturenext';
  static const _completeUrl = '/api/v1/obs/multipart/complete';

  /// Largest source file accepted for upload (200 MiB).
  static const _maxFileSize = 200 * 1024 * 1024;

  /// Maps the bucket prefix of a merged object's location to the public host
  /// that replaces it.
  ///
  /// Entries are checked in order, so `/bxe-videos` must stay ahead of the
  /// shorter `/bxe-video`; otherwise the trailing `s` would leak into the URL.
  static const _publicHosts = <String, String>{
    '/bxe-files': 'https://files-obs.banxiaoer.com',
    '/bxe-pics': 'https://pics-obs.banxiaoer.com',
    '/bxe-videos': 'https://video-obs.banxiaoer.com',
    '/bxe-video': 'https://video-obs.banxiaoer.com',
  };

  /// Validates [params] and uploads the referenced file.
  ///
  /// [params] must be a `Map<String, dynamic>` holding non-empty string values
  /// for `tempFilePath`, `busi` and `subBusi`.
  ///
  /// Returns the map produced by [_handle] (`{'url': ...}`).
  ///
  /// Throws an [Exception] with message `参数错误` when the parameters are
  /// missing, empty or not strings.
  @override
  Future<dynamic> handleMessage(params) async {
    if (params is! Map<String, dynamic>) {
      throw Exception('参数错误');
    }
    final tempFilePath = _requireString(params, 'tempFilePath');
    final busi = _requireString(params, 'busi');
    final subBusi = _requireString(params, 'subBusi');

    final stopwatch = Stopwatch()..start();
    final result = await _handle(tempFilePath, busi, subBusi);
    print('====================>上传耗时:${stopwatch.elapsedMilliseconds} 毫秒');

    return result;
  }

  /// Returns `params[key]` when it is a non-empty [String]; otherwise throws
  /// the handler's generic parameter error.
  String _requireString(Map<String, dynamic> params, String key) {
    final value = params[key];
    if (value is! String || value.isEmpty) {
      throw Exception('参数错误');
    }
    return value;
  }

  /// Uploads the file at [filePath] and returns `{'url': <public url>}`.
  ///
  /// [busi] and [subBusi] become part of the object key (see
  /// [_getLoginPrefix]).
  ///
  /// Throws an [Exception] when the file does not exist, is empty, exceeds
  /// [_maxFileSize], or when any signing/upload/merge step fails.
  Future<Map<String, dynamic>> _handle(String filePath, String busi, String subBusi) async {
    // 1. Strip the local URL prefixes so that [filePath] is a plain path.
    if (filePath.startsWith(Constant.localFileUrl)) {
      filePath = filePath.replaceFirst(Constant.localFileUrl, '');
    }
    if (filePath.startsWith(Constant.localServerTemp)) {
      filePath = filePath.replaceFirst(Constant.localServerTemp, '');
    }

    final sourceFile = File(filePath);
    if (!sourceFile.existsSync()) {
      throw Exception('文件不存在');
    }
    // Only files up to 200 MiB are supported for now.
    final sourceSize = sourceFile.lengthSync();
    if (sourceSize > _maxFileSize) {
      throw Exception('上传的文件过大');
    }
    print('原始文件大小:$sourceSize 字节');

    String? mimeType = await FileTypeUtil.getMimeType(sourceFile);
    final isVideo = mimeType?.startsWith('video/') ?? false;
    final isImage = mimeType?.startsWith('image/') ?? false;

    // Set as soon as a temporary output path is chosen so that the `finally`
    // below removes it even when transcoding fails half-way.
    File? transcoded;
    try {
      var file = sourceFile;
      var fileSize = sourceSize;

      // Videos are compressed before upload; non-mp4 videos are converted to
      // mp4 instead.
      if (isVideo) {
        final tempDir = await getTemporaryDirectory();
        final outputPath = '${tempDir.path}/${Uuid().v4()}.mp4';
        transcoded = File(outputPath);

        final compressWatch = Stopwatch()..start();
        if (mimeType != 'video/mp4') {
          await VideoUtil.convertToMp4(filePath, outputPath);
        } else {
          await VideoUtil.compressVideo(filePath, outputPath, 'low');
        }
        print('====================>压缩耗时:${compressWatch.elapsedMilliseconds} 毫秒');

        file = File(outputPath);
        fileSize = file.lengthSync();
      }

      // An empty file yields zero parts, so no upload task could be created.
      if (fileSize <= 0) {
        throw Exception('上传的文件为空');
      }

      // 2. Bucket name: bxe-files | bxe-pics | bxe-videos.
      final String bucket;
      if (isImage) {
        bucket = 'bxe-pics';
      } else if (isVideo) {
        bucket = 'bxe-videos';
      } else {
        bucket = 'bxe-files';
      }

      // 3. Object key: logical prefix + random name + original extension.
      final logicPrefix = _getLoginPrefix(busi, subBusi);
      final objectKey = '$logicPrefix/${Uuid().v4()}${path.extension(file.path)}';

      // 4. Number of parts.
      final chunkSize = Constant.obsUploadChunkSize;
      final totalChunks = (fileSize / chunkSize).ceil();
      print('上传文件大小:$fileSize 字节');
      print('分片数量:$totalChunks');

      final bxeApiService = ApiService(baseUrl: _bxeBaseUrl);
      try {
        // 5. Signing: the first request creates the upload task and signs
        // part 1; the remaining parts (2..totalChunks) are signed one by one.
        final signWatch = Stopwatch()..start();
        print('====================>签名开始 ${DateTime.now()}');
        final initResult = await _init(bxeApiService, objectKey, bucket);
        final uploadId = initResult['upload_id'] as String;
        final signUrls = <String>[initResult['signed_url'] as String];
        for (var partNum = 2; partNum <= totalChunks; partNum++) {
          final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, partNum);
          signUrls.add(nextResult['signed_url'] as String);
        }
        print('====================>签名耗时:${signWatch.elapsedMilliseconds} 毫秒');

        // 6. Upload every part.
        final tagsMap = await _uploadParts(file, fileSize, chunkSize, signUrls);

        // 7. Merge.
        final mergeWatch = Stopwatch()..start();
        final location = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
        print('====================>合并签名耗时:${mergeWatch.elapsedMilliseconds} 毫秒');

        return {'url': _addPreUrl(location)};
      } finally {
        // Release the API client on failure as well as on success.
        bxeApiService.close();
      }
    } finally {
      final temp = transcoded;
      if (temp != null) {
        await _deleteQuietly(temp);
      }
    }
  }

  /// Reads [file] in parts of [chunkSize] bytes and uploads part `i` to
  /// `signUrls[i]`.
  ///
  /// Returns a map from 1-based part number to the ETag reported for it.
  ///
  /// Every part is read into memory and its upload is started without waiting
  /// for earlier parts, so all parts are in flight (and in memory) together.
  /// The file handle and the [Dio] client are released even when a part fails.
  Future<Map<int, String>> _uploadParts(
    File file,
    int fileSize,
    int chunkSize,
    List<String> signUrls,
  ) async {
    final dio = Dio()
      ..options = BaseOptions(
        baseUrl: '',
        connectTimeout: Duration(milliseconds: 30000),
        receiveTimeout: Duration(milliseconds: 30000),
        headers: {'Content-Type': '', 'Accept': ''},
      );

    RandomAccessFile? randomAccessFile;
    try {
      final raf = await file.open();
      randomAccessFile = raf;

      final uploads = <Future<Map<String, dynamic>>>[];
      for (var i = 0; i < signUrls.length; i++) {
        final start = i * chunkSize;
        final remaining = fileSize - start;
        // The last part may be shorter than [chunkSize].
        final actualChunkSize = remaining < chunkSize ? remaining : chunkSize;

        final chunk = Uint8List(actualChunkSize);
        await raf.setPosition(start);
        await raf.readInto(chunk, 0, actualChunkSize);

        uploads.add(_uploadChunkWithRetry(dio, signUrls[i], i, chunk));
      }

      final tagsMap = <int, String>{};
      for (final result in await Future.wait(uploads)) {
        tagsMap[result['idx'] as int] = result['etag'] as String;
      }
      return tagsMap;
    } finally {
      await randomAccessFile?.close();
      dio.close(force: true);
    }
  }

  /// Deletes [file] if it exists; failures are logged and otherwise ignored
  /// because the file is only a temporary artifact.
  Future<void> _deleteQuietly(File file) async {
    try {
      if (await file.exists()) {
        await file.delete();
      }
    } catch (e) {
      print('====================>临时文件删除失败:$e');
    }
  }

  /// Initializes the multipart upload: asks the backend for the upload task id
  /// and the signed URL of the first part.
  ///
  /// The returned map is read by the caller for `upload_id` and `signed_url`.
  Future<Map<String, dynamic>> _init(ApiService bxeApiService, String objectKey, String bucket) async {
    // NOTE(review): the values are interpolated without URL-encoding, so a
    // key containing '&', '#' or spaces would corrupt the query. Left as is
    // because it is not visible here whether ApiService.get encodes the
    // endpoint itself — confirm before changing.
    final endpoint = '$_signatureNewUrl?objectKey=$objectKey&bucket=$bucket';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// Asks the backend for the signed URL of part [partNum] (1-based) of the
  /// upload task [uploadId].
  ///
  /// The returned map is read by the caller for `signed_url`.
  Future<Map<String, dynamic>> _next(
    ApiService bxeApiService,
    String objectKey,
    String bucket,
    String uploadId,
    int partNum,
  ) async {
    // NOTE(review): same unencoded query construction as in [_init].
    final endpoint = '$_signatureNextUrl?objectKey=$objectKey&bucket=$bucket&uploadId=$uploadId&partNum=$partNum';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// Uploads one part, retrying up to [maxRetries] times after the first
  /// attempt.
  ///
  /// [chunkIndex] is 0-based. On success returns
  /// `{'idx': chunkIndex + 1, 'etag': <first ETag header value>}`.
  ///
  /// Throws an [Exception] once all `maxRetries + 1` attempts have failed.
  Future<Map<String, dynamic>> _uploadChunkWithRetry(
    Dio dio,
    String signUrl,
    int chunkIndex,
    Uint8List chunk, {
    int maxRetries = 3,
  }) async {
    for (var attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        final startTime = DateTime.now();
        final resp = await _uploadChunk(dio, signUrl, chunk, chunkIndex);
        final endTime = DateTime.now();
        if (resp.statusCode != 200) {
          throw Exception('Chunk $chunkIndex upload failed: ${resp.statusCode}');
        }

        // The ETag is needed to complete the upload; treat its absence as a
        // failed attempt instead of crashing on a cast.
        final etags = resp.headers['etag'];
        if (etags == null || etags.isEmpty) {
          throw Exception('Chunk $chunkIndex upload response has no ETag');
        }

        print(
            '====================> 分片$chunkIndex 第${attempt + 1}次, $endTime 上传耗时:${endTime.difference(startTime).inMilliseconds} 毫秒');
        return <String, dynamic>{'idx': chunkIndex + 1, 'etag': etags.first};
      } catch (e) {
        print('====================> 分片$chunkIndex 第${attempt + 1}次, 上传失败:$e');
        if (attempt == maxRetries) {
          throw Exception('Chunk $chunkIndex upload failed after ${maxRetries + 1} attempts: $e');
        }
        // Linear back-off before the next attempt: 2s, 4s, 6s, ...
        await Future.delayed(Duration(seconds: 2 * (attempt + 1)));
      }
    }
    // Only reachable when [maxRetries] is negative (the loop never runs).
    throw Exception('上传失败');
  }

  /// PUTs [chunk] to [signUrl] and returns the raw response.
  ///
  /// Before the request, the first `AWSAccessKeyId=` in the URL is replaced
  /// with `AccessKeyId=` and the first `:443` is removed.
  ///
  /// Throws an [Exception] wrapping any error raised by the request.
  Future<Response> _uploadChunk(Dio dio, String signUrl, Uint8List chunk, int chunkIndex) async {
    final url = signUrl.replaceFirst('AWSAccessKeyId=', 'AccessKeyId=').replaceFirst(':443', '');
    try {
      print('====================> 分片$chunkIndex , 开始上传 ${DateTime.now()}');
      final response = await dio.put(
        url,
        data: chunk,
      );
      print('====================> 分片$chunkIndex , 上传成功 ${DateTime.now()}');

      return response;
    } catch (e) {
      throw Exception('Chunk upload failed: $e');
    }
  }

  /// Asks the backend to merge the uploaded parts and returns the `location`
  /// field of its response.
  ///
  /// [tagsMap] maps 1-based part numbers to ETags and must contain every
  /// number from 1 to `tagsMap.length`.
  ///
  /// Throws an [Exception] with message `合并文件失败` when a part is missing
  /// or the backend does not answer with status 200.
  Future<String> _merge(
    ApiService bxeApiService,
    String objectKey,
    String bucket,
    String uploadId,
    Map<int, String> tagsMap,
  ) async {
    final parts = <Map<String, dynamic>>[];
    for (var partNumber = 1; partNumber <= tagsMap.length; partNumber++) {
      final etag = tagsMap[partNumber];
      if (etag == null) {
        // A gap in the part numbers would otherwise be sent as a null etag.
        throw Exception('合并文件失败');
      }
      parts.add({'partNumber': partNumber, 'etag': etag});
    }

    final response = await bxeApiService.post(_completeUrl, {
      'objectKey': objectKey,
      'bucket': bucket,
      'uploadId': uploadId,
      'parts': parts,
    });

    if (response.statusCode != 200) {
      throw Exception('合并文件失败');
    }

    return response.data["location"];
  }

  /// Builds the logical key prefix
  /// `d2/pridel/user/<yyyyMMdd>/bxe/<busi>_<subBusi>` from today's local date.
  ///
  /// Month and day are zero-padded so that the date is unambiguous
  /// (e.g. `20250107`, not `202517`).
  String _getLoginPrefix(String busi, String subBusi) {
    final now = DateTime.now();
    String twoDigits(int value) => value.toString().padLeft(2, '0');
    final datePart = '${now.year}${twoDigits(now.month)}${twoDigits(now.day)}';

    return 'd2/pridel/user/$datePart/bxe/${busi}_$subBusi';
  }

  /// Turns a bucket-relative [location] into a public URL.
  ///
  /// Example input:
  /// `/bxe-pics/d2/pridel/user/20251017/bxe/bxe_homework/<uuid>.jpg`.
  /// The leading bucket segment is replaced with the host from
  /// [_publicHosts]; a location with an unknown prefix is returned unchanged.
  String _addPreUrl(String location) {
    for (final entry in _publicHosts.entries) {
      if (location.startsWith(entry.key)) {
        return '${entry.value}${location.substring(entry.key.length)}';
      }
    }
    return location;
  }
}