diff --git a/.gitignore b/.gitignore index 0bd3136..c0638a0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ doc/html +.*~ +.*.swp +.*.swo diff --git a/.travis.yml b/.travis.yml index 39089f5..3c6a151 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,15 +1,12 @@ language: julia os: - linux + - osx julia: - - 0.3 - - 0.4 + - release - nightly +sudo: false notifications: email: false -sudo: false -script: - - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi - - julia -e 'Pkg.clone(pwd()); Pkg.build("Reactive"); Pkg.test("Reactive"; coverage=true)' after_success: - - julia -e 'cd(Pkg.dir("Reactive")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(process_folder()); Codecov.submit(process_folder())' + - julia -e 'cd(Pkg.dir("Reactive")); Pkg.add("Coverage"); using Coverage; Coveralls.submit(Coveralls.process_folder())' diff --git a/REQUIRE b/REQUIRE index 285a58c..2c4ef82 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,2 +1 @@ julia 0.3 -Compat 0.4.0 diff --git a/doc/index.md b/doc/index.md index d45c392..dd6ffbf 100644 --- a/doc/index.md +++ b/doc/index.md @@ -8,7 +8,12 @@ order: 1 Reactive.jl is a Julia package for [Reactive Programming](https://en.wikipedia.org/wiki/Reactive_programming). It makes writing event-driven programs simple. -Reactive borrows its design from [Elm](http://elm-lang.org/) (see also [Functional Reactive Programming](http://elm-lang.org/learn/What-is-FRP.elm)). +Reactive borrows its vocabulary from [Elm](http://elm-lang.org/). + +# What is reactive programming? + +*Reactive programming* is a way of creating event-driven programs in terms of **streams of data**. The streams in this package are called Signals, the name signifies the fact that they always have a value, and are conceptually continuous like electrical signals. For example, a keyboard gives out a *signal of keys pressed*, a timer might give out a *signal of timestamps*, a database can consume a *signal of queries* and so on. Reactive also provides functions for common operations on signals such as transforming, filtering, merging, sampling, and throttling. + # Getting Started @@ -27,7 +32,7 @@ using Reactive -The basic currency of Reactive programs is the signal. `Signal{T}` is an abstract type that represents a time-varying value of type `T`. You can create, mix and mash `Signal`s using Reactive. +The basic currency of Reactive programs is the signal. `Signal{T}` represents a time-varying value of type `T`. -An `Input` is the most basic kind of signal: it has no *parents*--all updates to it are explicitly done through a call to `push!`. +A signal can be created using the `Signal` constructor, and must be given an inital value. ```{.julia execute="false"} # E.g. -x = Input(0) -typeof(x) -# => Input{Int64} -super(Input{Int64}) -# => Signal{Int64} -value(x) -# => 0 -push!(x, 2) -value(x) -# => 2 -``` +julia> x = Signal(0) +Signal{Int64}(0, nactions=0) -## Derived signals +julia> value(x) +0 +``` -The `lift` operator can be used to transform one signal into another. +to update the value in a signal, use the `push!` function on signals. ```{.julia execute="false"} -xsquared = lift(a -> a*a, x) -typeof(xsquared) -# => Lift{Int64} -super(Reactive.Lift{Int64}) -# => Node{Int64} -super(super(Reactive.Lift{Int64})) -# => Signal{Int64} -value(xsquared) -# => 4 +# E.g. +julia> push!(x, 42) + +julia> value(x) +42 ``` -The type of the lifted signal can be specified using a keyword argument `typ=T` to `lift`. If omitted, it is determined from the type returned by the function, using the current `value`s of its inputs. +the `push!` function updates the signal asynchronously via a central channel of updates. Below we will learn ways to derive dependent signals from one or more signals that already exist. + + +## Derived signals + +The `map` function can be used to transform signals by applying a function. -Now for every value `x` takes, `xsquared` will hold its square. ```{.julia execute="false"} -push!(x, 3) -value(xsquared) -# => 9 +julia> xsquared = map(a -> a*a, x) + +julia> value(xsquared) +1764 # 42^2 + ``` -`lift` can take more than one signal as argument. + +Now for every value of `x`, `xsquared` will hold its square. ```{.julia execute="false"} -y = lift((a, b) -> a + b, x, xsquared; typ = Int) -value(y) -# => 12 +julia> push!(x, 3) + +julia> value(xsquared) +9 ``` -**Example: A stupid line-follower bot** +The type of the `map` signal can be specified using a keyword argument `typ=T` to `map`. If omitted, it is determined from the type returned by the function, using the current `value`s of its inputs. If you want to set an initial value without computing it from the current value of the input signals, you can specify it using the `init` keyword argument. `map` can take more than one signals as argument. Here is a demonstration of these three points. -In the examples below we explore how a simple line-follower robot could be programmed with Reactive. +```{.julia execute="false"} +julia> y = map(+, x, xsquared; typ=Float64, init=0) -Here are the specifications of the robot: +julia> value(y) # Will give the initial value +0.0 -1. There are 3 sensors: left, middle and right -2. There are 2 DC motors: left and right (the bot is balanced by a castor wheel) +julia> push!(x, 4) -We start off by creating a signal of sensor values. -```{.julia execute="false"} -# the values signify how much of the line each sensor (left, middle, right) is seeing -sensor_input = Input([0.0, 1.0, 0.0]) # :: Input{Vector{Float64}} +julia> value(y) # will be 4 + 4^2 +20.0 ``` -Then create motor voltages from sensor readings. -```{.julia execute="false"} -function v_left(sensors) - # slow down when left sensor is seeing a lot of the line - # -ve voltage turns the wheel backwards - # this could, of course, be [more complicated than this](http://www.societyofrobots.com/member_tutorials/book/export/html/350). - sensors[2] - sensors[1] -end +Note that, signal nodes that do not have any reference in Reactive are admissible to [garbage collection](https://en.wikipedia.org/wiki/Garbage_collection) and subsequent termination of updates. So if you are creating a signal with `map` and to do some side effect (like printing) and don't plan to keep a reference to it, it may be stopped in the next GC pass. To prevent this from happening, you can *preserve* a signal using the `preserve` function. + +```julia +julia> preserve(map(println, x)) +Signal{Void}(nothing, nactions=0) # the type is Void because that's the return type of println +4 + +julia> push!(x, 25) +25 # printed by the above signal +``` + +`foreach(f, x)` is a shorthand for `preserve(map(f, x))`. So the above could also have been written as `foreach(println, x)`. + +`map` is a very useful function on signals. We will see an example of map below. + +**Example: A simple animation** + +Let's use `map` to create an animation of a bouncing ball using [Compose.jl](http://composejl.org). + +Our goal is to create a signal of Compose pictures that updates over time. To do this we will first create a function which given a time `t`, returns a picture of the ball at that time `t`. We will worry about updating this time `t` later. -# Similarly -function v_right(sensors) - # slow down when the right sensor is seeing the line - sensors[2] - sensors[3] +``` +function drawball(t) + y = 1-abs(sin(t)) # The y coordinate. + compose(context(), circle(0.5, y, 0.04)) end +``` + +In this function the `y` coordinate of the ball at any time `t` is `1-abs(sin(t))` - when you plot this function over `t`, you can see that it looks like the bouncing of a ball. + +Next, we need a signal that updates at a reasonable rate every second. That's where the `fps` function comes in handy. `fps(rate)` returns a signal which updates `rate` times every second. -left_motor = lift(v_left, sensor_input, typ=Float64) -right_motor = lift(v_right, sensor_input, typ=Float64) +``` +julia> ticks = fps(60) ``` -The `@lift` macro makes this simpler: +The `ticks` signal itself updates to the time elapsed between the current update and the previous update, although this is useful, for the sake of this example, we will use `map` to create a signal of time stamps from this signal. -```{.julia execute="false"} -left_motor = @lift sensor_input[1] - sensor_input[3] -right_motor = @lift sensor_input[1] - sensor_input[2] +``` +julia> timestamps = map(_ -> time(), ticks) ``` -We now ask the bot to apply the voltages across its two wheels: -```{.julia execute="false"} -function set_voltages(left, right) - write(LEFT_MOTOR_OUTPUT, left) - write(RIGHT_MOTOR_OUTPUT, right) -end +Now that we have a signal of timestamps, we can use this to create a signal of compose graphics which will be our animation. -@lift set_voltages(left_motor, right_motor) +``` +julia> anim = map(drawball, timestamps) ``` -## The Event Loop -Finally, we need to set up a loop which reads input from the sensors and plumbs it into the data flow we created above. +**Try it.** The [Interact](https://github.com/JuliaLang/Interact.jl) package allows you to render `Signal` objects as they update over time in IJulia notebooks. Try the following code in an IJulia notebook to see the animation we just created. -```{.julia execute="false"} -function read_sensors() - [read(LEFT_SENSOR_PIN), - read(MIDDLE_SENSOR_PIN), - read(RIGHT_SENSOR_PIN)] -end +``` +using Reactive, Interact, Compose -while true - # push! changes the value held by an input signal and - # propagates it through the data flow - @async push!(sensor_input, read_sensors()) - sleep(0.1) +function drawball(t) + y = 1-abs(sin(t)) # The y coordinate. + compose(context(), circle(0.5, y, 0.04)) end + +ticks = fps(60) +timestamps = map(_ -> time(), ticks) +map(drawball, timestamps) ``` -See [Timed signals and sampling](#timed-signals-and-sampling) for a more elegant way of doing the same! +The complete example points to the usual structure of programs written with Reactive. It usually consists of stateless functions (such as `drawball`) and then wiring input signals to these stateless functions to create the output signal. Below we will see some more involved examples with other operations on signals. ## Maintaining State -**Example: A Voting System** -The following examples deal with a voting system in an election. The voters can either vote for Alice, Bob, or cast an invalid vote. +[`foldp`](api.html#foldp) can be used to accumulate a value over time. You might have learned about [foldl and foldr](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29) functions on collection objects. `foldp` is a similar function, the name stands for "fold over past values". -[`foldl`](api.html#foldl) can be used to accumulate a value over time. You might count the number of votes like this: -```{.julia execute="false"} -votes = Input(:NoVote) # We use :NoVote to denote the initial case -total = foldl((acc, vote) -> acc + 1, 0, votes) # Count all votes -alice = foldl((acc, vote) -> acc + (vote == :Alice), 0, votes) -bob = foldl((acc, vote) -> acc + (vote == :Bob), 0, votes) -leading = lift((a, b) -> a > b ? :Alice : a < b ? :Bob : :Tie, alice, bob) -``` +Let's look at how it works: `y = foldp(f, init, x)` -Maintaining a difference between two updates is a bit more involved. To find the difference between previous and current value of a signal, you'd do: -```{.julia execute="false"} -function difference(prev, x) - prev_diff, prev_val = prev - # x becomes prev_val in the next call - return (x-prev_val, x) -end +Here, `y` is a signal whose initial value is `init`, and when the signal `x` updates, `f` is applied to the current value of `y` and the current value of `x` and the result is again stored in `y`. + +As an example: -diff = lift(x->x[1], foldl(difference, (0.0, 0.0), signal)) ``` +julia> x = Signal(0) -Note that this method has the advantage that all state is explicit. You could accomplish this by using a global variable to store `prev_val`, but that is not recommended. +julia> y = foldp(+, 0, x) -## Filter and merge +julia> push!(x, 1) -The [`filter`](api.html#filter) or [`dropif`](api.html#dropif) functions can filter a signal based on a predicate function. +julia> value(y) +1 -```{.julia execute="false"} -# Create a signal of only valid votes -# If the initial votes are invalid, we use :NoVote -valid_votes = filter(x -> x != :Invalid, :NoVote, votes) +julia> push!(x, 2) -# Or -valid_votes = dropif(x -> x == :Invalid, :NoVote, votes) +julia> value(y) +3 + +julia> push!(x, 3) + +julia> value(y) +3 ``` -To drop certain updates to a signal you can use [`keepwhen`](api.html#keepwhen) and [`dropwhen`](api.html#dropwhen). You could stop collecting votes when there is a security breach like this: -```{.julia execute="false"} -secure_votes = keepwhen(everything_secure, votes) +When we wrote `y=foldp(+, 0, x)` we created a signal `y` which collects updates to `x` using the function `+` and starting from `0`. In other words, `y` holds the sum of all updates to `x`. -# Or -secure_votes = dropwhen(security_breached, votes) +We can rewrite the above bouncing ball example by summing time-deltas given by `fps` instead of calling time() as follows. +``` +ticks = fps(60) +t = foldp(+, 0.0, ticks) +map(drawball, t) ``` -[`merge`](api.html#merge) merges multiple signals of the same type. To collect all votes from 3 polls into a single signal you'd do something like +If one were to use `fpswhen(switch, 60)` instead of `fps(60)` here to start and stop the fps signal with respect to some other boolean signal called `switch`, after switching off the animation and switching it on, the ball would start off where it was paused with the foldp version of the animation. + +## Filtering + +Another important operator on signals is `filter`. It can be used to filter only those updates which are true according to a given condition. + +`filter(a -> a % 2 == 0, x)` will only keep even updates to the integer signal `x`. + +A variation of `filter` called `filterwhen` lets you keep updates to a signal only when another boolean signal is true. + +`filterwhen(switch_signal, signal_to_filter)` + +## Merging + +`d = merge(a,b,c)` will merge updates to `a`, `b` and `c` to produce a signle signal `d`. + +## Drop repeats + +You can drop repeated updates to a signal with [`droprepeats`](api.html#droprepeats) -```{.julia execute="false"} -votes = merge(poll1_votes, poll2_votes, poll3_votes) ``` +julia> p = Signal(0) -You can drop repeated updates to a signal with [`droprepeats`](api.html#droprepeats): +julia> foreach(println, p) -```{.julia execute="false"} -leading_norepeat = droprepeats(leading) +julia> push!(p, 0) + +julia> push!(p, 1) +1 + +julia> push!(p, 1) ``` -`leading_norepeat` only updates when the leading candidate changes. +Notice how the value of p did not get printed when it didn't change from the previous value. + +**Example: A Voting System** + +To illustrate the functions described above, we will try to model a voting system in an electorate using Reactive. The voters can either vote for Alice, Bob, or cast an invalid vote. + +Input `votes` signal: -Finally, -```{.julia execute="false"} -lift(show_on_TV, alice, bob, stats) +``` +votes = Signal(:NoVote) # Let's :NoVote to denote the initial case ``` -## Timed signals and sampling +Now we can split the vote stream into votes for alice and those for bob. -Reactive provides functions to create timed signals. -[`every`](api.html#every) can be used to create a signal that updates at a certain interval. +``` +alice_votes = filter(v -> v == :Alice, votes) +bob_votes = filter(v -> v == :Bob, votes) +``` -```{.julia execute="false"} -# E.g. +Now let's count the votes cast for alice and bob using foldp -every10secs = every(10.0) ``` +function count(cnt, _) + cnt+1 +end -`every10secs` is a signal of timestamps (Float64) which updates every 10 seconds. +alice_count = foldp(count, 0, alice_votes) +bob_count = foldp(count, 0, bob_votes) +``` -[`sampleon`](api.html#sampleon) function takes two signals and samples the second signal when the first one changes. +We can use the counts to show at real time who is leading the election. -```{.julia execute="false"} -# E.g. +``` +leading = map(alice_count, bob_count) do a, b + if a > b + :Alice + elseif b > a + :Bob + else + :Tie + end +end +``` -# Update to the leading candidate every 10 seconds -periodic_leading = sampleon(every10secs, leading) +Notice the use of [`do` block](http://docs.julialang.org/en/release-0.4/manual/functions/#do-block-syntax-for-function-arguments) syntax here. `do` is a short-hand for creating anonymous functions and passing it as the first argument in a function call (here, to `map`). It's often useful to improve readability. + +Notice that the `leading` signal will update on every valid vote received. This is not ideal if we want to say broadcast it to someone over a slow connection, which will result in sending the same value over and over again. To alleviate this problem, we can use the droprepeats function. + +``` +norepeats = droprepeats(leading) ``` -While `every` guarrantees the interval, [`fps`](api.html#fps) tries to update at a certain maximum rate. +To demonstrate the use of `filterwhen` we will conceive a global `election_switch` signal which can be used to turn voting on or turn off. One could use this switch to stop registering votes before and after the designated time for votes, for example. ```{.julia execute="false"} -# E.g. - -fps10 = fps(10.0) +secure_votes = filterwhen(election_switch, votes) ``` +`secure_votes` will only update when `value(election_switch)` is `true`. -We can use `fps` to simplify the [event loop](#the-event-loop) in our robot example above: +Finally, to demonstrate the use of merge, let's imagine there are multiple polling stations for this election and we would like to merge votes coming in from all of them. This is pretty straightforward: ```{.julia execute="false"} -# fps returns the time delta between the past two frames -# This could be useful in animations or plotting. We ignore it here. -sensor_input = lift((delta) -> read_sensors(), fps(10.0)) +votes = merge(poll1_votes, poll2_votes, poll3_votes) ``` -[`fpswhen`](api.html#fpswhen) takes a boolean signal as the first argument and stops the timer when this signal becomes false. + +## Time, sampling and throttle + +Reactive provides functions to create timed signals. +[`every`](api.html#every) can be used to create a signal that updates at a certain interval. ```{.julia execute="false"} -# assume circuit completes if none of the sensors can see the line -circuit_not_complete = lift(s -> sum(s) != 0.0, sensor_inputs) -sensor_input = lift(read_sensors, fpswhen(circuit_not_complete, 10.0)) +# E.g. + +every10secs = every(10.0) ``` -this stops reading the input (and hence moving the bot) when the circuit is complete. +`every10secs` is a signal of timestamps (Float64) which updates every 10 seconds. + +[`sampleon`](api.html#sampleon) function takes two signals and samples the second signal when the first one changes. -[`timestamp`](api.html#timestamp) function can be used to timestamp any signal. +Let's say in our voting example, we want a signal of the leading voted candidate in the election but would like an update at most every 10 seconds, one could do it like this: ```{.julia execute="false"} # E.g. -timestamped_votes = timestamp(votes) +periodic_leading = sampleon(every10secs, leading) +``` + +`throttle` lets you limit updates to a signal to a maximum of one update in a specified interval of time. + +Suppose you are receiving an input from a sensor and the sampling rate of it can vary and sometimes becomes too high for your program to handle, you can use throttle to down sample it if the frequency of updates become too high. + +``` +throttle(1/100, sensor_input) # Update at most once in 10ms ``` -`timestamped_votes` is a signal of `(timestamp, vote)` where `timestamp` is a `Float64` timestamp denoting when the `vote` came in. # Reactive in the wild Reactive is a great substrate to build interactive GUI libraries. Here are a few projects that make use of Reactive: diff --git a/src/React.jl b/src/React.jl deleted file mode 100644 index c74d8c9..0000000 --- a/src/React.jl +++ /dev/null @@ -1,3 +0,0 @@ -module React - warn("React.jl has been renamed to Reactive.jl. Please install it with Pkg.add(\"Reactive\")") -end diff --git a/src/Reactive.jl b/src/Reactive.jl index 83f3aad..26c276b 100644 --- a/src/Reactive.jl +++ b/src/Reactive.jl @@ -1,498 +1,12 @@ -VERSION >= v"0.4.0-dev+6521" && __precompile__() - module Reactive using Compat -using Base.Order -using Base.Collections -import Base.consume - -export SignalSource, - Signal, - Input, - Node, - signal, - value, - lift, - @lift, - trylift, - consume, - tryconsume, - map, - foldl, - flatten, - switch, - foldr, - merge, - filter, - dropif, - droprepeats, - dropwhen, - sampleon, - prev, - keepwhen, - ⟿ - -import Base: eltype, join_eltype, convert, push!, merge, map, show, writemime, filter - -if VERSION >= v"0.3-" - import Base: foldl, foldr -end - -@compat typealias Callable Union{DataType, Function} - -# SignalSource is a contract that you can call signal() on the -# value to get a Signal -abstract SignalSource - -# A `Signal{T}` is a time-varying value of type T. -# Signal itself is a subtype of SignalSource for easy -# dispatch (e.g. see foldl below) -abstract Signal{T} <: SignalSource -signal(x::Signal) = x - -convert(::Type{Signal}, x::SignalSource) = signal(x) -eltype{T}(::Signal{T}) = T - -# A topological order -begin - local counter = @compat UInt(0) - - function next_rank() - counter += 1 - return counter - end -end - -rank(x::Signal) = x.rank # topological rank -value(x::Signal) = x.value # current value - -# An `Input` is a signal which can be updated explicitly by code external to Reactive. -# All other signal types have implicit update logic. -# `Input` signals can be updated by a call to `push!`. -# An `Input` must be created with an initial value. -type Input{T} <: Signal{T} - rank::UInt - children::Vector{Signal} - value::T - - function Input(v) - new(next_rank(), Signal[], convert(T, v)) - end -end -Input{T}(v::T) = Input{T}(v) - -# An intermediate node. A `Node` can be created by functions -# in this library that return signals. -abstract Node{T} <: Signal{T} - -#function add_child!(parents::Tuple{Vararg{Signal}}, child::Signal) -function add_child!(parents::@compat(Tuple{Vararg{Signal}}), child::Signal) - for p in parents - push!(p.children, child) - end -end -add_child!(parent::Signal, child::Signal) = push!(parent.children, child) - -function remove_child!(parents::(@compat Tuple{Vararg{Signal}}), child::Signal) - for p in parents - p.children = p.children[find(p.children .!= child)] - end -end -remove_child!(parent::Signal, child::Signal) = - remove_child!((parent,), child) - -type Lift{T} <: Node{T} - rank::UInt - children::Vector{Signal} - f::Callable - signals::@compat Tuple{Vararg{Signal}} - value::T - - function Lift(f::Callable, signals, init) - node = new(next_rank(), Signal[], f, signals, convert(T, init)) - add_child!(signals, node) - return node - end -end - -function update{T}(node::Lift{T}, parent) - node.value = convert(T, node.f([s.value for s in node.signals]...)) - return true -end - -type Filter{T} <: Node{T} - rank::UInt - children::Vector{Signal} - predicate::Function - signal::Signal{T} - value::T - - function Filter(predicate, v0, s::Signal{T}) - node = new(next_rank(), Signal[], predicate, s, - predicate(s.value) ? - s.value : convert(T, v0)) - add_child!(s, node) - return node - end -end - -function update{T}(node::Filter{T}, parent::Signal{T}) - if node.predicate(node.signal.value) - node.value = node.signal.value - return true - else - return false - end -end - -type DropWhen{T} <: Node{T} - rank::UInt - children::Vector{Signal} - test::Signal{Bool} - signal::Signal{T} - value::T - - function DropWhen(test, v0, s::Signal{T}) - node = new(next_rank(), Signal[], test, s, - test.value ? convert(T, v0) : s.value) - add_child!(s, node) - return node - end -end - -function update{T}(node::DropWhen{T}, parent::Signal{T}) - if node.test.value - return false - else - node.value = parent.value - return true - end -end - -type DropRepeats{T} <: Node{T} - rank::UInt - children::Vector{Signal} - signal::Signal{T} - value::T - - function DropRepeats(s) - node = new(next_rank(), Signal[], s, s.value) - add_child!(s, node) - return node - end -end - -function update{T}(node::DropRepeats{T}, parent::Signal{T}) - if node.value != parent.value - node.value = parent.value - return true - else - return false - end -end - -type Merge{T} <: Node{T} - rank::UInt - children::Vector{Signal} - signals::@compat Tuple{Vararg{Signal}} - ranks::Dict{Signal, Int} - value::T - function Merge(signals::@compat Tuple{Vararg{Signal}}) - if length(signals) < 1 - error("Merge requires at least one as argument.") - end - fst, _ = signals - node = new(next_rank(), Signal[], signals, - Dict{Signal, Int}(), fst.value) - for (r, s) in enumerate(signals) - node.ranks[s] = r # precedence - end - add_child!(signals, node) - return node - end -end - -function update{T}(node::Merge{T}, parent) - node.value = convert(T, parent.value) - return true -end - -type SampleOn{T, U} <: Node{U} - rank::UInt - children::Vector{Signal} - signal1::Signal{T} - signal2::Signal{U} - value::U - function SampleOn(signal1, signal2) - node = new(next_rank(), Signal[], signal1, signal2, signal2.value) - add_child!(signal1, node) - return node - end -end - -function update(node::SampleOn, parent) - node.value = node.signal2.value - return true -end - -deepvalue(s::Signal) = value(s) -deepvalue{T <: Signal}(s::Signal{T}) = deepvalue(value(s)) - -type Flatten{T} <: Node{T} - rank::UInt - children::Vector{Signal} - value::T - function Flatten(signalsignal::Signal) - @assert isa(value(signalsignal), Signal) - node = new(next_rank(), Signal[], value(value(signalsignal))) - - firstsig = value(signalsignal) - add_child!(signalsignal, node) - - foldl(begin add_child!(firstsig, node); firstsig end, signalsignal; typ=Any) do prev, next - remove_child!(prev, node) - add_child!(next, node) - next - end - - return node - end -end - -function update(node::Flatten, parent) - # Note: node depends on 1) the signal of signals 2) the current signal - # so deepvalue actually has different behavior in these 2 cases. - node.value = deepvalue(parent) - return true -end - -begin - local isupdating = false - # Update the value of an Input signal and propagate the - # change. - # - # Args: - # input: An Input signal - # val: The new value to be set - # Returns: - # nothing - function push!{T}(input::Input{T}, val) - try - if isupdating - error("push! called when another signal is still updating.") - end - - isupdating = true - input.value = convert(T, val) - heap = Any[] # a min-heap of (child, parent) - - child_rank(x) = rank(x[1]) - ord = By(child_rank) # ordered topologically by child.rank - - # first dirty parent - merge_parent = Dict{Merge, Signal}() - for c in input.children - if isa(c, Merge) - merge_parent[c] = input - end - heappush!(heap, (c, input), ord) - end - - prev = nothing - while !isempty(heap) - (n, parent) = heappop!(heap, ord) - if n == prev - continue # already processed - end - # Merge is a special case! - if isa(n, Merge) && haskey(merge_parent, n) - propagate = update(n, merge_parent[n]) - else - propagate = update(n, parent) - end - - if propagate - for c in n.children - if isa(c, Merge) - if haskey(merge_parent, c) - if c.ranks[n] < c.ranks[merge_parent[c]] - merge_parent[c] = n - end - else - merge_parent[c] = n - end - end - heappush!(heap, (c, n), ord) - end - end - prev = n - end - isupdating = false - return nothing - catch ex - # FIXME: Rethink this. - isupdating = false - showerror(STDERR, ex) - println(STDERR) - Base.show_backtrace(STDERR, catch_backtrace()) - println(STDERR) - throw(ex) - end - end -end - -# The `lift` operator can be used to create a new signal from -# existing signals. The value of the new signal will be the return -# value of a function `f` applied to the current values of the input -# signals. -# -# Args: -# f: The transformation function -# inputs...: Signals to apply `f` to. Same number as the arity of `f`. -# init: (kwarg) - the initial value, defaults to `f(values of inputs...)` -# typ: (kwarg) - the output type, defaults to typeof(init) -# Returns: -# a signal which updates when an argument signal updates. - -consume(f::Callable, inputs::Signal...; init=f(map(value, inputs)...), typ=typeof(init)) = - Lift{typ}(f, inputs, init) - -consume(f::Callable, inputs::SignalSource...; kwargs...) = - consume(f, map(signal, inputs)...; kwargs...) - -lift(args...; kwargs...) = consume(args...; kwargs...) - -@compat typealias Try{T} Union{T, Exception} - -wraptrycatch(typ, f) = - (a...) -> try - convert(typ, f(a...)) - catch ex - ex - end - -trylift(f::Callable, typ::Type, inputs...; - init=wraptrycatch(typ, f)(map(value, inputs)...)) = - lift(wraptrycatch(typ, f), inputs...; init=init, typ=Try{typ}) -const tryconsume = trylift - -# Uncomment in Julia >= 0.3 to enable cute infix operators. -# ⟿(signals::(Any...), f::Callable) = lift(f, signals...) -# ⟿(signal, f::Callable) = lift(f, signal) -# function ⟿(signals:: Union{Any, (Any, Callable))...} -# last = signals[end] -# ss = [signals[1:end-1]..., last[1]] -# f = last[2] -# (ss...) ⟿ f -# end - -# [Fold](http://en.wikipedia.org/wiki/Fold_(higher-order_function)) over time. -# foldl can be used to reduce a signal updates to a signal of an accumulated value. -# -# Args: -# f: A function that takes its previously returned value as the first argument -# and the values of the signals as the second argument -# v0: initial value of the fold -# signals: as many signals as one less than the arity of f. -# Returns: -# A signal which updates when one of the argument signals update. -function foldl{T}(f, v0::T, signal::SignalSource, signals::SignalSource...; typ=T) - local a = v0 - lift((b...) -> a = f(a, b...), - signal, signals...; init=v0, typ=typ) -end - -function foldr{T}(f::Function, v0::T, signal::SignalSource, signals::SignalSource...; typ=T) - local a = v0 - lift((b...) -> a = f(b..., a), - signal, signals...; init=v0, typ=typ) -end - -# Keep only updates that return true when applied to a predicate function. -# -# Args: -# pred: a function of type that returns a boolean value -# v0: the value the signal should take if the predicate is not satisfied initially. -# s: the signal to be filtered -# Returns: -# A filtered signal -filter{T}(pred::Function, v0, s::Signal{T}) = Filter{T}(pred, v0, s) -filter(pred::Function, v0, s::SignalSource) = filter(pred, v0, signal(s)) - -# Drop updates when the first signal is true. -# -# Args: -# test: a Signal{Bool} which tells when to drop updates -# v0: value to be used if the test signal is true initially -# s: the signal to drop updates from -# Return: -# a signal which updates only when the test signal is false -dropwhen{T}(test::Signal{Bool}, v0, s::Signal{T}) = - DropWhen{T}(test, v0, s) - -# Sample from the second signal every time an update occurs in the first signal -# -# Args: -# s1: the signal to watch for updates -# s2: the signal to sample from when s1 updates -# Returns: -# a of the same type as s2 which updates with s1 -sampleon{T, U}(s1::Signal{T}, s2::Signal{U}) = SampleOn{T, U}(s1, s2) -sampleon(s1::SignalSource, s2::SignalSource) = sampleon(signal(s1), signal(s2)) - -# Merge multiple signals of the same type. If more than one signals -# update together, the first one gets precedence. -# -# Args: -# signals...: two or more signals -# Returns: -# a merged signal -merge(signals::Signal...) = Merge{join_eltype(signals...)}(signals) -merge(signals::SignalSource...) = merge(map(signal, signals)) - -# Drop repeated updates. To be used on signals of immutable types. -# -# Args: -# s: the signal to drop repeats from -# Returns: -# a signal with repeats dropped. -droprepeats{T}(s::Signal{T}) = DropRepeats{T}(s) -droprepeats(s::SignalSource) = droprepeats(signal(s)) - -function show{T}(io::IO, node::Signal{T}) - write(io, string("[$(typeof(node))] ", node.value)) -end - -# -# Flatten a signal of signal into a signal -# -# Args: -# ss: the signal of signals -# Returns: -# A signal -# -flatten(ss::Signal; typ=eltype(value(ss))) = - Flatten{typ}(ss) - -# -# `switch(f, switcher)` is the same as `flatten(lift(f, switcher))` -# -# Args: -# f: A function from `T` to `Signal` -# switcher: A signal of type `T` -# Returns: -# A flattened signal -# -switch(f, switcher; typ=eltype(switcher)) = - flatten(lift(f, switcher), typ=typ) -function writemime{T}(io::IO, m::MIME"text/plain", node::Signal{T}) - writemime(io, m, node.value) -end +include("core.jl") +include("operators.jl") +include("async.jl") +include("time.jl") -include("macros.jl") -include("timing.jl") -include("util.jl") +include("deprecation.jl") end # module diff --git a/src/async.jl b/src/async.jl new file mode 100644 index 0000000..2305b65 --- /dev/null +++ b/src/async.jl @@ -0,0 +1,50 @@ + +export remote_map, + async_map + +""" + tasks, results = async_map(f, init, input...;typ=typeof(init), onerror=Reactive.print_error) +Spawn a new task to run a function when input signal updates. Returns a signal of tasks and a `results` signal which updates asynchronously with the results. `init` will be used as the default value of `results`. `onerror` is the callback to be called when an error occurs, by default it is set to a callback which prints the error to STDERR. It's the same as the `onerror` argument to `push!` but is run in the spawned task. +""" +function async_map(f, init, inputs...; typ=typeof(init), onerror=print_error) + + node = Node(typ, init, inputs) + map(inputs...; init=nothing, typ=Any) do args... + outer_task = current_task() + @async begin + try + x = f(args...) + push!(node, x, onerror) + catch err + Base.throwto(outer_task, CapturedException(err, catch_backtrace())) + end + end + end, node +end + +""" + remoterefs, results = remote_map(procid, f, init, input...;typ=typeof(init), onerror=Reactive.print_error) + +Spawn a new task on process `procid` to run a function when input signal updates. Returns a signal of remote refs and a `results` signal which updates asynchronously with the results. `init` will be used as the default value of `results`. `onerror` is the callback to be called when an error occurs, by default it is set to a callback which prints the error to STDERR. It's the same as the `onerror` argument to `push!` but is run in the spawned task. +""" +function remote_map(procid, f, init, inputs...; typ=typeof(init), onerror=print_error) + + node = Node(typ, init, inputs) + + node = Node(typ, init, inputs) + map(inputs...; init=nothing, typ=Any) do args... + outer_task = current_task() + rref = @spawnat procid begin + f(args...) + end + @async begin + try + x = fetch(rref) + push!(node, x, onerror) + catch err + Base.throwto(outer_task, CapturedException(err, catch_backtrace())) + end + end + end, node +end + diff --git a/src/core.jl b/src/core.jl new file mode 100644 index 0000000..898b4f7 --- /dev/null +++ b/src/core.jl @@ -0,0 +1,226 @@ +import Base: push!, eltype, close +export Signal, push!, value, preserve, unpreserve, close + +##### Node ##### + +const debug_memory = false # Set this to true to debug gc of nodes + +const nodes = WeakKeyDict() +const io_lock = ReentrantLock() + +if !debug_memory + type Node{T} + value::T + parents::Tuple + actions::Vector + alive::Bool + preservers::Dict + end +else + type Node{T} + value::T + parents::Tuple + actions::Vector + alive::Bool + preservers::Dict + bt + function Node(v, parents, actions, alive, pres) + n=new(v,parents,actions,alive,pres,backtrace()) + nodes[n] = nothing + finalizer(n, log_gc) + n + end + end +end + +typealias Signal Node + +log_gc(n) = + @async begin + lock(io_lock) + print(STDERR, "Node got gc'd. Creation backtrace:") + Base.show_backtrace(STDERR, n.bt) + println(STDOUT) + unlock(io_lock) + end + +immutable Action + recipient::WeakRef + f::Function +end +isrequired(a::Action) = a.recipient.value != nothing && a.recipient.value.alive + +Node{T}(x::T, parents=()) = Node{T}(x, parents, Action[], true, Dict{Node, Int}()) +Node{T}(::Type{T}, x, parents=()) = Node{T}(x, parents, Action[], true, Dict{Node, Int}()) + +# preserve/unpreserve nodes from gc +""" + preserve(signal::Signal) + +prevents `signal` from being garbage collected as long as any of its parents are around. Useful for when you want to do some side effects in a signal. +e.g. `preserve(map(println, x))` - this will continue to print updates to x, until x goes out of scope. `foreach` is a shorthand for `map` with `preserve`. +""" +function preserve(x::Node) + for p in x.parents + p.preservers[x] = get(p.preservers, x, 0)+1 + preserve(p) + end + x +end + +""" + unpreserve(signal::Signal) + +allow `signal` to be garbage collected. See also `preserve`. +""" +function unpreserve(x::Node) + for p in x.parents + n = get(p.preservers, x, 0)-1 + if n <= 0 + delete!(p.preservers, x) + else + p.preservers[x] = n + end + unpreserve(p) + end + x +end + +Base.show(io::IO, n::Node) = + write(io, "Signal{$(eltype(n))}($(n.value), nactions=$(length(n.actions))$(n.alive ? "" : ", closed"))") + +value(n::Node) = n.value +value(::Void) = false +eltype{T}(::Node{T}) = T +eltype{T}(::Type{Node{T}}) = T + +##### Connections ##### + +function add_action!(f, node, recipient) + a = Action(WeakRef(recipient), f) + push!(node.actions, a) + a +end + +function remove_action!(f, node, recipient) + node.actions = filter(a -> a.f != f, node.actions) +end + +function close(n::Node, warn_nonleaf=true) + finalize(n) # stop timer etc. + n.alive = false + if !isempty(n.actions) + any(map(isrequired, n.actions)) && warn_nonleaf && + warn("closing a non-leaf node is not a good idea") + empty!(n.actions) + end +end + +function send_value!(node::Node, x, timestep) + # Dead node? + !node.alive && return + + # Set the value and do actions + node.value = x + for action in node.actions + do_action(action, timestep) + end +end +send_value!(wr::WeakRef, x, timestep) = wr.value != nothing && send_value!(wr.value, x, timestep) + +do_action(a::Action, timestep) = + isrequired(a) && a.f(a.recipient.value, timestep) + +# If any actions have been gc'd, remove them +cleanup_actions(node::Node) = + node.actions = filter(isrequired, node.actions) + + +##### Messaging ##### + +const CHANNEL_SIZE = 1024 + +# Global channel for signal updates +const _messages = Channel{Any}(CHANNEL_SIZE) + +""" +`push!(signal, value, onerror=Reactive.print_error)` + +Queue an update to a signal. The update will be propagated when all currently +queued updates are done processing. + +The third optional argument is a callback to be called in case the update +ends in an error. The callback receives 3 arguments: the signal, the value, +and a `CapturedException` with the fields `ex` which is the original exception +object, and `processed_bt` which is the backtrace of the exception. + +The default error callback will print the error and backtrace to STDERR. +""" +Base.push!(n::Node, x, onerror=print_error) = _push!(n, x, onerror) + +function _push!(n, x, onerror=print_error) + taken = Base.n_avail(_messages) + if taken >= CHANNEL_SIZE + warn("Message queue is full. Ordering may be incorrect.") + @async put!(_messages, (n, x, onerror)) + else + put!(_messages, (WeakRef(n), x, onerror)) + end + nothing +end +_push!(::Void, x, onerror=print_error) = nothing + +# remove messages from the channel and propagate them +global run +let timestep = 0 + function run(steps=typemax(Int)) + runner_task = current_task()::Task + iter = 1 + while iter <= steps + timestep += 1 + iter += 1 + + let message = take!(_messages) + node, value, onerror = message + try + send_value!(node, value, timestep) + catch err + if isa(err, InterruptException) + println("Reactive event loop was inturrupted.") + rethrow() + else + bt = catch_backtrace() + onerror(node, value, CapturedException(err, bt)) + end + end + end + end + end +end + +# Default error handler function +function print_error(node, value, ex) + lock(io_lock) + io = STDERR + println(io, "Failed to push!") + print(io, " ") + show(io, value) + println(io) + println(io, "to node") + print(io, " ") + show(io, node) + println(io) + showerror(io, ex) + println(io) + unlock(io_lock) +end + +# Run everything queued up till the instant of calling this function +run_till_now() = run(Base.n_avail(_messages)) + +# A decent default runner task +function __init__() + global runner_task = @async begin + Reactive.run() + end +end diff --git a/src/deprecation.jl b/src/deprecation.jl new file mode 100644 index 0000000..efe2f83 --- /dev/null +++ b/src/deprecation.jl @@ -0,0 +1,13 @@ +import Base: consume, foldl, call, @deprecate +export lift, consume, foldl, keepwhen, keepif, dropif, dropwhen + +@deprecate Input Signal + +@deprecate lift(f, s::Signal...; kwargs...) map(f,s...; kwargs...) +@deprecate consume(f::Union{Function, DataType}, s::Signal...;kwargs...) map(f, s...;kwargs...) +@deprecate foldl(f, x, s::Signal...;kwargs...) foldp(f, x, s...;kwargs...) +@deprecate keepwhen filterwhen +@deprecate keepif filter +@deprecate dropif(f, default, signal) filter(x -> !f(x), default, signal) +@deprecate dropwhen(predicate, x, signal) filterwhen(map(!, predicate), x, signal) +@deprecate call{T}(::Type{Node{T}}, x) Input(T, x) diff --git a/src/macros.jl b/src/macros.jl deleted file mode 100644 index ae01f8d..0000000 --- a/src/macros.jl +++ /dev/null @@ -1,85 +0,0 @@ -# evaluate as much of an expression as you can -function sub_val(x, m::Module) - try - eval(m, x) - catch MethodError e - return x - end -end - -sub_val(x::Symbol, m::Module) = eval(m, x) - -function sub_val(ex::Expr, m::Module) - if in(ex.head, [:call, :row, :vcat, :vect, :ref, :tuple, :cell1d, :(:)]) - ex.args = map(x->sub_val(x, m), ex.args) - elseif ex.head == :kw - ex.args[2] = sub_val(ex.args[2], m) - end - - # This bit is required for things like sampleon(x, y) to be - # turned into a single signal first and then used as input - # to the expression being lifted. - if ex.head == :call - try - eval(m, ex) - catch MethodError e - return ex - end - else - return ex - end -end - -function extract_signals!(ex, m::Module, dict::Dict{Any, Symbol}) - if applicable(signal, ex) - if haskey(dict, signal(ex)) - return dict[signal(ex)] - else - sym = gensym() - dict[signal(ex)] = sym - return sym - end - else - return ex - end -end - -function extract_signals!(ex::Expr, m::Module, dict::Dict{Any, Symbol}) - if in(ex.head, [:call, :row, :vcat, :vect, :ref, :tuple, :cell1d, :(:)]) - ex.args = map(x->extract_signals!(x, m, dict), ex.args) - elseif ex.head == :kw - ex.args[2] = extract_signals!(ex.args[2], m, dict) - end - ex -end - -function extract_signals(ex, m::Module) - dict = Dict{Any, Symbol}() - ex = extract_signals!(ex, m, dict) - return ex, dict -end - -# Convenience macro for calling `lift`. Evaluates an -# expression looking for signal values, and returns a -# signal whose values are that of the expression as the -# signals in it change. -# -# Args: -# expr: Expression -macro lift(ex) - ex = Expr(:quote, ex) - esc(quote - ex = Reactive.sub_val($ex, current_module()) - ex, sigs = Reactive.extract_signals(ex, current_module()) - args = Symbol[] - vals = Any[] - for (k, v) in sigs - push!(args, v) - push!(vals, k) - end - eval(current_module(), - Expr(:call, :lift, - Expr(:->, Expr(:tuple, args...), ex), - vals...)) - end) -end diff --git a/src/operators.jl b/src/operators.jl new file mode 100644 index 0000000..568570f --- /dev/null +++ b/src/operators.jl @@ -0,0 +1,302 @@ +import Base: map, merge, filter + +export map, + probe, + filter, + filterwhen, + foldp, + sampleon, + merge, + previous, + delay, + droprepeats, + flatten, + bind!, + unbind! + +""" + map(f, s::Signal...) -> signal + +Transform signal `s` by applying `f` to each element. For multiple signal arguments, apply `f` elementwise. +""" +function map(f, inputs::Node...; + init=f(map(value, inputs)...), typ=typeof(init)) + + n = Node(typ, init, inputs) + connect_map(f, n, inputs...) + n +end + +function connect_map(f, output, inputs...) + let prev_timestep = 0 + for inp in inputs + add_action!(inp, output) do output, timestep + if prev_timestep != timestep + result = f(map(value, inputs)...) + send_value!(output, result, timestep) + prev_timestep = timestep + end + end + end + end +end + +probe(node, name, io=STDERR) = + map(x -> println(io, name, " >! ", x), node) + +""" + foreach(f, inputs...) + +Same as `map`, but will be prevented from gc until all the inputs have gone out of scope. Should be used in cases where `f` does a side-effect. +""" +foreach(f, inputs...) = preserve(map(f, inputs...)) + +""" + filter(f, signal) + +remove updates from the signal where `f` returns `false`. +""" +function filter{T}(f::Function, default, input::Node{T}) + n = Node(T, f(value(input)) ? value(input) : default, (input,)) + connect_filter(f, default, n, input) + n +end + +function connect_filter(f, default, output, input) + add_action!(input, output) do output, timestep + val = value(input) + f(val) && send_value!(output, val, timestep) + end +end + +""" + filterwhen(switch::Signal{Bool}, default, input) + +Keep updates to `input` only when `switch` is true. + +If switch is false initially, the specified default value is used. +""" +function filterwhen{T}(predicate::Node{Bool}, default, input::Node{T}) + n = Node(T, value(predicate) ? value(input) : default, (input,)) + connect_filterwhen(n, predicate, input) + n +end + +function connect_filterwhen(output, predicate, input) + add_action!(input, output) do output, timestep + value(predicate) && send_value!(output, value(input), timestep) + end +end + +""" + foldp(f, init, input) + +[Fold](http://en.wikipedia.org/wiki/Fold_(higher-order_function)) over past values. + +Accumulate a value as the `input` signal changes. `init` is the initial value of the accumulator. +`f` should take 2 arguments: the current accumulated value and the current update, and result in the next accumulated value. +""" +function foldp(f::Function, v0, inputs...; typ=typeof(v0)) + n = Node(typ, v0, inputs) + connect_foldp(f, v0, n, inputs) + n +end + +function connect_foldp(f, v0, output, inputs) + let acc = v0 + for inp in inputs + add_action!(inp, output) do output, timestep + vals = map(value, inputs) + acc = f(acc, vals...) + send_value!(output, acc, timestep) + end + end + end +end + +""" + sampleon(a, b) + +Sample the value of `b` whenever `a` updates. +""" +function sampleon{T}(sampler, input::Node{T}) + n = Node(T, value(input), (sampler, input)) + connect_sampleon(n, sampler, input) + n +end + +function connect_sampleon(output, sampler, input) + add_action!(sampler, output) do output, timestep + send_value!(output, value(input), timestep) + end +end + + +""" + merge(input...) + +Merge many signals into one. Returns a signal which updates when +any of the inputs update. If many signals update at the same time, +the value of the *youngest* input signal is taken. +""" +function merge(inputs...) + @assert length(inputs) >= 1 + n = Node(typejoin(map(eltype, inputs)...), value(inputs[1]), inputs) + connect_merge(n, inputs...) + n +end + +function connect_merge(output, inputs...) + let prev_timestep = 0 + for inp in inputs + add_action!(inp, output) do output, timestep + # don't update twice in the same timestep + if prev_timestep != timestep + send_value!(output, value(inp), timestep) + prev_time = timestep + end + end + end + end +end + +""" + previous(input, default=value(input)) + +Create a signal which holds the previous value of `input`. +You can optionally specify a different initial value. +""" +function previous{T}(input::Node{T}, default=value(input)) + n = Node(T, default, (input,)) + connect_previous(n, input) + n +end + +function connect_previous(output, input) + let prev_value = value(input) + add_action!(input, output) do output, timestep + send_value!(output, prev_value, timestep) + prev_value = value(input) + end + end +end + +""" + delay(input, default=value(input)) + +Schedule an update to happen after the current update propagates +throughout the signal graph. + +Returns the delayed signal. +""" +function delay{T}(input::Node{T}, default=value(input)) + n = Node(T, default, (input,)) + connect_delay(n, input) + n +end + +function connect_delay(output, input) + add_action!(input, output) do output, timestep + push!(output, value(input)) + end +end + +""" + droprepeats(input) + +Drop updates to `input` whenever the new value is the same +as the previous value of the signal. +""" +function droprepeats{T}(input::Node{T}) + n = Node(T, value(input), (input,)) + connect_droprepeats(n, input) + n +end + +function connect_droprepeats(output, input) + let prev_value = value(input) + add_action!(input, output) do output, timestep + if prev_value != value(input) + send_value!(output, value(input), timestep) + prev_value = value(input) + end + end + end +end + +""" + flatten(input::Signal{Signal}; typ=Any) + +Flatten a signal of signals into a signal which holds the +value of the current signal. The `typ` keyword argument specifies +the type of the flattened signal. It is `Any` by default. +""" +function flatten(input::Node; typ=Any) + n = Node(typ, value(value(input)), (input,)) + connect_flatten(n, input) + n +end + +function connect_flatten(output, input) + let current_node = value(input), + callback = (output, timestep) -> begin + send_value!(output, value(value(input)), timestep) + end + + add_action!(callback, current_node, output) + + add_action!(input, output) do output, timestep + + # Move around action from previous node to current one + remove_action!(callback, current_node, output) + current_node = value(input) + add_action!(callback, current_node, output) + + send_value!(output, value(current_node), timestep) + end + end +end + +const _bindings = Dict() + +""" + bind!(a,b,twoway=true) + +for every update to `a` also update `b` with the same value and vice-versa. +To only bind updates from b to a, pass in a third argument as `false` +""" +function bind!(a::Node, b::Node, twoway=true) + + let current_timestep = 0 + action = add_action!(b, a) do a, timestep + if current_timestep != timestep + current_timestep = timestep + send_value!(a, value(b), timestep) + end + end + _bindings[a=>b] = action + end + + if twoway + bind!(b, a, false) + end +end + +""" + unbind!(a,b,twoway=true) + +remove a link set up using `bind!` +""" +function unbind!(a::Node, b::Node, twoway=true) + if !haskey(_bindings, a=>b) + return + end + + action = _bindings[a=>b] + a.actions = filter(x->x!=action, a.actions) + delete!(_bindings, a=>b) + + if twoway + unbind!(b, a, false) + end +end diff --git a/src/time.jl b/src/time.jl new file mode 100644 index 0000000..a209b87 --- /dev/null +++ b/src/time.jl @@ -0,0 +1,93 @@ +export every, fps, fpswhen, throttle + +""" + throttle(dt, input, f=(acc,x)->x, init=value(input), reinit=x->x) + +Throttle a signal to update at most once every dt seconds. By default, the throttled signal holds the last update in the time window. + +This behavior can be changed by the `f`, `init` and `reinit` arguments. The `init` and `f` functions are similar to `init` and `f` in `foldp`. `reinit` is called when a new throttle time window opens to reinitialize the initial value for accumulation, it gets one argument, the previous accumulated value. + +For example + y = throttle(0.2, x, push!, Int[], _->Int[]) +will create vectors of updates to the integer signal `x` which occur within 0.2 second time windows. + +""" +function throttle{T}(dt, node::Node{T}, f=(acc, x) -> x, init=value(node), reinit=x->x) + output = Node(init, (node,)) + throttle_connect(dt, output, node, f, init, reinit) + output +end + +# Aggregate a signal producing an update at most once in dt seconds +function throttle_connect(dt, output, input, f, init, reinit) + let collected = init, timer = Timer(x->x, 0) + add_action!(input, output) do output, timestep + collected = f(collected, value(input)) + close(timer) + timer = Timer(x -> begin push!(output, collected); collected=reinit(collected) end, dt) + end + end +end + +""" + every(dt) + +A signal that updates every `dt` seconds to the current timestamp. Consider using `fpswhen` or `fps` before using `every`. +""" +function every(dt) + n = Node(time(), ()) + every_connect(dt, n) + n +end + +function every_connect(dt, output) + outputref = WeakRef(output) + timer = Timer(x -> _push!(outputref, time(), ()->close(timer)), dt, dt) + finalizer(output, _->close(timer)) + output +end + +""" + fpswhen(switch, rate) + +returns a signal which when `switch` signal is true, updates `rate` times every second. If `rate` is not possible to attain because of slowness in computing dependent signal values, the signal will self adjust to provide the best possible rate. +""" +function fpswhen(switch, rate) + switch_ons = filter(x->x, false, switch) # only turn-ons + n = Node(Float64, 0.0, (switch, switch_ons,)) + fpswhen_connect(rate, switch, switch_ons, n) + n +end + +function setup_next_tick(outputref, switchref, dt, wait_dt) + if value(switchref.value) + Timer(t -> if value(switchref.value) + _push!(outputref, dt) + end, wait_dt) + end +end + +function fpswhen_connect(rate, switch, switch_ons, output) + let prev_time = time() + dt = 1.0/rate + outputref = WeakRef(output) + switchref = WeakRef(switch) + + for inp in [output, switch_ons] + add_action!(inp, output) do output, timestep + start_time = time() + setup_next_tick(outputref, switchref, start_time-prev_time, dt) + prev_time = start_time + end + end + + setup_next_tick(outputref, switchref, dt, dt) + end +end + +""" + fps(rate) + +Same as `fpswhen(Input(true), rate)` +""" +fps(rate) = fpswhen(Node(Bool, true, ()), rate) diff --git a/src/timing.jl b/src/timing.jl deleted file mode 100644 index df885b6..0000000 --- a/src/timing.jl +++ /dev/null @@ -1,102 +0,0 @@ -export every, fpswhen, fps, timestamp - -# Create a signal of timestamps that updates every delta seconds -# -# Args: -# delta: interval between updates. -# Returns: -# a periodically updating timestamp as a signal -function every(delta::Float64) - i = Input(time()) - t = @compat Timer(t->push!(i, time()), delta, delta) - return lift(identity, i) # prevent push! -end - -# Same as the fps function, but you can turn it on and off. -# The first time delta after a pause is always zero, no matter how long the pause was. -# -# Args: -# test: a switch signal of booleans to turn fps on or off -# freq: the maximum frequency at which fpswhen should update -# Returns: -# an signal of Float64 time deltas -function gate(wason_timer::(@compat Tuple{Bool, Timer}), ison::Bool, s::Input{Float64}, delta::Float64) - wason, timer = wason_timer - (!wason&&ison) && return (ison, Timer(x->push!(s, time()), 0, delta)) # start pushing again - (wason&&!ison) && (close(timer); return (ison, timer)) # stop it now! - (ison, timer) -end -function fpswhen(test::Signal{Bool}, freq) - delta = 1.0/freq - feedback = Input(time()) - time_signal = merge(feedback, lift(_->time(), keepwhen(test, false, test))) - timer = @compat Timer(x->value(test) && push!(feedback, time()), delta) - state = foldl((0.0,time()), time_signal) do prev, t - prev_t, _ = prev - @compat Timer(x->value(test) && push!(feedback, time()), delta) - t, (t - prev_t) - end - lift(x->x[2], state) -end -fpswhen(test, freq) = fpswhen(signal(test), freq) - -# Takes a desired number of frames per second and updates -# as quickly as possible at most the desired number of times a second. -# -# Args: -# freq: the desired fps -# Returns: -# a signal of time delta between two updates -function fps(freq) - return fpswhen(Input(true), float(freq)) -end - -# Timestamp a signal. -# -# Args: -# s: a signal to timestamp -# Returns: -# a signal of type (Float64, T) where the first element is the time -# at which the value (2nd element) got updated. -_timestamp(x) = (time(), x) -timestamp{T}(s::Signal{T}) = lift(_timestamp, s) -timestamp(s) = timestamp(signal(s)) - -# Collect signal updates into lists of updates within a given time -# period. -# -# Args: -# signal: a signal Signal{T} -# t: the time window -# Returns: -# A throttled signal of Signal{Vector[T]} -## type ThrottleNode{T} <: Node{Vector{T}} -## rank::UInt -## children::Vecto{Signal} -## signal::Signal{T} -## window::Float64 -## value::Vector{T} - -## function ThrottleNode(s::Signal{T}, t::Float64) -## node = new(Reactive.next_rank(), Signal[], s, window, [s.value]) -## Reactive.add_child!(s, node) -## end -## end -## function update{T}(s::ThrottleNode{T}, parent::Signal{T}) -## end - -## function throttle{T}(s::Signal{T}, t::Float64) -## i = Input([s.value]) -## if noBin exists -## createANewBin which will update the signal in t seconds. -## else -## add to bin -## end -## return i -## end - -# Remove this in a 0.2 release -module Timing -import Reactive: fps, fpswhen, every, timestamp -export fps, fpswhen, every, timestamp -end diff --git a/src/util.jl b/src/util.jl deleted file mode 100644 index 3c1ecf9..0000000 --- a/src/util.jl +++ /dev/null @@ -1,33 +0,0 @@ -function prev{T}(s::Signal{T}, first) - fst(x) = x[1] - folder(a, b) = (a[2], b) - lift(fst, - foldl(folder, (first, s.value), s)) -end - -function prev(s::Signal) - prev(s, s.value) -end - -# Drop updates if the predicate is true. Complement of filter. -# -# Args: -# pred: a predicate function -# v0: base value to be used if the predicate is satisfied initially -# s: the signal to drop updates on -# Returns: -# a filtered signal -dropif(pred::Function, v0, s::SignalSource) = filter(x->!pred(x), v0, s) - -# Keep only updates to the second signal only when the first signal is true. -# Complements dropwhen. -# -# Args: -# test: a Signal{Bool} which tells when to keep updates to s -# v0: base value to use if the signal is false initially -# s: the signal to filter -# Returns: -# a signal which updates only when the test signal is true -function keepwhen(test::Signal{Bool}, v0, s::SignalSource) - dropwhen(lift(!, test), v0, s) -end diff --git a/test/async.jl b/test/async.jl new file mode 100644 index 0000000..f103363 --- /dev/null +++ b/test/async.jl @@ -0,0 +1,17 @@ + +facts("Async") do + + context("async_map") do + x = Signal(1) + t, y = async_map(-, 0, x) + + @fact value(t) --> nothing + @fact value(y) --> 0 + + push!(x, 2) + step() + step() + + @fact value(y) --> -2 + end +end diff --git a/test/basics.jl b/test/basics.jl index c26a362..5a4bf61 100644 --- a/test/basics.jl +++ b/test/basics.jl @@ -2,139 +2,231 @@ using FactCheck using Reactive using Compat +step() = Reactive.run(1) +queue_size() = Base.n_avail(Reactive._messages) number() = round(Int, rand()*1000) - ## Basics facts("Basic checks") do - a = Input(number()) - b = lift(x -> x*x, a) + a = Signal(number()) + b = map(x -> x*x, a) - context("lift") do + context("map") do # Lift type - @fact typeof(b) --> Reactive.Lift{Int} + #@fact typeof(b) --> Reactive.Lift{Int} # type conversion push!(a, 1.0) - @fact b.value --> 1 + step() + @fact value(b) --> 1 # InexactError to be precise - @fact_throws InexactError push!(a, 1.1) + push!(a, 2.1, (n,x,err) -> @fact n --> a) + step() + + @fact value(b) --> 1 push!(a, number()) - @fact b.value --> a.value*a.value + step() + @fact value(b) --> value(a)^2 push!(a, -number()) - @fact b.value --> a.value*a.value + step() + @fact value(b) --> value(a)^2 ## Multiple inputs to Lift - c = lift(+, a, b, typ=Int) - @fact c.value --> a.value + b.value + c = map(+, a, b, typ=Int) + @fact value(c) --> value(a) + value(b) push!(a, number()) - @fact c.value --> a.value+b.value + step() + @fact value(c) --> value(a) + value(b) - push!(a, number()) - @fact c.value --> a.value+b.value + push!(b, number()) + step() + @fact value(c) --> value(a) + value(b) end context("merge") do ## Merge - d = Input(number()) + d = Signal(number()) e = merge(d, b, a) # precedence to d - @fact e.value --> d.value + @fact value(e) --> value(d) push!(a, number()) - # precedence to b over a - @fact e.value --> b.value + step() + # precedence to b over a -- a is older. + @fact value(e) --> value(a) + + c = map(_->_, a) # Make a younger than b + f = merge(d, c, b) + push!(a, number()) + step() + @fact value(f) --> value(c) end - context("foldl") do + context("foldp") do ## foldl over time push!(a, 0) - f = foldl(+, 0, a) + step() + f = foldp(+, 0, a) nums = round(Int, rand(100)*1000) - map(x -> push!(a, x), nums) + map(x -> begin push!(a, x); step() end, nums) - @fact sum(nums) --> f.value + @fact sum(nums) --> value(f) end context("filter") do # filter - g = Input(0) + g = Signal(0) pred = x -> x % 2 != 0 h = filter(pred, 1, g) + j = filter(x -> x % 2 == 0, 1, g) - @fact h.value --> 1 + @fact value(h) --> 1 + @fact value(j) --> 0 push!(g, 2) - @fact h.value --> 1 + step() + @fact value(h) --> 1 push!(g, 3) - @fact h.value --> 3 + step() + @fact value(h) --> 3 end context("sampleon") do # sampleon - g = Input(0) + g = Signal(0) push!(g, number()) - i = Input(true) + step() + i = Signal(true) j = sampleon(i, g) # default value - @fact j.value --> g.value - push!(g, g.value-1) - @fact j.value --> g.value+1 + @fact value(j) --> value(g) + push!(g, value(g)-1) + step() + @fact value(j) --> value(g)+1 push!(i, true) - @fact j.value --> g.value + step() + @fact value(j) --> value(g) end context("droprepeats") do # droprepeats - count = s -> foldl((x, y) -> x+1, 0, s) + count = s -> foldp((x, y) -> x+1, 0, s) - k = Input(1) + k = Signal(1) l = droprepeats(k) - @fact l.value --> k.value + @fact value(l) --> value(k) push!(k, 1) - @fact l.value --> k.value + step() + @fact value(l) --> value(k) push!(k, 0) - #println(l.value, " ", k.value) - @fact l.value --> k.value + step() + #println(l.value, " ", value(k)) + @fact value(l) --> value(k) m = count(k) n = count(l) seq = [1, 1, 1, 0, 1, 0, 1, 0, 0] - map(x -> push!(k, x), seq) + map(x -> begin push!(k, x); step() end, seq) - @fact m.value --> length(seq) - @fact n.value --> 6 + @fact value(m) --> length(seq) + @fact value(n) --> 6 end - context("dropwhen") do - # dropwhen - b = Input(true) - n = Input(1) - dw = dropwhen(b, 0, n) - @fact dw.value --> 0 + context("filterwhen") do + # filterwhen + b = Signal(false) + n = Signal(1) + dw = filterwhen(b, 0, n) + @fact value(dw) --> 0 push!(n, 2) - @fact dw.value --> 0 - push!(b, false) - @fact dw.value --> 0 + step() + @fact value(dw) --> 0 + push!(b, true) + step() + @fact value(dw) --> 0 push!(n, 1) - @fact dw.value --> 1 + step() + @fact value(dw) --> 1 push!(n, 2) - @fact dw.value --> 2 - dw = dropwhen(b, 0, n) - @fact dw.value --> 2 + step() + @fact value(dw) --> 2 + dw = filterwhen(b, 0, n) + @fact value(dw) --> 2 + end + + context("push! inside push!") do + a = Signal(0) + b = Signal(1) + map(x -> push!(a, x), b) + + @fact value(a) --> 0 + + step() + @fact value(a) --> 1 + + push!(a, 2) + step() + @fact value(a) --> 2 + @fact value(b) --> 1 + + push!(b, 3) + step() + @fact value(b) --> 3 + @fact value(a) --> 2 + + step() + @fact value(a) --> 3 + end + + context("previous") do + x = Signal(0) + y = previous(x) + @fact value(y) --> 0 + + push!(x, 1) + step() + + @fact value(y) --> 0 + + push!(x, 2) + step() + + @fact value(y) --> 1 + + push!(x, 3) + step() + + @fact value(y) --> 2 + @fact queue_size() --> 0 + end + + + context("delay") do + x = Signal(0) + y = delay(x) + @fact value(y) --> 0 + + push!(x, 1) + step() + + @fact queue_size() --> 1 + @fact value(y) --> 0 + + step() + @fact value(y) --> 1 end end diff --git a/test/call_count.jl b/test/call_count.jl index a86ac54..e59275a 100644 --- a/test/call_count.jl +++ b/test/call_count.jl @@ -1,19 +1,16 @@ -using FactCheck -using Reactive -using Compat -number() = round(Int, rand()*100) +number() = rand(0:100) facts("Call counting") do - a = Input(0) - b = Input(0) + a = Signal(0) + b = Signal(0) - c = lift(+, a, b) + c = map(+, a, b) d = merge(a, b) - e = lift(+, a, lift(x->2x, a)) # Both depend on a - f = lift(+, a, b, c, e) + e = map(+, a, map(x->2x, a)) # Both depend on a + f = map(+, a, b, c, e) - count = s -> foldl((x, y) -> x+1, 0, s) + count = s -> foldp((x, y) -> x+1, 0, s) ca = count(a) cb = count(b) @@ -24,7 +21,9 @@ facts("Call counting") do for i in 1:100 push!(a, number()) + step() push!(b, number()) + step() @fact ca.value --> i @fact cb.value --> i diff --git a/test/concurrency.jl b/test/concurrency.jl deleted file mode 100644 index c7a3669..0000000 --- a/test/concurrency.jl +++ /dev/null @@ -1,13 +0,0 @@ -using Base.Test -using Reactive - -a = Input(0) - -function crash(x) - push!(a, 1) -end - -facts("push! inside push!") do - b = lift(crash, a) - @fact_throws push!(a, 1) -end diff --git a/test/flatten.jl b/test/flatten.jl index 9fb9b4a..cf22b02 100644 --- a/test/flatten.jl +++ b/test/flatten.jl @@ -3,13 +3,13 @@ using Reactive facts("Flatten") do - a = Input(0) - b = Input(1) + a = Signal(0) + b = Signal(1) - c = Input(a) + c = Signal(a) d = flatten(c) - cnt = foldl((x, y) -> x+1, 0, d) + cnt = foldp((x, y) -> x+1, 0, d) context("Signal{Signal} -> flat Signal") do # Flatten implies: @@ -24,6 +24,7 @@ facts("Flatten") do context("Current signal updates") do push!(a, 2) + step() @fact value(cnt) --> 1 @fact value(d) --> value(a) @@ -31,14 +32,17 @@ facts("Flatten") do context("Signal swap") do push!(c, b) + step() @fact value(cnt) --> 2 @fact value(d) --> value(b) push!(a, 3) + step() @fact value(cnt) --> 2 @fact value(d) --> value(b) push!(b, 3) + step() @fact value(cnt) --> 3 @fact value(d) --> value(b) diff --git a/test/macro.jl b/test/macro.jl deleted file mode 100644 index 4865689..0000000 --- a/test/macro.jl +++ /dev/null @@ -1,43 +0,0 @@ -using Reactive -using FactCheck -using Compat - -a = Input(2) -b = @lift a^2 -facts("@lift") do - - context("@lift input expressions") do - - t1 = @lift (a,) - t2 = @lift (a, b) - l1 = @lift [a] - l2 = @lift [a, b] - c1 = @lift Any[a] - - push!(a, 3) - - @fact t1.value --> (a.value,) - @fact t2.value --> (a.value, b.value) - @fact l1.value --> [a.value] - @fact l2.value --> [a.value, b.value] - @fact c1.value --> Any[a.value] - end - context("@lift basics") do - @fact value(a)^2 --> value(b) - - push!(a, 3) - - @fact a.value^2 --> b.value - end - - # test use in a function - context("@lift inside a function") do - k = 3 - # f(a,b) = @lift a + b + 1 + k - - # z = f(a,b) - # push!(a, 4) - # @fact a.value^2 + a.value + 4 --> z.value - end - -end diff --git a/test/runtests.jl b/test/runtests.jl index a344a29..bed2be0 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,7 +1,16 @@ +using Reactive + +# Stop the runner task +try + println("Killing ", Reactive.runner_task) # the task switch caused here is required! + Base.throwto(Reactive.runner_task, InterruptException()) +catch +end + include("basics.jl") -include("trylift.jl") +#include("gc.jl") include("call_count.jl") -include("concurrency.jl") -include("macro.jl") include("flatten.jl") +include("time.jl") +include("async.jl") FactCheck.exitstatus() diff --git a/test/time.jl b/test/time.jl new file mode 100644 index 0000000..0e617bd --- /dev/null +++ b/test/time.jl @@ -0,0 +1,126 @@ + +facts("Timing functions") do + + context("fpswhen") do + b = Signal(false) + t = fpswhen(b, 10) + acc = foldp((x, y) -> x+1, 0, t) + sleep(0.14) + + @fact queue_size() --> 0 + push!(b, true) + + dt = @elapsed Reactive.run(11) # the first one starts the timer + push!(b, false) + Reactive.run(1) + + sleep(0.11) # no more updates + @fact queue_size() --> 0 + + @fact dt --> roughly(1, atol=0.2) # mac OSX needs a lot of tolerence here + @fact value(acc) --> 10 + + end + + context("fps") do + t = fps(10) + acc = foldp(push!, Float64[], t) + Reactive.run(11) # Starts with 0 + log = copy(value(acc)) + + @fact log[1] --> roughly(0.1, atol=0.1) # First one's always crappy + log = log[2:end] + + @fact sum(log) --> roughly(1.0, atol=0.05) + @fact [1/10 for i=1:10] --> roughly(log, atol=0.03) + + @fact queue_size() --> 0 + sleep(0.11) + @fact queue_size() --> 1 + sleep(0.22) + @fact queue_size() --> 1 + + close(acc) + close(t) + + Reactive.run_till_now() + end + + context("every") do + t = every(0.1) + acc = foldp(push!, Float64[], t) + Reactive.run(11) + end_t = time() + log = copy(value(acc)) + + @fact log[end-1] --> roughly(end_t, atol=0.001) + + close(acc) + close(t) + Reactive.run_till_now() + + @fact [0.1 for i=1:10] --> roughly(diff(log), atol=0.03) + + sleep(0.2) + # make sure close actually also closed the timer + @fact queue_size() --> 0 + end + + context("throttle") do + x = Signal(0) + y = throttle(0.1, x) + y′ = throttle(0.2, x, push!, Int[], x->Int[]) # collect intermediate updates + z = foldp((acc, x) -> acc+1, 0, y) + z′ = foldp((acc, x) -> acc+1, 0, y′) + + push!(x, 1) + step() + + push!(x, 2) + step() + + push!(x, 3) + t0=time() + step() + + @fact value(y) --> 0 + @fact value(z) --> 0 + @fact value(z′) --> 0 + @fact queue_size() --> 0 + + sleep(0.07) + + @fact value(y) --> 0 # update hasn't come in yet + @fact value(z′) --> 0 + @fact queue_size() --> 0 + sleep(0.03) + @fact queue_size() --> 1 + step() + @fact value(y) --> 3 + @fact value(z) --> 1 + sleep(0.1) + + @fact queue_size() --> 1 + step() + @fact value(z′) --> 1 + @fact value(y′) --> Int[1,2,3] + + push!(x, 3) + step() + + push!(x, 2) + step() + + push!(x, 1) + step() + sleep(0.2) + + @fact queue_size() --> 2 + step() + step() + @fact value(y) --> 1 + @fact value(z′) --> 2 + @fact value(y′) --> Int[3,2,1] + end +end + diff --git a/test/trylift.jl b/test/trylift.jl deleted file mode 100644 index d13184a..0000000 --- a/test/trylift.jl +++ /dev/null @@ -1,22 +0,0 @@ -facts("trylift") do - - context("trylift") do - - x = Input(1) - m = [1.0, 2.0, 3.1] - - y = trylift(i -> m[i], Int64, x) - @fact eltype(y) --> Reactive.Try{Int64} - @fact value(y) --> 1 - push!(x, 2) - @fact value(y) --> 2 - push!(x, 3) - @fact value(y) --> InexactError() - push!(x, 4) - if VERSION < v"0.4.0-dev+2374" - @fact value(y) --> BoundsError() - else - @fact value(y) --> BoundsError(m,(4,)) - end - end -end