// upload_file.dart
import 'dart:io';

import 'package:appframe/services/api_service.dart';
import 'package:appframe/services/dispatcher.dart';
import 'package:appframe/utils/file_type_util.dart';
import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:path/path.dart' as path;
import 'package:uuid/uuid.dart';

/// Handles the file-upload message by sending a local file to object storage
/// as a multipart upload.
///
/// The flow is: ask the backend for a signed URL for each part, `PUT` every
/// part to its signed URL, then ask the backend to merge the uploaded parts.
class UploadFileHandler extends MessageHandler {
  /// Validates [params] and runs the upload through [compute].
  ///
  /// [params] must be a `Map<String, dynamic>` holding a non-empty string under
  /// the `tempFilePath` key. Completes with a map of the form `{'url': ...}`
  /// whose value is the `location` field of the merge response.
  ///
  /// Throws an [Exception] when [params] is malformed; failures raised by the
  /// upload itself are propagated from [compute].
  @override
  Future handleMessage(params) async {
    if (params is! Map<String, dynamic>) {
      throw Exception('参数错误');
    }
    // Checked with `is` rather than cast with `as`, so that a non-string value
    // is reported as a parameter error instead of escaping as a TypeError.
    final tempFilePath = params['tempFilePath'];
    if (tempFilePath is! String || tempFilePath.isEmpty) {
      throw Exception('参数错误');
    }

    final result = await compute(_handleUpload, {'filePath': tempFilePath});
    return result;
  }

  // NOTE(review): the host name contains "dev" — confirm this is the intended
  // environment before shipping.
  static const _bxeBaseUrl = 'https://iotapp-dev.banxiaoer.com/iotapp';
  static const _signatureNewUrl = '/api/v1/obs/multipart/signaturenew';
  static const _signatureNextUrl = '/api/v1/obs/multipart/signaturenext';
  static const _completeUrl = '/api/v1/obs/multipart/complete';

  /// Prefix stripped from incoming paths before the file is opened.
  static const _tempPrefix = '/temp';

  /// Size of one uploaded part: 1 MiB.
  static const _chunkSize = 1024 * 1024;

  /// Largest file accepted for upload: 200 MiB.
  static const _maxFileSize = 200 * 1024 * 1024;

  /// Entry point handed to [compute]; performs the whole upload.
  ///
  /// [fileParams] must contain the file path under `filePath`. Returns
  /// `{'url': ...}` built from the `location` field of the merge response.
  static Future<Map<String, dynamic>> _handleUpload(Map<String, dynamic> fileParams) async {
    var filePath = fileParams['filePath'] as String;
    if (filePath.startsWith(_tempPrefix)) {
      filePath = filePath.substring(_tempPrefix.length);
    }

    final bxeApiService = ApiService(baseUrl: _bxeBaseUrl);
    // The server signs the part URLs without a Content-Type, so the header has
    // to be sent empty or the signature check fails. It cannot simply be left
    // out because the ApiService wrapper fills in a default value.
    final obsApiService = ApiService(defaultHeaders: {'Content-Type': '', 'Accept': ''});

    try {
      // Upload every part.
      final uploadResult = await _uploadInParallel(bxeApiService, obsApiService, filePath);
      final objectKey = uploadResult['objectKey'] as String;
      final bucket = uploadResult['bucket'] as String;
      final uploadId = uploadResult['uploadId'] as String;
      final tagsMap = uploadResult['tagsMap'] as Map<int, String>;

      // Ask the backend to merge the parts into the final object.
      final response = await _merge(bxeApiService, objectKey, bucket, uploadId, tagsMap);
      return {'url': response.data['location']};
    } finally {
      // Release both clients even when the upload or the merge failed.
      bxeApiService.close();
      obsApiService.close();
    }
  }

