Skip to content

Commit f0d209f

Browse files
oldpatrickabcavagnolo
authored andcommitted
Move to inotify_simple from unsupported pyinotify
1 parent 8771eae commit f0d209f

File tree

3 files changed

+22
-38
lines changed

3 files changed

+22
-38
lines changed

.github/workflows/actions.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ jobs:
66
runs-on: ubuntu-20.04
77
strategy:
88
matrix:
9-
python: [3.6, 3.8, 3.9]
10-
extras: ["test", "test,queable,sentry"]
9+
python: [3.6, 3.8, 3.9, "3.10", 3.12]
10+
extras: ["test", "test,queuable,sentry"]
1111
steps:
1212
- name: Setup Python
1313
uses: actions/[email protected]

client/datalake/queue.py

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,10 @@
4444
InsufficientConfiguration.'''
4545
has_queue = True
4646
try:
47-
import pyinotify
47+
import inotify_simple
4848
except ImportError:
4949
has_queue = False
5050

51-
class FakePyinotify(object):
52-
53-
class ProcessEvent(object):
54-
pass
55-
56-
pyinotify = FakePyinotify
57-
5851

5952
def requires_queue(f):
6053
def wrapped(*args, **kwargs):
@@ -124,32 +117,19 @@ def __init__(self, archive, queue_dir, callback=None):
124117
self._archive = archive
125118
self._callback = callback
126119

127-
class EventHandler(pyinotify.ProcessEvent):
128-
129-
def __init__(self, callback):
130-
super(Uploader.EventHandler, self).__init__()
131-
self.callback = callback
132-
133-
def process_IN_CLOSE_WRITE(self, event):
134-
self.callback(event.pathname)
135-
136-
def process_IN_MOVED_TO(self, event):
137-
self.callback(event.pathname)
120+
self.inotify = inotify_simple.INotify()
138121

139122
def _setup_watch_manager(self, timeout):
140-
if timeout is not None:
141-
timeout = int(timeout * 1000)
142-
self._wm = pyinotify.WatchManager()
143-
self._handler = Uploader.EventHandler(self._push)
144-
self._notifier = pyinotify.Notifier(self._wm, self._handler,
145-
timeout=timeout)
146-
self._wm.add_watch(self.queue_dir,
147-
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
123+
flags = inotify_simple.flags
124+
watch_flags = flags.CLOSE_WRITE | flags.MOVED_TO
125+
self.inotify.add_watch(self.queue_dir, watch_flags)
148126

149127
def _push(self, filename):
128+
if not os.path.isabs(filename):
129+
filename = os.path.join(self.queue_dir, filename)
150130
if os.path.basename(filename).startswith('.'):
151131
return
152-
if self._workers == []:
132+
if not self._workers:
153133
self._synchronous_push(filename)
154134
else:
155135
self._threaded_push(filename)
@@ -205,7 +185,7 @@ def _listen(self, timeout=None, workers=1):
205185
msg = 'number of upload workers cannot be zero or negative'
206186
raise InsufficientConfiguration(msg)
207187
if workers > 1:
208-
# when multipe workers are requested, the main thread monitors the
188+
# when multiple workers are requested, the main thread monitors the
209189
# queue directory and puts the files in a Queue that is serviced by
210190
# the worker threads. So the word queue is a bit overloaded in this
211191
# module.
@@ -229,12 +209,16 @@ def _create_worker(self, worker_number):
229209
def _run(self, timeout):
230210

231211
self._prepare_to_track_run_time(timeout)
232-
self._notifier.process_events()
233-
while self._notifier.check_events():
234-
self._notifier.read_events()
235-
self._notifier.process_events()
236-
if self._update_time_remaining() == 0:
237-
break
212+
if timeout is not None:
213+
timeout = int(timeout * 1000)
214+
215+
while self._update_time_remaining() > 0:
216+
for event in self.inotify.read(timeout=timeout):
217+
if event.name is None:
218+
continue
219+
self._push(event.name)
220+
if self._update_time_remaining() == 0:
221+
break
238222

239223
def _update_time_remaining(self):
240224
if self._run_time_remaining is self.INFINITY:

client/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def get_version():
4444
# the queuable feature allows users to offload their datalake pushes
4545
# to a separate uploader process.
4646
'queuable': [
47-
'pyinotify>=0.9.4',
47+
'inotify_simple>=1.3.5',
4848
],
4949
'sentry': [
5050
'raven>=5.0.0',

0 commit comments

Comments
 (0)