Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -465,22 +465,36 @@ fi
##
RRA_WITH_SYSTEMD_UNITDIR

AC_PKGCONFIG

##
# libev
# Which event loop implementation?
##
AC_ARG_WITH([external-libev],
AS_HELP_STRING([--with-external-libev], [Use external libev]))
AS_IF([test "x$with_external_libev" = "xyes"], [
AC_ARG_WITH([libuv],
AS_HELP_STRING([--with-libuv], [Use libuv for event loop (experimental)]))
AC_ARG_WITH([libev],
AS_HELP_STRING([--with-libev], [Use non-vendored libev for event loop]))
AS_IF([test "x$with_libuv" = "xyes" -a "x$with_libev" = "xyes"], [
AC_MSG_ERROR([--with-libuv conflicts with --with-libev])
])
AS_IF([test "x$with_libuv" = "xyes"], [
PKG_CHECK_MODULES([LIBUV], [libuv], [], [])
AC_DEFINE([HAVE_LIBUV],[1],[Event loop is libuv])
])
AS_IF([test "x$with_libev" = "xyes"], [
AC_SEARCH_LIBS([ev_run], [ev], [], [
AC_MSG_ERROR([--with-external-libev requested but external libev not found])
AC_MSG_ERROR([--with-libev requested but external libev not found])
])
],[
AC_DEFINE([HAVE_LIBEV],[1],[Event loop is libev])
])
AS_IF([test "x$with_libev" != "xyes" -a "x$with_libuv" != "xyes"], [
with_internal_libev=yes
m4_include([src/common/libev/libev.m4])
AC_DEFINE([HAVE_LIBEV],[1],[Event loop is libev])
AC_DEFINE([HAVE_LIBEV_INTERNAL],[1],[Use vendored libev])
])
AM_CONDITIONAL([INTERNAL_LIBEV],[test "x$with_external_libev" != "xyes"])

AC_PKGCONFIG
AM_CONDITIONAL([INTERNAL_LIBEV],[test "x$with_internal_libev" = "xyes"])
AM_CONDITIONAL([LIBUV], [test "x$with_libuv" = "xyes"])

##
# Project directories
Expand Down
2 changes: 1 addition & 1 deletion scripts/configure-macos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ source macos-venv/bin/activate
./autogen.sh

CPPFLAGS="$CPPFLAGS" LDFLAGS=$LDFLAGS PKG_CONFIG_PATH=$PKG_CONFIG_PATH \
./configure --with-external-libev
./configure --with-libev
1 change: 1 addition & 0 deletions scripts/install-deps-deb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ apt install \
pkg-config \
libc6-dev \
libzmq3-dev \
libuv1-dev \
uuid-dev \
libjansson-dev \
liblz4-dev \
Expand Down
1 change: 1 addition & 0 deletions src/common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ libflux_internal_la_LIBADD = \
$(LIBPTHREAD) \
$(LIBDL) \
$(LIBRT) \
$(LIBUV_LIBS) \
$(FLUX_SECURITY_LIBS)
if INTERNAL_LIBEV
libflux_internal_la_LIBADD += $(builddir)/libev/libev.la
Expand Down
15 changes: 9 additions & 6 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ AM_CPPFLAGS = \
-DLUADIR=\"$(luadir)\" \
-DLUAEXECDIR=\"$(luaexecdir)\" \
$(JANSSON_CFLAGS) \
$(LIBUUID_CFLAGS)

if INTERNAL_LIBEV
AM_CPPFLAGS += -I$(top_srcdir)/src/common/libev
endif
$(LIBUUID_CFLAGS) \
$(LIBUV_CFLAGS)

fluxcoreinclude_HEADERS = \
flux.h \
Expand Down Expand Up @@ -69,7 +66,6 @@ libflux_la_SOURCES = \
reactor_private.h \
watcher.c \
watcher_private.h \
watcher_wrap.c \
hwatcher.c \
msg_handler.c \
message.c \
Expand Down Expand Up @@ -102,6 +98,12 @@ libflux_la_SOURCES = \
fripp.h \
fripp.c

if LIBUV
libflux_la_SOURCES += watcher_uv.c
else
libflux_la_SOURCES += watcher_ev.c
endif

libflux_la_LDFLAGS = \
$(AM_LDFLAGS)

Expand Down Expand Up @@ -141,6 +143,7 @@ test_ldadd = \
$(top_builddir)/src/common/libtap/libtap.la \
$(LIBUUID_LIBS) \
$(JANSSON_LIBS) \
$(LIBUV_LIBS) \
$(LIBPTHREAD) \
$(LIBDL)

Expand Down
100 changes: 78 additions & 22 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,36 @@
#include <errno.h>
#include <stdbool.h>
#include <fcntl.h>
#include <ev.h>
#if HAVE_LIBUV
# include <uv.h>
#elif HAVE_LIBEV_INTERNAL
# include "src/common/libev/ev.h"
#else
# include <ev.h>
#endif
#include <flux/core.h>

#include "reactor_private.h"

struct flux_reactor {
#if HAVE_LIBUV
uv_loop_t loop;
#else
struct ev_loop *loop;
#endif
int usecount;
unsigned int errflag:1;
};

static int valid_flags (int flags, int valid)
{
if ((flags & ~valid)) {
errno = EINVAL;
return -1;
}
return 0;
}

void flux_reactor_decref (flux_reactor_t *r)
{
if (r && --r->usecount == 0) {
int saved_errno = errno;
#if HAVE_LIBUV
(void)uv_loop_close (&r->loop); // could return -EBUSY
#else
ev_loop_destroy (r->loop);
#endif
free (r);
errno = saved_errno;
}
Expand All @@ -66,63 +71,114 @@
}
if (!(r = calloc (1, sizeof (*r))))
return NULL;
r->loop = ev_loop_new (EVFLAG_NOSIGMASK | EVFLAG_SIGNALFD);
if (!r->loop) {
#if HAVE_LIBUV
int uverr;
if ((uverr = uv_loop_init (&r->loop)) < 0) {
free (r);
errno = -uverr;
return NULL;
}
#else
if (!(r->loop = ev_loop_new (EVFLAG_NOSIGMASK | EVFLAG_SIGNALFD))) {
free (r);

Check warning on line 83 in src/common/libflux/reactor.c

View check run for this annotation

Codecov / codecov/patch

src/common/libflux/reactor.c#L83

Added line #L83 was not covered by tests
errno = ENOMEM;
flux_reactor_destroy (r);
return NULL;
}
ev_set_userdata (r->loop, r);
#endif
r->usecount = 1;
return r;
}

int flux_reactor_run (flux_reactor_t *r, int flags)
{
int ev_flags = 0;
int count;
int rflags;

if (valid_flags (flags, FLUX_REACTOR_NOWAIT | FLUX_REACTOR_ONCE) < 0)
return -1;
if (flags & FLUX_REACTOR_NOWAIT)
ev_flags |= EVRUN_NOWAIT;
if (flags & FLUX_REACTOR_ONCE)
ev_flags |= EVRUN_ONCE;
r->errflag = 0;
count = ev_run (r->loop, ev_flags);
#if HAVE_LIBUV
if (flags == FLUX_REACTOR_NOWAIT)
rflags = UV_RUN_NOWAIT;
else if (flags == FLUX_REACTOR_ONCE)
rflags = UV_RUN_ONCE;
else if (flags == 0)
rflags = UV_RUN_DEFAULT;
else
goto error;
count = uv_run (&r->loop, rflags);
#else
if (flags == FLUX_REACTOR_NOWAIT)
rflags = EVRUN_NOWAIT;
else if (flags == FLUX_REACTOR_ONCE)
rflags = EVRUN_ONCE;
else if (flags == 0)
rflags = 0;
else
goto error;
count = ev_run (r->loop, rflags);
#endif
return (r->errflag ? -1 : count);
error:
errno = EINVAL;
return -1;
}

double flux_reactor_time (void)
{
#if HAVE_LIBUV
return 1E-9 * uv_hrtime();
#else
return ev_time ();
#endif
}

double flux_reactor_now (flux_reactor_t *r)
{
#if HAVE_LIBUV
return 1E-3 * uv_now (&r->loop);
#else
return ev_now (r->loop);
#endif
}

void flux_reactor_now_update (flux_reactor_t *r)
{
#if HAVE_LIBUV
return uv_update_time (&r->loop);
#else
return ev_now_update (r->loop);
#endif
}

void flux_reactor_stop (flux_reactor_t *r)
{
r->errflag = 0;
#if HAVE_LIBUV
uv_stop (&r->loop);
#else
ev_break (r->loop, EVBREAK_ALL);
#endif
}

void flux_reactor_stop_error (flux_reactor_t *r)
{
r->errflag = 1;
#if HAVE_LIBUV
uv_stop (&r->loop);
#else
ev_break (r->loop, EVBREAK_ALL);
#endif
}

void *reactor_get_loop (flux_reactor_t *r)
{
return r ? r->loop : NULL;
if (!r)
return NULL;
#if HAVE_LIBUV
return &r->loop;
#else
return r->loop;
#endif
}

/*
Expand Down
26 changes: 22 additions & 4 deletions src/common/libflux/test/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
#include "src/common/libtap/tap.h"
#include "ccan/array_size/array_size.h"

// XXX let the unit test compile during libuv integration
#if HAVE_LIBEV
#define HAVE_PERIODIC_WATCHER 1
#define HAVE_CHECK_PRIORITY 1
#endif

void watcher_is (flux_watcher_t *w,
bool exp_active,
bool exp_referenced,
Expand Down Expand Up @@ -210,7 +216,7 @@ static void oneshot (flux_reactor_t *r,
static void test_timer (flux_reactor_t *reactor)
{
flux_watcher_t *w;
double elapsed, t0, t[] = { 0.001, 0.010, 0.050, 0.100, 0.200 };
double elapsed, t0, t[] = { 0.005, 0.010, 0.050, 0.100, 0.200 };
int i, rc;

/* in case this test runs a while after last reactor run.
Expand Down Expand Up @@ -260,7 +266,7 @@ static void test_timer (flux_reactor_t *reactor)
elapsed = flux_reactor_now (reactor) - t0;
ok (repeat_countdown == 0,
"timer: repeat timer ran 10x and stopped itself");
ok (elapsed >= 0.001*10,
ok (elapsed >= 0.001*10 - 0.001, // see libuv note below
"timer: elapsed time is >= 10*1ms (%.3fs)", elapsed);
flux_watcher_stop (w);
flux_watcher_destroy (w);
Expand All @@ -275,12 +281,15 @@ static void test_timer (flux_reactor_t *reactor)
oneshot_runs = 0;
rc = flux_reactor_run (reactor, 0);
elapsed = flux_reactor_now (reactor) - t0;
ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i],
"timer: reactor ran %.3fs oneshot at >= time (%.3fs)", t[i], elapsed);
// libuv timer rez is 1ms so allow event to fire up to 1ms early
ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i] - 0.001,
"timer: reactor ran %.3fs oneshot punctually", t[i]);
diag ("elapsed time was %.3fs", elapsed);
}
flux_watcher_destroy (w);
}

#if HAVE_PERIODIC_WATCHER

/* A reactor callback that immediately stops reactor without error */
static bool do_stop_callback_ran = false;
Expand Down Expand Up @@ -407,6 +416,8 @@ static void test_periodic (flux_reactor_t *reactor)

}

#endif

static int idle_count = 0;
static void idle_cb (flux_reactor_t *r,
flux_watcher_t *w,
Expand Down Expand Up @@ -748,6 +759,7 @@ static void test_unref (flux_reactor_t *r)
count = 0;
ok (flux_reactor_run (r, 0) == 0 && count == 1,
"flux_reactor_run with one unref watcher returned after 1 iteration");
diag ("count=%d", count);
flux_watcher_stop (w); // calls ev_ref()

flux_watcher_ref (w);
Expand Down Expand Up @@ -811,6 +823,7 @@ static void test_reactor_flags (flux_reactor_t *r)
"flux_reactor_create flags=0xffff fails with EINVAL");
}

#if HAVE_CHECK_PRIORITY
static char cblist[6] = {0};
static int cblist_index = 0;
static flux_watcher_t *priority_prep = NULL;
Expand Down Expand Up @@ -883,6 +896,7 @@ static void test_priority (flux_reactor_t *r)
flux_watcher_destroy (priority_prep);
flux_watcher_destroy (priority_idle);
}
#endif

int main (int argc, char *argv[])
{
Expand All @@ -902,7 +916,9 @@ int main (int argc, char *argv[])
"flux_watcher_is_active (NULL) returns false");

test_timer (reactor);
#if HAVE_PERIODIC_WATCHER
test_periodic (reactor);
#endif
test_fd (reactor);
test_idle (reactor);
test_prepcheck (reactor);
Expand All @@ -911,7 +927,9 @@ int main (int argc, char *argv[])
test_handle (reactor);
test_unref (reactor);
test_reactor_flags (reactor);
#if HAVE_CHECK_PRIORITY
test_priority (reactor);
#endif

flux_reactor_destroy (reactor);

Expand Down
Loading
Loading