upload_file.dart 7.6 KB
import 'dart:io';

import 'package:appframe/services/api_service.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:appframe/utils/file_type_util.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:path/path.dart' as path;
import 'package:uuid/uuid.dart';

class UploadFileHandler extends MessageHandler {
  @override
  Future handleMessage(params) async {
    if (params is! Map<String, dynamic>) {
      throw Exception('参数错误');
    }
    final String? tempFilePath = params['tempFilePath'] as String?;
    if (tempFilePath == null || tempFilePath.isEmpty) {
      throw Exception('参数错误');
    }

    final result = await compute(_handleUpload, {'filePath': tempFilePath});
    // final result = await _handleUpload({'filePath': tempFilePath});

    return result;
  }

  static const _bxeBaseUrl = 'https://iotapp-dev.banxiaoer.com/iotapp';
  static const _signatureNewUrl = '/api/v1/obs/multipart/signaturenew';
  static const _signatureNextUrl = '/api/v1/obs/multipart/signaturenext';
  static const _completeUrl = '/api/v1/obs/multipart/complete';

  /// 在Isolate中执行
  static Future<Map<String, dynamic>> _handleUpload(Map<String, dynamic> fileParams) async {
    var filePath = fileParams['filePath'] as String;

    if (filePath.startsWith('/temp')) {
      filePath = filePath.substring(5);
    }

    final bxeApiService = ApiService(baseUrl: _bxeBaseUrl);
    // 由于服务端签名时未设置Content-Type,这里必须设置为空,否则会报签名错误
    // 由于封装有默认值,所以不能不设置
    final obsApiService = ApiService(defaultHeaders: {'Content-Type': '', 'Accept': ''});

    //并行上传分段
    final uploadResult = await _uploadInParallel(bxeApiService, obsApiService, filePath);
    String objectKey = uploadResult['objectKey'] as String;
    String bucket = uploadResult['bucket'] as String;
    String uploadId = uploadResult['uploadId'] as String;
    Map<int, String> tagsMap = uploadResult['tagsMap'] as Map<int, String>;

    //请求合并文件
    Response response = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);

    //关闭Dio
    bxeApiService.close();
    obsApiService.close();

    return {'url': response.data['location']};
  }

  /// 并行上传
  static Future<Map<String, dynamic>> _uploadInParallel(ApiService bxeApiService,
      ApiService obsApiService,
      String filePath, {
        int maxConcurrency = 5,
      }) async {
    //判断文件
    File file = File(filePath);
    if (!file.existsSync()) {
      throw Exception('文件不存在');
    }
    //暂时仅支持200M的文件上传
    final fileSize = file.lengthSync();
    if (fileSize > 1024 * 1024 * 200) {
      throw Exception('上传的文件过大');
    }

    //分段大小5M
    final chunkSize = 1024 * 1024 * 5;
    //分段总数
    final totalChunks = (fileSize / chunkSize).ceil();

    final randomAccessFile = file.openSync();

    //bucket 存储桶名称 : bxe-files | bxe-pics | bxe-videos
    String bucket;
    if (await FileTypeUtil.isImage(file)) {
      bucket = 'bxe-pics';
    } else if (await FileTypeUtil.isVideo(file)) {
      bucket = 'bxe-videos';
    } else {
      bucket = 'bxe-files';
    }

    //生成唯一文件名,
    // String objectKey = 'd2/test/file.csv';
    var uuid = Uuid();
    // String objectKey = '${uuid.v5(Namespace.url.value, 'www.banxiaoer.com')}${path.extension(file.path)}';
    String objectKey = '${uuid.v4()}${path.extension(file.path)}';
    String uploadId = '';
    Map<int, String> tagsMap = {};

    final futures = <Future>[];
    for (int i = 0; i < totalChunks; i++) {
      // 控制并发数量
      if (futures.length >= maxConcurrency) {
        await Future.wait(futures);
        futures.clear();
      }

      final start = i * chunkSize;
      final actualChunkSize = (i + 1) * chunkSize > fileSize ? fileSize - start : chunkSize;
      final chunk = Uint8List(actualChunkSize);

      randomAccessFile.setPositionSync(start);
      await randomAccessFile.readInto(chunk, 0, actualChunkSize);

      String chunkSignUrl;
      if (i == 0) {
        final initResult = await _init(bxeApiService, objectKey, bucket);
        uploadId = initResult['upload_id'] as String;
        chunkSignUrl = initResult['signed_url'] as String;
      } else {
        final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
        chunkSignUrl = nextResult['signed_url'] as String;
      }

      // await _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
      final future = _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
      futures.add(future);
    }

    // 等待剩余的上传完成
    if (futures.isNotEmpty) {
      await Future.wait(futures);
    }

    randomAccessFile.closeSync();
    return {'objectKey': objectKey, 'bucket': bucket, 'uploadId': uploadId, 'tagsMap': tagsMap};
  }

  /// 初始化,请求后端获取签名信息和上传任务ID
  static Future<Map<String, dynamic>> _init(ApiService bxeApiService, String objectKey, String bucket) async {
    var endpoint = '$_signatureNewUrl?objectKey=$objectKey&bucket=$bucket';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// 每次上传前,请求后端获取签名信息
  static Future<Map<String, dynamic>> _next(ApiService bxeApiService,
      String objectKey,
      String bucket,
      String uploadId,
      int partNum,) async {
    var endpoint = '$_signatureNextUrl?objectKey=$objectKey&bucket=$bucket&uploadId=$uploadId&partNum=$partNum';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// 上传段,按照最大重试次数进行上传重试
  static Future<void> _uploadChunkWithRetry(ApiService obsApiService,
      String signUrl,
      int chunkIndex,
      Uint8List chunk,
      Map<int, String> tagsMap, {
        int maxRetries = 3,
      }) async {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        final resp = await _uploadChunk(obsApiService, signUrl, chunk);
        if (resp.statusCode == 200) {
          final etags = resp.headers['etag'] as List<String>;
          tagsMap[chunkIndex + 1] = etags[0];
          return; // 上传成功
        } else {
          throw Exception('Chunk $chunkIndex upload failed: ${resp.statusCode}');
        }
      } catch (e) {
        if (attempt == maxRetries) {
          throw Exception('Chunk $chunkIndex upload failed after $maxRetries attempts: $e');
        }
        // 等待后重试
        await Future.delayed(Duration(seconds: 2 * attempt));
      }
    }
  }

  /// 上传段
  static Future<Response> _uploadChunk(ApiService obsApiService, String signUrl, Uint8List chunk) async {
    var url = signUrl.replaceFirst('AWSAccessKeyId=', 'AccessKeyId=').replaceFirst(':443', '');

    try {
      Response response = await obsApiService.put(url, chunk);
      return response;
    } catch (e) {
      print('Chunk upload failed: $e');
      throw Exception('Chunk upload failed: $e');
    }
  }

  /// 请求合并文件
  static Future<Response> _merge(ApiService bxeApiService,
      String objectKey,
      String bucket,
      String uploadId,
      Map<int, String> tagsMap,) async {
    final parts = [];
    for (int i = 1; i <= tagsMap.length; i++) {
      parts.add({'partNumber': i, 'etag': tagsMap[i]});
    }

    final response = await bxeApiService.post(_completeUrl, {
      'objectKey': objectKey,
      'bucket': bucket,
      'uploadId': uploadId,
      'parts': parts,
    });

    if (response.statusCode != 200) {
      throw Exception('合并文件失败');
    }

    return response;
  }
}