Skip to content
This repository was archived by the owner on Apr 12, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
772bd64
Add signals module with register() function #7
Mar 19, 2014
292a96c
Update signals: add unregister() #7
Mar 19, 2014
3f93f91
Update process #7: use signals.register() in ProcessOpen
Mar 19, 2014
fc00a8b
Update signals #7: move signal constants into signals.constants
Mar 19, 2014
d88e1b7
Update signals: update the signature of handler
ggreg Mar 21, 2014
8934296
Add module signals.registry
ggreg Mar 21, 2014
dadd536
Add module signals.process #7: dispatch a signal to the PID of a proc…
ggreg Mar 21, 2014
ab41dc2
Update pool: check if process.pit is None in on_process_exit()
ggreg Apr 15, 2014
f026e72
Add signals.base.reset(): empty SIGNAL_HANDLERS
ggreg Apr 15, 2014
cc254c8
Fix signals.base: avoid signum key with empty list of handlers
ggreg Apr 15, 2014
89bec8f
Fix signals.process
ggreg Apr 15, 2014
f0208c1
Fix signals.registry
ggreg Apr 15, 2014
4ad8a7c
Update utils.wait()
ggreg Apr 15, 2014
c1f147c
Fix tests.signals.test_base: move to pytest
ggreg Apr 15, 2014
14603bb
Fix tests.signals.test_process: replace os.waitpid() by os.wait()
ggreg Apr 15, 2014
3e12585
Update test_pool: refactoring
ggreg Apr 15, 2014
77dadc6
Fix test_process: refactoring
ggreg Apr 15, 2014
0db15e4
Update process: do not modify the Process instance from ProcessOpen
ggreg Apr 15, 2014
5185546
Update process: use signals.process to bind signals
ggreg Apr 15, 2014
2dbb3fb
Update process: add Process.exit() method
ggreg Apr 15, 2014
66472cd
Fix Process.wait()
ggreg Apr 15, 2014
dd8d212
Fix Process.is_alive
ggreg Apr 15, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkit/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def terminate(self, wait=False):
task.process.terminate(wait=wait)

def on_process_exit(self, process):
if process.pid is None: # Already handled.
return

self.slots.release()

pid = process.pid
Expand Down
66 changes: 41 additions & 25 deletions pkit/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import traceback


from pkit import signals


JOIN_RESTART_POLICY = 0
TERMINATE_RESTART_POLICY = 1

Expand Down Expand Up @@ -54,7 +57,8 @@ def __init__(self, process, wait=False, wait_timeout=1):

self.pid = os.fork()
if self.pid == 0:
signal.signal(signal.SIGTERM, self.on_sigterm)
signals.register(signal.SIGTERM, self.on_sigterm)

# Once the child process has it's signal handler
# binded we warn the parent process through a pipe
if wait is True:
Expand Down Expand Up @@ -155,8 +159,6 @@ def wait(self, timeout=None):
delay = min(delay * 2, remaining, 0.05)
time.sleep(delay)

self.process.clean()

return returncode

