Skip to content

Commit c6d4a97

Browse files
author
Rakshil Modi
committed
Updating S3 Transfer Manager for deletion
Adding response to delete handler
1 parent 5d37f3c commit c6d4a97

File tree

5 files changed

+190
-1
lines changed

5 files changed

+190
-1
lines changed

awscli/s3transfer/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@
3636

3737
USER_AGENT = f's3transfer/{s3transfer.__version__}'
3838
PROCESS_USER_AGENT = f'{USER_AGENT} processpool'
39+
# NOTE(review): presumably the per-request key cap for S3 DeleteObjects
# (1000 keys). Nothing else in this change reads it -- confirm the caller
# that is expected to chunk deletions by this size.
MAX_BATCH_SIZE = 1000

awscli/s3transfer/delete.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
1111
# ANY KIND, either express or implied. See the License for the specific
1212
# language governing permissions and limitations under the License.
13+
1314
from s3transfer.tasks import SubmissionTask, Task
1415

1516

@@ -69,3 +70,78 @@ def _main(self, client, bucket, key, extra_args):
6970
7071
"""
7172
client.delete_object(Bucket=bucket, Key=key, **extra_args)
73+
74+
75+
class BatchDeleteSubmissionTask(SubmissionTask):
    """Submission task that schedules one batch deletion of S3 objects"""

    def _submit(self, client, request_executor, transfer_future, **kwargs):
        """Queue a single ``BatchDeleteObjectsTask`` for the transfer request

        :param client: The client associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional submission arguments; they are
            accepted for signature compatibility and not used here.
        """
        request = transfer_future.meta.call_args

        # Everything the task needs comes straight from the call args; a
        # falsy ``extra_args`` (None or empty) is replaced by a new dict.
        task_kwargs = {
            'client': client,
            'bucket': request.bucket,
            'objects': request.objects,
            'extra_args': request.extra_args or {},
        }

        # Exactly one task is created and it is flagged as the final task
        # of the transfer.
        delete_task = BatchDeleteObjectsTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=task_kwargs,
            is_final=True,
        )
        self._transfer_coordinator.submit(request_executor, delete_task)
117+
118+
119+
class BatchDeleteObjectsTask(Task):
    """Task that deletes a group of S3 objects with one DeleteObjects call"""

    def _main(self, client, bucket, objects, extra_args):
        """
        Delete multiple objects in a single API call

        :param client: The S3 client to use when calling DeleteObjects

        :type bucket: str
        :param bucket: The name of the bucket.

        :type objects: list
        :param objects: The list of objects to delete. It is forwarded
            unchanged as the ``Objects`` member of the ``Delete`` request.

        :type extra_args: dict
        :param extra_args: Extra arguments to pass to the DeleteObjects call.

        :rtype: dict
        :returns: The response of ``client.delete_objects``, unmodified.
        """
        # NOTE(review): presumably per-key failures are reported inside the
        # returned response rather than raised -- confirm that callers
        # inspect it.
        #
        # ``**extra_args`` only reads from the mapping, so no defensive copy
        # is required to keep the caller's dict untouched.
        return client.delete_objects(
            Bucket=bucket,
            Delete={'Objects': objects, 'Quiet': False},
            **extra_args,
        )

awscli/s3transfer/manager.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
MB,
2525
)
2626
from s3transfer.copies import CopySubmissionTask
27-
from s3transfer.delete import DeleteSubmissionTask
27+
from s3transfer.delete import BatchDeleteSubmissionTask, DeleteSubmissionTask
2828
from s3transfer.download import DownloadSubmissionTask
2929
from s3transfer.exceptions import CancelledError, FatalError
3030
from s3transfer.futures import (
@@ -665,6 +665,26 @@ def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
665665
self._request_executor.shutdown()
666666
self._io_executor.shutdown()
667667

668+
def batch_delete(self, bucket, objects, extra_args=None, subscribers=None):
    """Submit a request to delete several S3 objects from one bucket

    This method performs no chunking itself: the complete ``objects``
    list is placed in a single set of call args.

    :param bucket: The name of the bucket holding the objects; it is
        checked with ``_validate_if_bucket_supported`` before submission.

    :param objects: The objects to delete, forwarded as given.

    :param extra_args: Extra arguments for the delete request. Every key
        must be in ``ALLOWED_DELETE_ARGS`` other than ``'VersionId'``.
        Defaults to an empty dict.

    :param subscribers: Subscribers to attach to the request. Defaults to
        an empty list.

    :returns: Whatever ``_submit_transfer`` returns for the request.
    """
    extra_args = {} if extra_args is None else extra_args
    subscribers = [] if subscribers is None else subscribers

    # Version ID is not included in input shape.
    permitted_args = [
        name for name in self.ALLOWED_DELETE_ARGS if name != 'VersionId'
    ]
    self._validate_all_known_args(extra_args, permitted_args)
    self._validate_if_bucket_supported(bucket)

    return self._submit_transfer(
        CallArgs(
            bucket=bucket,
            objects=objects,
            extra_args=extra_args,
            subscribers=subscribers,
        ),
        BatchDeleteSubmissionTask,
    )
687+
668688

669689
class TransferCoordinatorController:
670690
def __init__(self):

tests/functional/s3transfer/test_delete.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,76 @@ def test_raise_exception_on_s3_object_lambda_resource(self):
7474
)
7575
with self.assertRaisesRegex(ValueError, 'methods do not support'):
7676
self.manager.delete(s3_object_lambda_arn, self.key)
77+
78+
79+
class TestBatchDelete(BaseGeneralInterfaceTest):
    """General-interface tests for ``TransferManager.batch_delete``"""

    __test__ = True

    def setUp(self):
        super().setUp()
        self.bucket = 'mybucket'
        self.key = 'mykey'
        # Two entries for the same key that differ only in version id.
        self.objects = [
            {'Key': self.key, 'VersionId': 'version1'},
            {'Key': self.key, 'VersionId': 'deletemarker1'},
        ]
        self.manager = TransferManager(self.client)

    @property
    def method(self):
        """The transfer manager method to invoke i.e. batch_delete()"""
        return self.manager.batch_delete

    def create_call_kwargs(self):
        """The kwargs to be passed to the transfer manager method"""
        return {
            'bucket': self.bucket,
            'objects': self.objects,
        }

    def create_invalid_extra_args(self):
        """Extra args containing a key that batch_delete does not allow"""
        return {
            'BadKwargs': True,
        }

    def create_stubbed_responses(self):
        """A list of stubbed responses that will cause the request to succeed

        The elements of this list is a dictionary that will be used as key
        word arguments to botocore.Stubber.add_response(). For example::

            [{'method': 'delete_objects', 'service_response': {}}]
        """
        return [
            {
                'method': 'delete_objects',
                'service_response': {},
                'expected_params': {
                    'Bucket': self.bucket,
                    'Delete': {'Objects': self.objects, 'Quiet': False},
                },
            }
        ]

    def create_expected_progress_callback_info(self):
        """No progress callbacks are expected for a batch delete"""
        return []

    def test_known_allowed_args_in_input_shape(self):
        op_model = self.client.meta.service_model.operation_model(
            'DeleteObjects'
        )
        # Mirrors the filtering done by batch_delete: 'VersionId' is left
        # out before checking membership in the DeleteObjects input shape.
        allowed_delete_objects_args = [
            arg
            for arg in self.manager.ALLOWED_DELETE_ARGS
            if arg != 'VersionId'
        ]
        for allowed_arg in allowed_delete_objects_args:
            self.assertIn(allowed_arg, op_model.input_shape.members)

    def test_raise_exception_on_s3_object_lambda_resource(self):
        s3_object_lambda_arn = (
            'arn:aws:s3-object-lambda:us-west-2:123456789012:'
            'accesspoint:my-accesspoint'
        )
        with self.assertRaisesRegex(ValueError, 'methods do not support'):
            # Pass a well-formed objects list so that only the bucket
            # argument is invalid in this call.
            self.manager.batch_delete(s3_object_lambda_arn, self.objects)

tests/integration/s3transfer/test_delete.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,22 @@ def test_can_delete_object(self):
2626
future.result()
2727

2828
self.assertTrue(self.object_not_exists(key_name))
29+
30+
31+
class TestBatchDeleteObject(BaseTransferManagerIntegTest):
    """Integration test for ``batch_delete`` on the transfer manager"""

    def test_can_delete_versioned_objects(self):
        """Upload one object, batch-delete it by version id, check it is gone"""
        key_name = 'mykey'
        self.client.put_object(
            Bucket=self.bucket_name, Key=key_name, Body=b'hello world'
        )

        # Use the version id of the first entry in the bucket's listing.
        listing = self.client.list_object_versions(Bucket=self.bucket_name)
        first_version_id = listing['Versions'][0]['VersionId']
        targets = [{'Key': key_name, 'VersionId': first_version_id}]

        manager = self.create_transfer_manager()
        pending = manager.batch_delete(
            bucket=self.bucket_name, objects=targets
        )

        # Wait for the deletion to finish before checking the bucket.
        pending.result()

        self.assertTrue(self.object_not_exists(key_name))

0 commit comments

Comments
 (0)