-
Notifications
You must be signed in to change notification settings - Fork 361
DNM support client crypto multipart upload #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 8 commits
4b07044
7c90852
2ac86a6
ce4199c
c7a0551
5e9b27e
8b56d0d
651d73c
87b6f4b
0df71c2
9a22e8c
0c6782b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -177,6 +177,8 @@ def progress_callback(bytes_consumed, total_bytes): | |
| from .crypto import BaseCryptoProvider | ||
| from .headers import * | ||
|
|
||
| from .utils import calc_aes_ctr_offset_by_data_offset, is_valid_crypto_part_size, determine_crypto_part_size | ||
|
|
||
| import time | ||
| import shutil | ||
| import base64 | ||
|
|
@@ -1646,6 +1648,7 @@ def __init__(self, auth, endpoint, bucket_name, crypto_provider, | |
| self.enable_crc = enable_crc | ||
| self.bucket = Bucket(auth, endpoint, bucket_name, is_cname, session, connect_timeout, | ||
| app_name, enable_crc=False) | ||
| self.multipart_upload_contexts = {} | ||
|
|
||
| def put_object(self, key, data, | ||
| headers=None, | ||
|
|
@@ -1703,6 +1706,7 @@ def put_object_from_file(self, key, filename, | |
| return self.put_object(key, f, headers=headers, progress_callback=progress_callback) | ||
|
|
||
| def get_object(self, key, | ||
| byte_range=None, | ||
| headers=None, | ||
| progress_callback=None, | ||
| params=None): | ||
|
|
@@ -1715,6 +1719,7 @@ def get_object(self, key, | |
| 'hello world' | ||
|
|
||
| :param key: 文件名 | ||
| :param byte_range: 指定下载范围。参见 :ref:`byte_range` | ||
|
|
||
| :param headers: HTTP头部 | ||
| :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict | ||
|
|
@@ -1730,8 +1735,12 @@ def get_object(self, key, | |
| """ | ||
| headers = http.CaseInsensitiveDict(headers) | ||
|
|
||
| if 'range' in headers: | ||
| raise ClientError('Crypto bucket do not support range get') | ||
| if byte_range and (not utils.is_multiple_sizeof_encrypt_block(byte_range[0])): | ||
| raise ClientError('Crypto bucket get range start must align to encrypt block') | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这里把出错的byte_range打出来,方便debug
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 恩,这个版本先把byte_range打印出来,debug的时候可以看的比较清楚,另外range只支持对齐要在文档的说明。如果后续用户有不对齐的需求,后续版本可能还要考虑是否可以兼容? |
||
|
|
||
| range_string = _make_range_string(byte_range) | ||
| if range_string: | ||
| headers['range'] = range_string | ||
|
|
||
| encrypted_result = self.bucket.get_object(key, headers=headers, params=params, progress_callback=None) | ||
|
|
||
|
|
@@ -1768,6 +1777,162 @@ def get_object_to_file(self, key, filename, | |
|
|
||
| return result | ||
|
|
||
def init_multipart_upload_securely(self, key, data_size, part_size=None, headers=None):
    """Initialize a client-side-encrypted multipart upload.

    The `upload_id` in the result, together with the bucket name and the
    object name, uniquely identifies this multipart upload.
    The `part_size` in the result is the size every subsequent part must
    have, except for the last one.
    The `part_number` in the result is the total number of parts expected;
    completing the upload before all of them are uploaded is not allowed.

    :param str key: name of the object to upload
    :param int data_size: total size of the data to upload
    :param int part_size: size of every part except the last one; when
        omitted, a size is chosen by `determine_crypto_part_size`

    :param headers: HTTP headers
    :type headers: dict, preferably oss2.CaseInsensitiveDict

    :return: :class:`InitMultipartUploadResult <oss2.models.InitMultipartUploadResult>`

    :raises ClientError: if the caller-supplied `part_size` is rejected by
        `is_valid_crypto_part_size`
    """
    if part_size is None:
        part_size = determine_crypto_part_size(data_size)
    elif not is_valid_crypto_part_size(part_size, data_size):
        raise ClientError("Crypto bucket get an invalid part_size")

    logger.info("Start to init multipart upload securely, data_size: {0}, part_size: {1}".format(
        data_size, part_size))

    crypto_key = self.crypto_provider.get_key()
    crypto_start = self.crypto_provider.get_start()

    # Ceiling division done with integer arithmetic: true division would go
    # through a float on Python 3 and lose precision for very large sizes.
    part_number = (data_size - 1) // part_size + 1
    context = CryptoMultipartContext(crypto_key, crypto_start, part_size, part_number, data_size)

    headers = self.crypto_provider.build_header(headers, context)

    resp = self.bucket.init_multipart_upload(key, headers)
    # Expose the negotiated layout to the caller on the result object.
    resp.part_size = context.part_size
    resp.part_number = context.part_number

    # Remember the context locally so later part uploads can find the
    # key/start values and the expected part layout by upload_id.
    context.upload_id = resp.upload_id
    self.multipart_upload_contexts[resp.upload_id] = context

    logger.info("Init multipart upload securely done, upload_id = {0} put into local contexts".format(
        context.upload_id))

    return resp
|
|
||
def upload_part_securely(self, key, upload_id, part_number, data, progress_callback=None, headers=None):
    """Upload one part of a client-side-encrypted multipart upload.

    :param str key: name of the object being uploaded; must match the name
        given to :func:`init_multipart_upload`.
    :param str upload_id: multipart upload ID
    :param int part_number: part number, starting from 1.
    :param data: data of this part.
    :param progress_callback: user supplied progress callback, e.g. for a
        progress bar. See :ref:`progress_callback`.

    :param headers: user supplied HTTP headers, e.g. Content-MD5
    :type headers: dict, preferably oss2.CaseInsensitiveDict

    :return: :class:`PutObjectResult <oss2.models.PutObjectResult>`

    :raises ClientError: if `upload_id` is not in the local contexts, or if
        a part other than the last one does not have the agreed part size
    """
    logger.info("Start upload part securely, upload_id = {0}, part_number = {1}".format(upload_id, part_number))
    try:
        context = self.multipart_upload_contexts[upload_id]
    except (KeyError, TypeError):
        # Only a failed lookup is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit.
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    # Every part except the last one (number == total part count) must have
    # exactly the part size fixed at init time.
    if len(data) != context.part_size and part_number != context.part_number:
        raise ClientError("Please upload part with correct size unless the last part")

    crypto_key = context.crypto_key
    start = context.crypto_start
    # Byte offset of this part within the whole object, converted to the
    # counter offset passed to the encrypt adapter.
    offset = context.part_size * (part_number - 1)
    count_offset = utils.calc_aes_ctr_offset_by_data_offset(offset)

    data = self.crypto_provider.make_encrypt_adapter(data, crypto_key, start, count_offset=count_offset)
    if self.enable_crc:
        data = utils.make_crc_adapter(data)

    resp = self.bucket.upload_part(key, upload_id, part_number, data, progress_callback, headers)

    # Record the part only after the upload call returned.
    context.uploaded_parts.add(part_number)
    logger.info("Upload part securely done, the part {0} already put into local contexts".format(part_number))

    return resp
|
|
||
|
|
||
def complete_multipart_upload_securely(self, key, upload_id, parts, headers=None):
    """Complete a client-side-encrypted multipart upload and create the object.

    Call this only after every part has been uploaded successfully.

    :param str key: name of the object being uploaded; must match the name
        given to :func:`init_multipart_upload`.
    :param str upload_id: multipart upload ID

    :param parts: list of PartInfo. `part_number` and `etag` are required
        in each PartInfo; the etag can be taken from the return value of
        :func:`upload_part`.
    :type parts: list of `PartInfo <oss2.models.PartInfo>`

    :param headers: HTTP headers
    :type headers: dict, preferably oss2.CaseInsensitiveDict

    :return: :class:`PutObjectResult <oss2.models.PutObjectResult>`

    :raises ClientError: if `upload_id` is not in the local contexts, or if
        the number of locally recorded parts differs from the expected total
    """
    logger.info("Start complete multipart upload securely, upload_id = {0}".format(upload_id))
    try:
        context = self.multipart_upload_contexts[upload_id]
    except (KeyError, TypeError):
        # Only a failed lookup is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit.
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    # Refuse to complete while the local record shows missing parts.
    if len(context.uploaded_parts) != context.part_number:
        raise ClientError("Incomplete parts uploaded in local contexts")

    res = self.bucket.complete_multipart_upload(key, upload_id, parts, headers)
    # Drop the context only after the complete call returned.
    del self.multipart_upload_contexts[upload_id]

    logger.info("Complete multipart upload securely done, upload_id = {0} remove from local contexts".format(
        upload_id))

    return res
|
|
||
def abort_multipart_upload_securely(self, key, upload_id):
    """Abort a client-side-encrypted multipart upload.

    :param str key: name of the object being uploaded; must match the name
        given to :func:`init_multipart_upload`.
    :param str upload_id: multipart upload ID

    :return: :class:`RequestResult <oss2.models.RequestResult>`

    :raises ClientError: if `upload_id` is not in the local contexts
    """
    logger.info("Start abort multipart upload securely, upload_id = {0}".format(upload_id))
    try:
        # Existence check only; the context itself is not needed here.
        self.multipart_upload_contexts[upload_id]
    except (KeyError, TypeError):
        # Only a failed lookup is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit.
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    res = self.bucket.abort_multipart_upload(key, upload_id)
    # Drop the context only after the abort call returned.
    del self.multipart_upload_contexts[upload_id]

    logger.info("Abort multipart upload securely done, upload_id = {0} remove from local contexts".format(
        upload_id))

    return res
|
|
||
def list_parts_securely(self, key, upload_id,
                        marker='', max_parts=1000):
    """List the parts already uploaded. Supports pagination.

    :param str key: object name
    :param str upload_id: multipart upload ID
    :param str marker: pagination marker
    :param int max_parts: maximum number of parts to list in one call

    :return: :class:`ListPartsResult <oss2.models.ListPartsResult>`

    :raises ClientError: if `upload_id` is not in the local contexts
    """
    logger.info("Start list parts securely, upload_id = {0}".format(upload_id))
    try:
        # Existence check only; the context itself is not needed here.
        self.multipart_upload_contexts[upload_id]
    except (KeyError, TypeError):
        # Only a failed lookup is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit.
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    res = self.bucket.list_parts(key, upload_id, marker=marker, max_parts=max_parts)
    logger.info("List parts securely done, upload_id = {0}".format(upload_id))
    return res
|
|
||
| def _normalize_endpoint(endpoint): | ||
| if not endpoint.startswith('http://') and not endpoint.startswith('https://'): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.