# Streaming

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if one wants
to run the same kind of computations over and over, one might re-create a
similar set of tasks for each unit of data that needs processing.
This might be fine for computations which take a long time to run (thus
dwarfing the cost of task creation, which is quite small), or when working with
a limited set of data, but this approach is not great for doing lots of small
computations on a large (or endless) amount of data - for example, processing
image frames from a webcam, reacting to messages from a message bus, or reading
samples from a software radio. All of these workloads are better suited to a
"streaming" model of data processing, where data is simply piped into a
continuously-running task (or DAG of tasks) forever, or until the data runs
out.
Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
[Task Queues](@ref), Dagger provides a means to convert an entire DAG of
tasks into a streaming DAG, where data flows into and out of each task
asynchronously, using the `spawn_streaming` function:
```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```
In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced from `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.
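Streaming tasks compose just like ordinary Dagger tasks, so intermediate
transformations can be placed between a producer and a consumer. As a minimal
sketch (using only the `spawn_streaming` and `@spawn` APIs shown above; the
task names here are illustrative):

```julia
using Dagger

Dagger.spawn_streaming() do
    # Continuously produce random values
    vals = Dagger.@spawn rand()
    # Scale each value as it streams through
    scaled = Dagger.@spawn 10 * vals
    # Continuously print each scaled value
    print_scaled = Dagger.@spawn println(scaled)
end
```

Each stage only runs when a new value arrives from its upstream task, so
adding intermediate stages doesn't change the overall execution model.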
As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Both tasks are halted
until `spawn_streaming` returns, allowing large DAGs to be built all at once,
without any task losing a single value. If desired, streaming regions can be
connected, although some values might be lost while tasks are being connected:
```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```
More complicated streaming DAGs can be easily constructed, without doing
anything different. For example, we can generate multiple streams of random
numbers, write them all to their own files, and print the combined results:
```julia
Dagger.spawn_streaming() do
    # Generate four independent streams of random numbers
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    # Append each stream's values to its own file, passing the values along
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    # Sum the latest value from each stream and print the result
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end
```
If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.cancel!.(all_vals)` and `Dagger.cancel!.(all_vals_written)` to
terminate each streaming task. In the future, a more convenient way to tear
down a full DAG will be added; for now, each task must be cancelled individually.
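Note that for `cancel!` to be callable later, the task handles must remain
visible outside the streaming region. Since `spawn_streaming` returns the value
of its block, one way to do this (a simplified sketch of the example above,
with the file-writing stage replaced by an identity pass-through) is to return
the handles from the block:

```julia
using Dagger

all_vals, all_vals_written = Dagger.spawn_streaming() do
    # Four streams of random numbers
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    # A downstream stage for each stream (here just passing values through)
    all_vals_written = map(1:4) do i
        Dagger.spawn(val -> val, all_vals[i])
    end
    # Return both sets of handles so they outlive the streaming region
    (all_vals, all_vals_written)
end

# Later, when the streams are no longer needed:
Dagger.cancel!.(all_vals)
Dagger.cancel!.(all_vals_written)
```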
Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls within some arbitrary range:
```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_streaming("Finished!")
        end
        return x
    end
end
fetch(vals)
```
In this example, the call to `fetch` will hang (while random numbers continue
to be drawn), until a drawn number is less than 0.001; at that point, `fetch`
will return with `"Finished!"`, and the task `vals` will have terminated.