Skip to content
Open
98 changes: 94 additions & 4 deletions examples/object_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import oss2
from oss2.crypto import LocalRsaProvider, AliKMSProvider

# 以下代码展示了客户端文件加密上传下载的用法,如下载文件、上传文件等,注意在客户端加密的条件下,oss暂不支持文件分片上传下载操作
# 以下代码展示了客户端文件加密上传下载的用法,如下载文件、上传文件等。


# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
Expand All @@ -31,7 +31,7 @@
filename = 'download.txt'


# 创建Bucket对象,可以进行客户端数据加密(用户端RSA),此模式下只提供对象整体上传下载操作
# 创建Bucket对象,可以进行客户端数据加密(用户端RSA)
bucket = oss2.CryptoBucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name, crypto_provider=LocalRsaProvider())

key1 = 'motto-copy.txt'
Expand Down Expand Up @@ -62,8 +62,53 @@

os.remove(filename)

# 下载部分文件
result = bucket.get_object(key, byte_range=(32,1024))

# 创建Bucket对象,可以进行客户端数据加密(使用阿里云KMS),此模式下只提供对象整体上传下载操作
#验证一下
content_got = b''
for chunk in result:
content_got +=chunk
assert content_got == content[32:1025]


"""
分片上传
"""
# Multipart upload with client-side encryption (local RSA provider).
# Three equally sized parts, each filled with a distinct byte so that a
# misplaced or corrupted part is detectable after the round trip.
part_size = 100 * 1024
part_a = b'a' * part_size
part_b = b'b' * part_size
part_c = b'c' * part_size
multi_content = [part_a, part_b, part_c]

# Derive the total size from the parts instead of hard-coding it.
data_size = part_size * len(multi_content)
multi_key = "test_crypto_multipart"

# Initialise the encrypted multipart upload.
res = bucket.init_multipart_upload_securely(multi_key, data_size, part_size)
upload_id = res.upload_id

# Upload every part; part numbers start at 1.
parts = []
for part_number, part_data in enumerate(multi_content, start=1):
    result = bucket.upload_part_securely(multi_key, upload_id, part_number, part_data)
    parts.append(oss2.models.PartInfo(part_number, result.etag, size=part_size, part_crc=result.crc))

# Complete the upload.
result = bucket.complete_multipart_upload_securely(multi_key, upload_id, parts)

# Download the whole object.
result = bucket.get_object(multi_key)

# Verify: join the chunks once (no repeated bytes concatenation) and compare
# the complete payload, so unexpected trailing bytes are detected as well.
content_got = b''.join(result)
assert len(content_got) == data_size
assert content_got == b''.join(multi_content)

# 创建Bucket对象,可以进行客户端数据加密(使用阿里云KMS)
bucket = oss2.CryptoBucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name,
crypto_provider=AliKMSProvider(access_key_id, access_key_secret, region, cmk, '1234'))

Expand Down Expand Up @@ -93,4 +138,49 @@
with open(filename, 'rb') as fileobj:
assert fileobj.read() == content

os.remove(filename)
os.remove(filename)

# Download part of the object. The check below expects bytes 32 through 1024
# inclusive, which is why the reference slice ends at 1025.
result = bucket.get_object(key, byte_range=(32, 1024))

# Verify the downloaded range against the original content.
content_got = b''.join(result)
assert content_got == content[32:1025]

"""
分片上传
"""
# Multipart upload with client-side encryption (Alibaba Cloud KMS provider).
# Three equally sized parts, each filled with a distinct byte so that a
# misplaced or corrupted part is detectable after the round trip.
part_size = 100 * 1024
part_a = b'a' * part_size
part_b = b'b' * part_size
part_c = b'c' * part_size
multi_content = [part_a, part_b, part_c]

# Derive the total size from the parts instead of hard-coding it.
data_size = part_size * len(multi_content)
multi_key = "test_crypto_multipart"

# Initialise the encrypted multipart upload.
res = bucket.init_multipart_upload_securely(multi_key, data_size, part_size)
upload_id = res.upload_id

# Upload every part; part numbers start at 1.
parts = []
for part_number, part_data in enumerate(multi_content, start=1):
    result = bucket.upload_part_securely(multi_key, upload_id, part_number, part_data)
    parts.append(oss2.models.PartInfo(part_number, result.etag, size=part_size, part_crc=result.crc))

# Complete the upload.
result = bucket.complete_multipart_upload_securely(multi_key, upload_id, parts)

# Download the whole object.
result = bucket.get_object(multi_key)

# Verify: join the chunks once (no repeated bytes concatenation) and compare
# the complete payload, so unexpected trailing bytes are detected as well.
content_got = b''.join(result)
assert len(content_got) == data_size
assert content_got == b''.join(multi_content)
169 changes: 167 additions & 2 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def progress_callback(bytes_consumed, total_bytes):
from .crypto import BaseCryptoProvider
from .headers import *

from .utils import calc_aes_ctr_offset_by_data_offset, is_valid_crypto_part_size, determine_crypto_part_size