def terminate(self):
Expand Down Expand Up @@ -217,8 +219,9 @@ def __init__(self, target=None, name=None,
self.target_kwargs = dict(kwargs)

def bind_signal_handlers(self):
signal.signal(signal.SIGCHLD, self.on_sigchld)
signal.siginterrupt(signal.SIGCHLD, False)
signals.process.register(
signals.constants.SIGCHLD,
self, self.on_sigchld)

def __str__(self):
return '<{0} {1}>'.format(self.name, self.pid)
Expand All @@ -227,8 +230,13 @@ def __repr__(self):
return self.__str__()

def on_sigchld(self, signum, sigframe):
signals.process.unregister(
signals.constants.SIGCHLD,
self,
self.on_sigchld)

if self._child is not None and self._child.pid:
self.join()
self.exit()

def create(self):
"""Method to be called when the process child is forked"""
Expand Down Expand Up @@ -285,13 +293,19 @@ def start(self, wait=False, wait_timeout=0):
if self._child is not None:
raise RuntimeError("Cannot start a process twice")

self.bind_signal_handlers()
self._child = ProcessOpen(self, wait=wait, wait_timeout=wait_timeout)
self.bind_signal_handlers()
child_pid = self._child.pid
self._current = self

return child_pid

def exit(self):
if self._on_exit:
self._on_exit(self)

self.clean()

def join(self, timeout=None):
"""Awaits on Process exit

Expand All @@ -303,15 +317,10 @@ def join(self, timeout=None):
if self._child is None:
raise RuntimeError("Can only join a started process")

try:
self._exitcode = self._child.wait(timeout)
except OSError:
pass
# FIXME: self._exitcode set inside self.wait()
self.wait()

if self._on_exit:
self._on_exit(self)

self.clean()
self.exit()

def terminate(self, wait=False):
"""Forces the process to stop
Expand All @@ -326,7 +335,8 @@ def terminate(self, wait=False):
self._child.terminate()

if wait:
self.wait(until=lambda p, *args: p._child is None)
self.wait()
self.exit()

def restart(self, policy=JOIN_RESTART_POLICY):
if not policy in [JOIN_RESTART_POLICY, TERMINATE_RESTART_POLICY]:
Expand Down Expand Up @@ -376,12 +386,20 @@ def wait(self, until=None, args=(), timeout=None):
:type timeout: float
"""
def default_until(self, *args):
if self._child is not None:
try:
self._child.wait(timeout)
except OSError:
pass
return True
if self._child is None:
return False

exitcode = None
try:
# FIXME: I'm an ugly implicit side-effect!
exitcode = self._child.wait(timeout)
except OSError:
return False

if exitcode is not None:
self._exitcode = exitcode

return True

if until is not None and not callable(until):
raise ValueError("Until parameter must be a callable")
Expand All @@ -399,9 +417,7 @@ def is_alive(self):
if self._child is None or not self._child.pid:
return False

self._child.poll()

return self._child.returncode is None
return self._child.is_running

@property
def exitcode(self):
Expand Down
4 changes: 4 additions & 0 deletions pkit/signals/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from __future__ import absolute_import

from .base import *
from . import process
51 changes: 51 additions & 0 deletions pkit/signals/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import signal
import collections

from pkit.signals import constants


__all__ = ['register', 'unregister']


SIGNAL_HANDLERS = collections.defaultdict(list)


def reset():
SIGNAL_HANDLERS = collections.defaultdict(list)


def call_signal_handler(signum):
def handle_signal(signum, sigframe):
if signum not in SIGNAL_HANDLERS:
return None

for handler in SIGNAL_HANDLERS[signum]:
handler(signum, sigframe)

return handle_signal


def register(signum, handler):
if signum not in constants.SIGNAL_NUMBERS:
raise ValueError('Unknow signal number {}'.format(signum))

if not callable(handler):
raise TypeError('handler must be callable')

if signum not in SIGNAL_HANDLERS or not SIGNAL_HANDLERS[signum]:
signal.signal(signum, call_signal_handler(signum))

SIGNAL_HANDLERS[signum].append(handler)


def unregister(signum, handler):
if signum not in SIGNAL_HANDLERS:
raise LookupError('signal number {} not found'.format(signum))

handlers = SIGNAL_HANDLERS[signum]
try:
handlers.remove(handler)
except ValueError:
raise LookupError('handler {} not found for signal number {}'.format(
handler,
signum))
21 changes: 21 additions & 0 deletions pkit/signals/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import sys
import signal


__all__ = [] # Fill below


current_module = sys.modules[__name__]

SIGNALS = {k: v for k, v in vars(signal).iteritems() if
k.startswith('SIG')}

SIGNAL_NUMBERS = SIGNALS.values()


def set_signals():
for signame, signum in SIGNALS.iteritems():
setattr(current_module, signame, signum)
__all__.append(signame)

set_signals()
49 changes: 49 additions & 0 deletions pkit/signals/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os

import pkit.signals
from pkit.signals.base import SIGNAL_HANDLERS
from pkit.signals.registry import HandlerRegistry


def get_last_exited_pid():
try:
pid, _ = os.wait()
except OSError:
return None

return pid


def get_registry_handler(signum, handlers=SIGNAL_HANDLERS):
if signum not in handlers:
return None

for i in handlers[signum]:
if isinstance(i, HandlerRegistry):
registry_handler = i
break

return registry_handler


def register(signum, process, handler):
registry_handler = get_registry_handler(signum)

if registry_handler is None:
registry_handler = HandlerRegistry(
extract_from=lambda *_: get_last_exited_pid(),
insert_with=lambda process: process.pid)
pkit.signals.base.register(signum, registry_handler)

registry_handler.register(process, handler)


def unregister(signum, process, handler):
registry_handler = get_registry_handler(signum)

if registry_handler is None:
raise LookupError(
'Handler {} for process {} and signal {} not found'.format(
handler, process, signum))

registry_handler.unregister(process, handler)
34 changes: 34 additions & 0 deletions pkit/signals/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import collections


class HandlerRegistry(object):
def __init__(self, extract_from, insert_with):
"""
:param extract_from: returns the key of the object to find when a
signal is triggered.
:type extract_from: callable(signum, sigframe)

:param insert_with: returns the value of the key used to index the
object in the registry.
:type insert_with: callable(obj)

"""
self._extract_from = extract_from
self._insert_with = insert_with
self._handlers = collections.defaultdict(list)

def register(self, obj, handler):
key = self._insert_with(obj)
self._handlers[key].append(handler)

def unregister(self, obj, handler):
key = self._insert_with(obj)
self._handlers[key].remove(handler)

def __call__(self, signum, sigframe):
key = self._extract_from(signum, sigframe)
if key not in self._handlers:
return

for handler in self._handlers[key]:
handler(signum, sigframe)
10 changes: 7 additions & 3 deletions pkit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
from pkit.exceptions import TimeoutError


DEFAULT_INTERVAL = 0.005


def wait(until=None, timeout=None, args=(), kwargs={}):
if not callable(until):
raise TypeError("until must be callable")

elapsed = 0.0

interval = DEFAULT_INTERVAL
while until(*args, **kwargs) is False:
time.sleep(0.005)
elapsed += 0.005
time.sleep(interval)
elapsed += interval

if timeout is not None and elapsed >= timeout:
raise TimeoutError

Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
packages=[
'pkit',

'pkit.slot'
'pkit.slot',

'pkit.signals',
],
)
Loading