  /// Splits the file at [filePath] into 1 MiB parts and uploads them in order.
  ///
  /// Despite the name, parts are currently uploaded one at a time;
  /// [maxConcurrency] is accepted for interface compatibility but not used.
  ///
  /// Returns a map with `objectKey`, `bucket`, `uploadId` and `tagsMap`
  /// (1-based part number to ETag).
  ///
  /// Throws an [Exception] when the file does not exist, is empty, is larger
  /// than 200 MiB, cannot be read completely, or when a part fails to upload.
  static Future<Map<String, dynamic>> _uploadInParallel(
    ApiService bxeApiService,
    ApiService obsApiService,
    String filePath, {
    int maxConcurrency = 5,
  }) async {
    final file = File(filePath);
    if (!file.existsSync()) {
      throw Exception('文件不存在');
    }
    // Only files up to 200 MiB are supported for now.
    final fileSize = file.lengthSync();
    if (fileSize > _maxFileSize) {
      throw Exception('上传的文件过大');
    }
    // An empty file has no parts, so no upload task would ever be created and
    // the merge request would be sent with an empty upload id.
    if (fileSize == 0) {
      throw Exception('文件为空');
    }

    final totalChunks = (fileSize + _chunkSize - 1) ~/ _chunkSize;

    // Target bucket by content type: bxe-pics | bxe-videos | bxe-files.
    final String bucket;
    if (await FileTypeUtil.isImage(file)) {
      bucket = 'bxe-pics';
    } else if (await FileTypeUtil.isVideo(file)) {
      bucket = 'bxe-videos';
    } else {
      bucket = 'bxe-files';
    }

    // A random (v4) UUID gives every upload its own key. A name-based (v5)
    // UUID over a fixed name is identical on every call, which would make all
    // uploads with the same extension overwrite each other.
    final objectKey = '${Uuid().v4()}${path.extension(file.path)}';
    var uploadId = '';
    final tagsMap = <int, String>{};

    final randomAccessFile = file.openSync();
    try {
      for (var i = 0; i < totalChunks; i++) {
        final start = i * _chunkSize;
        final remaining = fileSize - start;
        // The last part may be shorter than a full chunk.
        final chunk = Uint8List(remaining < _chunkSize ? remaining : _chunkSize);

        randomAccessFile.setPositionSync(start);
        await _readFully(randomAccessFile, chunk);

        // The first request creates the upload task and returns its id
        // together with the URL for part 1; later parts reuse that id.
        final String chunkSignUrl;
        if (i == 0) {
          final initResult = await _init(bxeApiService, objectKey, bucket);
          uploadId = initResult['upload_id'] as String;
          chunkSignUrl = initResult['signed_url'] as String;
        } else {
          final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
          chunkSignUrl = nextResult['signed_url'] as String;
        }

        await _uploadChunkWithRetry(obsApiService, chunkSignUrl, i, chunk, tagsMap);
      }
    } finally {
      // Always release the file handle, including when a part upload throws.
      randomAccessFile.closeSync();
    }

    return {'objectKey': objectKey, 'bucket': bucket, 'uploadId': uploadId, 'tagsMap': tagsMap};
  }

  /// Fills [buffer] completely from the current position of [source].
  ///
  /// A single `readInto` call may return fewer bytes than requested, so the
  /// read is repeated until the buffer is full. Throws an [Exception] when the
  /// file ends before that, since the part would otherwise be zero-padded.
  static Future<void> _readFully(RandomAccessFile source, Uint8List buffer) async {
    var filled = 0;
    while (filled < buffer.length) {
      final read = await source.readInto(buffer, filled, buffer.length);
      if (read <= 0) {
        throw Exception('文件读取失败');
      }
      filled += read;
    }
  }