import time
import shutil
import base64
Expand Down Expand Up @@ -1646,6 +1648,7 @@ def __init__(self, auth, endpoint, bucket_name, crypto_provider,
self.enable_crc = enable_crc
self.bucket = Bucket(auth, endpoint, bucket_name, is_cname, session, connect_timeout,
app_name, enable_crc=False)
self.multipart_upload_contexts = {}

def put_object(self, key, data,
headers=None,
Expand Down Expand Up @@ -1703,6 +1706,7 @@ def put_object_from_file(self, key, filename,
return self.put_object(key, f, headers=headers, progress_callback=progress_callback)

def get_object(self, key,
byte_range=None,
headers=None,
progress_callback=None,
params=None):
Expand All @@ -1715,6 +1719,7 @@ def get_object(self, key,
'hello world'

:param key: 文件名
:param byte_range: 指定下载范围。参见 :ref:`byte_range`

:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
Expand All @@ -1730,8 +1735,12 @@ def get_object(self, key,
"""
headers = http.CaseInsensitiveDict(headers)

if 'range' in headers:
raise ClientError('Crypto bucket do not support range get')
if byte_range and (not utils.is_multiple_sizeof_encrypt_block(byte_range[0])):
raise ClientError('Crypto bucket get range start must align to encrypt block')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里把出错的byte_range打出来,方便debug

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,这个版本先把 byte_range 打印出来,debug 的时候可以看得比较清楚;另外,range 只支持对齐这一点要在文档中说明。如果后续用户有不对齐的需求,后续版本可能还要考虑是否可以兼容。


range_string = _make_range_string(byte_range)
if range_string:
headers['range'] = range_string

encrypted_result = self.bucket.get_object(key, headers=headers, params=params, progress_callback=None)

Expand Down Expand Up @@ -1768,6 +1777,162 @@ def get_object_to_file(self, key, filename,

return result

def init_multipart_upload_securely(self, key, data_size, part_size=None, headers=None):
    """Initialise a client-side-encrypted multipart upload.

    The `upload_id` in the result, together with the bucket name and the
    object name, identifies this multipart upload. Two extra attributes are
    set on the result before it is returned:

    * `part_size`: the size every part except the last one must have.
    * `part_number`: the total number of parts; completing the upload is
      refused until that many parts have been uploaded.

    The encryption context created here is kept in
    `self.multipart_upload_contexts`, keyed by `upload_id`, for use by the
    other `*_securely` methods.

    :param str key: name of the object to upload
    :param int data_size: total size of the data to upload
    :param int part_size: size of every part except the last one; when
        omitted it is chosen by `determine_crypto_part_size(data_size)`

    :param headers: HTTP headers
    :type headers: dict; oss2.CaseInsensitiveDict is recommended

    :return: :class:`InitMultipartUploadResult <oss2.models.InitMultipartUploadResult>`

    :raises ClientError: if an explicit `part_size` is rejected by
        `is_valid_crypto_part_size`
    """
    if part_size is None:
        part_size = determine_crypto_part_size(data_size)
    elif not is_valid_crypto_part_size(part_size, data_size):
        raise ClientError("Crypto bucket get an invalid part_size")

    logger.info("Start to init multipart upload securely, data_size: {0}, part_size: {1}".format(data_size, part_size))

    crypto_key = self.crypto_provider.get_key()
    crypto_start = self.crypto_provider.get_start()

    # Ceiling division carried out in integers. True division would go
    # through a float on Python 3 and could round for very large sizes.
    part_number = int((data_size - 1) // part_size + 1)
    context = CryptoMultipartContext(crypto_key, crypto_start, part_size, part_number, data_size)

    headers = self.crypto_provider.build_header(headers, context)

    resp = self.bucket.init_multipart_upload(key, headers)
    resp.part_size = context.part_size
    resp.part_number = context.part_number

    # Remember the context so later part uploads can look it up by upload_id.
    context.upload_id = resp.upload_id
    self.multipart_upload_contexts[resp.upload_id] = context

    logger.info("Init multipart upload securely done, upload_id = {0} put into local contexts".format(context.upload_id))

    return resp

def upload_part_securely(self, key, upload_id, part_number, data, progress_callback=None, headers=None):
    """Encrypt and upload one part of a client-side-encrypted multipart upload.

    :param str key: name of the object being uploaded; must match the one
        passed to :func:`init_multipart_upload_securely`.
    :param str upload_id: multipart upload ID
    :param int part_number: part number, starting at 1 and not larger than
        the part count fixed at initialisation.
    :param data: data of this part; `len()` is applied to it to check the size.
    :param progress_callback: user-supplied progress callback, e.g. for a
        progress bar. See :ref:`progress_callback`.

    :param headers: user-supplied HTTP headers, e.g. Content-MD5
    :type headers: dict; oss2.CaseInsensitiveDict is recommended

    :return: :class:`PutObjectResult <oss2.models.PutObjectResult>`

    :raises ClientError: if `upload_id` is not in the local contexts, if
        `part_number` is out of range, or if a part other than the last one
        does not have the agreed part size.
    """
    logger.info("Start upload part securely, upload_id = {0}, part_number = {1}".format(upload_id, part_number))
    try:
        context = self.multipart_upload_contexts[upload_id]
    except KeyError:
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    # A part number outside 1..context.part_number would yield a negative or
    # out-of-range offset and would corrupt the uploaded-parts bookkeeping.
    if not 1 <= part_number <= context.part_number:
        raise ClientError("Crypto bucket get an invalid part_number: {0}, expected 1 to {1}".format(
            part_number, context.part_number))

    # Only the last part may differ from the agreed part size.
    if len(data) != context.part_size and part_number != context.part_number:
        raise ClientError("Please upload part with correct size unless the last part")

    crypto_key = context.crypto_key
    start = context.crypto_start
    # Offset of this part within the whole object, converted by the utils
    # helper into the count offset handed to the encrypt adapter.
    offset = context.part_size * (part_number - 1)
    count_offset = utils.calc_aes_ctr_offset_by_data_offset(offset)

    data = self.crypto_provider.make_encrypt_adapter(data, crypto_key, start, count_offset=count_offset)
    if self.enable_crc:
        data = utils.make_crc_adapter(data)

    resp = self.bucket.upload_part(key, upload_id, part_number, data, progress_callback, headers)

    # Record the part so that completion can check every part was uploaded.
    context.uploaded_parts.add(part_number)
    logger.info("Upload part securely done, the part {0} already put into local contexts".format(part_number))

    return resp


def complete_multipart_upload_securely(self, key, upload_id, parts, headers=None):
    """Complete a client-side-encrypted multipart upload and create the object.

    Call this only after every part has been uploaded successfully through
    :func:`upload_part_securely`. On success the local context for
    `upload_id` is removed.

    :param str key: name of the object being uploaded; must match the one
        passed to :func:`init_multipart_upload_securely`.
    :param str upload_id: multipart upload ID

    :param parts: list of PartInfo. `part_number` and `etag` are required in
        each PartInfo; the etag comes from the result of the part upload.
    :type parts: list of `PartInfo <oss2.models.PartInfo>`

    :param headers: HTTP headers
    :type headers: dict; oss2.CaseInsensitiveDict is recommended

    :return: :class:`PutObjectResult <oss2.models.PutObjectResult>`

    :raises ClientError: if `upload_id` is not in the local contexts, or if
        the number of locally recorded parts differs from the expected count.
    """
    logger.info("Start complete multipart upload securely, upload_id = {0}".format(upload_id))
    try:
        context = self.multipart_upload_contexts[upload_id]
    except KeyError:
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    # Refuse to complete until every expected part has been recorded locally.
    if len(context.uploaded_parts) != context.part_number:
        raise ClientError("Incomplete parts uploaded in local contexts")

    res = self.bucket.complete_multipart_upload(key, upload_id, parts, headers)
    # Drop the context only after the server-side completion call returned.
    del self.multipart_upload_contexts[upload_id]

    logger.info("Complete multipart upload securely done, upload_id = {0} remove from local contexts".format(upload_id))

    return res

def abort_multipart_upload_securely(self, key, upload_id):
    """Abort a client-side-encrypted multipart upload.

    On success the local context for `upload_id` is removed.

    :param str key: name of the object being uploaded; must match the one
        passed to :func:`init_multipart_upload_securely`.
    :param str upload_id: multipart upload ID

    :return: :class:`RequestResult <oss2.models.RequestResult>`

    :raises ClientError: if `upload_id` is not in the local contexts.
    """
    logger.info("Start abort multipart upload securely, upload_id = {0}".format(upload_id))
    # Only the presence of the context matters here, not its content.
    if upload_id not in self.multipart_upload_contexts:
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    res = self.bucket.abort_multipart_upload(key, upload_id)
    # Drop the context only after the server-side abort call returned.
    del self.multipart_upload_contexts[upload_id]

    logger.info("Abort multipart upload securely done, upload_id = {0} remove from local contexts".format(upload_id))

    return res

def list_parts_securely(self, key, upload_id,
                        marker='', max_parts=1000):
    """List the parts already uploaded for a multipart upload. Supports paging.

    :param str key: object name
    :param str upload_id: multipart upload ID
    :param str marker: paging marker
    :param int max_parts: maximum number of parts to list in one call

    :return: :class:`ListPartsResult <oss2.models.ListPartsResult>`

    :raises ClientError: if `upload_id` is not in the local contexts.
    """
    # NOTE(review): a reviewer asked at this point whether an
    # upload-part-copy counterpart is missing from the *_securely API.
    logger.info("Start list parts securely, upload_id = {0}".format(upload_id))
    # Only the presence of the context matters here, not its content.
    if upload_id not in self.multipart_upload_contexts:
        raise ClientError("Crypto bucket can't find the upload_id in local contexts")

    res = self.bucket.list_parts(key, upload_id, marker=marker, max_parts=max_parts)
    logger.info("List parts securely done, upload_id = {0}".format(upload_id))
    return res

def _normalize_endpoint(endpoint):
if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
Expand Down
Loading