Commit 2b2f2ec4 by tanghuan

1.获取上传URL时增加错误重试。2.控制上传的并发数。

1 parent 647ceae6
......@@ -228,25 +228,25 @@ class UploadStartHandler extends MessageHandler {
///
/// 5 sig
///
var startTime1 = DateTime.now();
debugPrint('====================>签名开始 $startTime1');
// var startTime1 = DateTime.now();
// debugPrint('====================>签名开始 $startTime1');
final bxeApiService = ApiService(baseUrl: Constant.iotAppBaseUrl);
late String uploadId;
var signUrls = [];
for (int i = 0; i < totalChunks; i++) {
if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket);
uploadId = initResult['upload_id'] as String;
var signUrl = initResult['signed_url'] as String;
signUrls.add(signUrl);
} else {
final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
var signUrl = nextResult['signed_url'] as String;
signUrls.add(signUrl);
}
}
var endTime1 = DateTime.now();
debugPrint('====================>签名耗时:${endTime1.millisecondsSinceEpoch - startTime1.millisecondsSinceEpoch} 毫秒');
// for (int i = 0; i < totalChunks; i++) {
// if (i == 0) {
// final initResult = await _init(bxeApiService, objectKey, bucket);
// uploadId = initResult['upload_id'] as String;
// var signUrl = initResult['signed_url'] as String;
// signUrls.add(signUrl);
// } else {
// final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
// var signUrl = nextResult['signed_url'] as String;
// signUrls.add(signUrl);
// }
// }
// var endTime1 = DateTime.now();
// debugPrint('====================>签名耗时:${endTime1.millisecondsSinceEpoch - startTime1.millisecondsSinceEpoch} 毫秒');
///
/// 6 上传(带进度反馈)
......@@ -262,10 +262,22 @@ class UploadStartHandler extends MessageHandler {
final randomAccessFile = await file.open();
Map<int, String> tagsMap = {};
// 创建分片上传任务列表
final uploadTasks = <Future<Map<String, dynamic>>>[];
// 创建分片上传任务工厂列表(延迟执行,控制并发)
final uploadTaskFactories = <Future<Map<String, dynamic>> Function()>[];
for (int i = 0; i < totalChunks; i++) {
// 获取签名的上传地址 URL
if (i == 0) {
final initResult = await _init(bxeApiService, objectKey, bucket);
uploadId = initResult['upload_id'] as String;
var signUrl = initResult['signed_url'] as String;
signUrls.add(signUrl);
} else {
final nextResult = await _next(bxeApiService, objectKey, bucket, uploadId, i + 1);
var signUrl = nextResult['signed_url'] as String;
signUrls.add(signUrl);
}
final chunkSize = Constant.obsUploadChunkSize;
final start = i * chunkSize;
final actualChunkSize = (i + 1) * chunkSize > fileSize ? fileSize - start : chunkSize;
......@@ -274,7 +286,7 @@ class UploadStartHandler extends MessageHandler {
randomAccessFile.setPositionSync(start);
await randomAccessFile.readInto(chunk, 0, actualChunkSize);
uploadTasks.add(_uploadChunkWithProgress(
uploadTaskFactories.add(() => _uploadChunkWithProgress(
dio,
signUrls[i],
i,
......@@ -299,7 +311,8 @@ class UploadStartHandler extends MessageHandler {
));
}
var resultList = await Future.wait(uploadTasks);
var resultList = await _runWithConcurrency(uploadTaskFactories);
for (var result in resultList) {
if (result is Map<String, dynamic>) {
tagsMap[result['idx'] as int] = result['etag'] as String;
......@@ -334,14 +347,26 @@ class UploadStartHandler extends MessageHandler {
static const _completeUrl = '/api/v1/obs/multipart/complete';
/// 初始化,请求后端获取签名信息和上传任务ID
Future<Map<String, dynamic>> _init(ApiService bxeApiService, String objectKey, String bucket) async {
try {
var endpoint = '$_signatureNewUrl?objectKey=$objectKey&bucket=$bucket';
final resp = await bxeApiService.get(endpoint);
return resp.data;
} catch (e) {
throw ChunkSigException('初始化上传任务失败: $e');
Future<Map<String, dynamic>> _init(
ApiService bxeApiService,
String objectKey,
String bucket, {
int maxRetries = 3,
}) async {
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
var endpoint = '$_signatureNewUrl?objectKey=$objectKey&bucket=$bucket';
final resp = await bxeApiService.get(endpoint);
return resp.data;
} catch (e) {
debugPrint('====================> init上传任务 第${attempt + 1}次失败:${e.toString()}');
if (attempt == maxRetries) {
throw ChunkSigException('初始化上传任务失败: $e');
}
await Future.delayed(Duration(seconds: 2 * (attempt + 1)));
}
}
throw ChunkSigException('初始化上传任务失败');
}
/// 每次上传前,请求后端获取签名信息
......@@ -350,15 +375,23 @@ class UploadStartHandler extends MessageHandler {
String objectKey,
String bucket,
String uploadId,
int partNum,
) async {
try {
var endpoint = '$_signatureNextUrl?objectKey=$objectKey&bucket=$bucket&uploadId=$uploadId&partNum=$partNum';
final resp = await bxeApiService.get(endpoint);
return resp.data;
} catch (e) {
throw ChunkSigException('获取签名信息失败: $e');
int partNum, {
int maxRetries = 3,
}) async {
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
var endpoint = '$_signatureNextUrl?objectKey=$objectKey&bucket=$bucket&uploadId=$uploadId&partNum=$partNum';
final resp = await bxeApiService.get(endpoint);
return resp.data;
} catch (e) {
debugPrint('====================> next签名 第${attempt + 1}次失败:${e.toString()}');
if (attempt == maxRetries) {
throw ChunkSigException('获取签名信息失败: $e');
}
await Future.delayed(Duration(seconds: 2 * (attempt + 1)));
}
}
throw ChunkSigException('获取签名信息失败');
}
/// 上传段(带进度回调)
......@@ -462,32 +495,40 @@ class UploadStartHandler extends MessageHandler {
String objectKey,
String bucket,
String uploadId,
Map<int, String> tagsMap,
) async {
try {
final parts = [];
for (int i = 1; i <= tagsMap.length; i++) {
parts.add({'partNumber': i, 'etag': tagsMap[i]});
}
final response = await bxeApiService.post(_completeUrl, {
'objectKey': objectKey,
'bucket': bucket,
'uploadId': uploadId,
'parts': parts,
});
Map<int, String> tagsMap, {
int maxRetries = 3,
}) async {
final parts = [];
for (int i = 1; i <= tagsMap.length; i++) {
parts.add({'partNumber': i, 'etag': tagsMap[i]});
}
if (response.statusCode != 200) {
throw ChunkSigException('合并文件失败');
}
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
final response = await bxeApiService.post(_completeUrl, {
'objectKey': objectKey,
'bucket': bucket,
'uploadId': uploadId,
'parts': parts,
});
if (response.statusCode != 200) {
throw ChunkSigException('合并文件失败');
}
return response.data["location"];
} catch (e) {
if (e is ChunkSigException) {
rethrow;
return response.data["location"];
} catch (e) {
debugPrint('====================> merge 第${attempt + 1}次失败:${e.toString()}');
if (attempt == maxRetries) {
if (e is ChunkSigException) {
rethrow;
}
throw ChunkSigException('合并文件失败: $e');
}
await Future.delayed(Duration(seconds: 2 * (attempt + 1)));
}
throw ChunkSigException('合并文件失败: $e');
}
throw ChunkSigException('合并文件失败');
}
String _getLoginPrefix(String busi, String subBusi) {
......@@ -550,4 +591,44 @@ class UploadStartHandler extends MessageHandler {
debugPrint(e.toString());
}
}
/// 限制并发数执行任务,保持结果顺序
Future<List<T>> _runWithConcurrency<T>(
List<Future<T> Function()> taskFactories, {
int maxConcurrency = 10,
}) async {
final results = List<T?>.filled(taskFactories.length, null);
var currentIndex = 0;
var cancelled = false;
Object? firstError;
StackTrace? firstStackTrace;
Future<void> worker() async {
while (!cancelled) {
final i = currentIndex++;
if (i >= taskFactories.length) break;
try {
results[i] = await taskFactories[i]();
} catch (e, st) {
if (!cancelled) {
firstError = e;
firstStackTrace = st;
cancelled = true;
}
break;
}
}
}
final workers = List.generate(
maxConcurrency.clamp(1, taskFactories.length),
(_) => worker(),
);
await Future.wait(workers);
if (firstError != null) {
Error.throwWithStackTrace(firstError!, firstStackTrace!);
}
return results.cast<T>();
}
}
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!