Skip to content

Commit bdb5f02

Browse files
authored
Merge pull request #40 from planetlabs/atomic-enqueue
Enqueue file atomically.
2 parents d8c8b2a + 1ebe45d commit bdb5f02

File tree

4 files changed

+71
-11
lines changed

4 files changed

+71
-11
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,7 @@ docs/_build/
5555

5656
# PyBuilder
5757
target/
58+
59+
# Vim
60+
*.swp
61+
*.un~

datalake/dlfile.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def _validate_bundle(bundle_filename):
164164
def _validate_bundle_version(bundle):
165165
v = File._get_content_from_bundle(bundle, 'version').decode('utf-8')
166166
if v != File.DATALAKE_BUNDLE_VERSION:
167-
msg = '{} has unsupported bundle version {}.'
167+
msg = '{} has unsupported bundle version {}'
168168
msg = msg.format(bundle.name, v)
169169
raise InvalidDatalakeBundle(msg)
170170

@@ -200,16 +200,22 @@ def to_bundle(self, bundle_filename):
200200
Args:
201201
bundle_filename: output file
202202
'''
203-
t = tarfile.open(bundle_filename, 'w')
204-
self._add_fd_to_tar(t, 'content', self._fd)
205-
self._add_string_to_tar(t, 'version', self.DATALAKE_BUNDLE_VERSION)
206-
self._add_string_to_tar(t, 'datalake-metadata.json',
207-
self.metadata.json)
208-
t.close()
203+
temp_filename = self._dot_filename(bundle_filename)
204+
with open(temp_filename, 'wb') as f:
205+
t = tarfile.open(fileobj=f, mode='w')
206+
self._add_fd_to_tar(t, 'content', self._fd)
207+
self._add_string_to_tar(t, 'version', self.DATALAKE_BUNDLE_VERSION)
208+
self._add_string_to_tar(t, 'datalake-metadata.json',
209+
self.metadata.json)
210+
os.rename(temp_filename, bundle_filename)
209211

210212
# reset the file pointer in case somebody else wants to read us.
211213
self.seek(0, 0)
212214

215+
def _dot_filename(self, path):
216+
return os.path.join(os.path.dirname(path),
217+
'.{}'.format(os.path.basename(path)))
218+
213219
def _add_string_to_tar(self, tfile, arcname, data):
214220
s = BytesIO(data.encode('utf-8'))
215221
info = tarfile.TarInfo(name=arcname)

datalake/queue.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
reason, the file remains in the queue.
2727
'''
2828
from os import environ
29+
import os
2930
from datalake_common.errors import InsufficientConfiguration
3031
from logging import getLogger
31-
import os
3232
import time
3333

34-
from datalake import File
34+
from datalake import File, InvalidDatalakeBundle
3535

3636

3737
'''whether or not queue feature is available
@@ -132,7 +132,14 @@ def _setup_watch_manager(self, timeout):
132132
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
133133

134134
def _push(self, filename):
135-
f = File.from_bundle(filename)
135+
if os.path.basename(filename).startswith('.'):
136+
return
137+
try:
138+
f = File.from_bundle(filename)
139+
except InvalidDatalakeBundle as e:
140+
msg = '{}. Skipping upload.'.format(e.args[0])
141+
log.exception(msg)
142+
return
136143
url = self._archive.push(f)
137144
msg = 'Pushed {}({}) to {}'.format(filename, f.metadata['path'], url)
138145
log.info(msg)

test/test_queue.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import os
1919
from datalake_common.tests import random_word
2020
from datalake_common.errors import InsufficientConfiguration
21-
from datalake import Enqueuer, Uploader
21+
from datalake import Enqueuer, Uploader, InvalidDatalakeBundle
2222
from datalake.queue import has_queue
2323
from conftest import crtime_setuid
2424
from gzip import GzipFile
@@ -71,6 +71,15 @@ def validator(f):
7171
return validator
7272

7373

74+
@pytest.fixture
75+
def assert_s3_bucket_empty(s3_bucket):
76+
77+
def asserter():
78+
assert len([k for k in s3_bucket.list()]) == 0
79+
80+
return asserter
81+
82+
7483
@pytest.fixture
7584
def random_file(tmpfile, random_metadata):
7685
expected_content = random_word(100)
@@ -103,6 +112,40 @@ def enqueue():
103112
uploaded_file_validator(f)
104113

105114

115+
@pytest.mark.skipif(not has_queue, reason='requires queuable features')
116+
def test_skip_incoming_dotfile(random_file, queue_dir, uploader,
117+
assert_s3_bucket_empty):
118+
119+
def enqueue():
120+
enqueued_name = os.path.join(queue_dir, '.ignoreme')
121+
os.rename(str(random_file), enqueued_name)
122+
123+
t = Timer(0.5, enqueue)
124+
t.start()
125+
uploader.listen(timeout=1.0)
126+
127+
assert_s3_bucket_empty()
128+
129+
130+
@pytest.mark.skipif(not has_queue, reason='requires queuable features')
131+
def test_skip_invalid_bundles(random_file, queue_dir, uploader,
132+
assert_s3_bucket_empty):
133+
134+
def enqueue():
135+
enqueued_name = os.path.join(queue_dir, 'invalid-bundle')
136+
os.rename(str(random_file), enqueued_name)
137+
138+
t = Timer(0.5, enqueue)
139+
t.start()
140+
141+
try:
142+
uploader.listen(timeout=1.0)
143+
except InvalidDatalakeBundle:
144+
pytest.fail("Didn't catch InvalidDatalakeBundle exception.")
145+
146+
assert_s3_bucket_empty()
147+
148+
106149
@pytest.mark.skipif(not has_queue, reason='requires queuable features')
107150
def test_upload_existing_cli(cli_tester, random_file, random_metadata,
108151
uploaded_content_validator, queue_dir):

0 commit comments

Comments
 (0)