  /// Starts a multipart upload for [objectKey] in [bucket].
  ///
  /// Returns the response body, from which the caller reads `upload_id` and
  /// the `signed_url` of the first part.
  static Future<Map<String, dynamic>> _init(ApiService bxeApiService, String objectKey, String bucket) async {
    // Values are percent-encoded so characters such as '&' or spaces in a file
    // extension cannot corrupt the query string.
    final endpoint = '$_signatureNewUrl'
        '?objectKey=${Uri.encodeQueryComponent(objectKey)}'
        '&bucket=${Uri.encodeQueryComponent(bucket)}';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// Requests the signed URL for part [partNum] of the upload [uploadId].
  ///
  /// Returns the response body, from which the caller reads `signed_url`.
  static Future<Map<String, dynamic>> _next(
    ApiService bxeApiService,
    String objectKey,
    String bucket,
    String uploadId,
    int partNum,
  ) async {
    final endpoint = '$_signatureNextUrl'
        '?objectKey=${Uri.encodeQueryComponent(objectKey)}'
        '&bucket=${Uri.encodeQueryComponent(bucket)}'
        '&uploadId=${Uri.encodeQueryComponent(uploadId)}'
        '&partNum=$partNum';
    final resp = await bxeApiService.get(endpoint);
    return resp.data;
  }

  /// Uploads one part, retrying up to [maxRetries] times after a failure.
  ///
  /// On success the part's ETag is stored in [tagsMap] under the 1-based part
  /// number (`chunkIndex + 1`). The wait before a retry grows by two seconds
  /// each time (2 s, 4 s, 6 s, ...).
  ///
  /// Throws an [Exception] once the final attempt has failed.
  static Future<void> _uploadChunkWithRetry(
    ApiService obsApiService,
    String signUrl,
    int chunkIndex,
    Uint8List chunk,
    Map<int, String> tagsMap, {
    int maxRetries = 3,
  }) async {
    for (var attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        final resp = await _uploadChunk(obsApiService, signUrl, chunk);
        if (resp.statusCode != 200) {
          print('Chunk $chunkIndex upload failed: ${resp.statusCode} ${resp.data}');
          throw Exception('Chunk $chunkIndex upload failed: ${resp.statusCode}');
        }

        // The ETag is required for the merge request, so a response without
        // it counts as a failed attempt.
        final etags = resp.headers['etag'];
        if (etags == null || etags.isEmpty) {
          throw Exception('Chunk $chunkIndex upload response has no ETag header');
        }
        tagsMap[chunkIndex + 1] = etags.first;
        return;
      } catch (e) {
        if (attempt == maxRetries) {
          throw Exception('Chunk $chunkIndex upload failed after $maxRetries retries: $e');
        }
        // Back off before the next attempt; `attempt + 1` keeps the first
        // retry from firing immediately.
        await Future.delayed(Duration(seconds: 2 * (attempt + 1)));
      }
    }
  }

  /// Sends [chunk] to [signUrl] with a `PUT` request and returns the response.
  ///
  /// Before sending, the first `AWSAccessKeyId=` in the URL is renamed to
  /// `AccessKeyId=` and the first `:443` is removed. Any failure is logged and
  /// rethrown wrapped in an [Exception].
  static Future<Response> _uploadChunk(ApiService obsApiService, String signUrl, Uint8List chunk) async {
    final url = signUrl.replaceFirst('AWSAccessKeyId=', 'AccessKeyId=').replaceFirst(':443', '');

    try {
      return await obsApiService.put(url, chunk);
    } catch (e) {
      print('Chunk upload failed: $e');
      throw Exception('Chunk upload failed: $e');
    }
  }

  /// Asks the backend to merge the uploaded parts into the final object.
  ///
  /// [tagsMap] maps 1-based part numbers to ETags and is expected to hold the
  /// contiguous keys `1..tagsMap.length`.
  ///
  /// Throws an [Exception] when the response status is not 200.
  static Future<Response> _merge(
    ApiService bxeApiService,
    String objectKey,
    String bucket,
    String uploadId,
    Map<int, String> tagsMap,
  ) async {
    final parts = <Map<String, dynamic>>[
      for (var partNumber = 1; partNumber <= tagsMap.length; partNumber++)
        {'partNumber': partNumber, 'etag': tagsMap[partNumber]},
    ];

    final response = await bxeApiService.post(_completeUrl, {
      'objectKey': objectKey,
      'bucket': bucket,
      'uploadId': uploadId,
      'parts': parts,
    });

    if (response.statusCode != 200) {
      throw Exception('合并文件失败');
    }

    return response;
  }
}