Virtual Multi-valued Streams
or, "Pac-Man Continuations"
Qi Improvement Proposal (QIP)
Status: ACCEPTED
A detailed description of the design of the continuation-passing multi-valued stream implementation proposed for the Qi compiler, which naturally generalizes the existing single-valued implementation while preserving its performance characteristics. The generalization is achieved by threading a new return continuation through the stream to support returning to points where additional values should be produced in a single stream cycle, while maintaining local state as needed in each stream segment using mutable lexical variables. These mechanisms "unroll" the static stream, which is capable of producing zero or one value per cycle, into a longer dynamic stream capable of producing any number of values per cycle.
1 Background
Stream fusion is an approach to compiling functional sequence-oriented operations like map, filter and foldl. It represents the input as a stream of values rather than as a specific data structure such as a list, and then operates on these values, directly, through all of the stages of transformation. This effectively fuses these sequential operations into a single aggregate operation on the input values considered in each stream cycle, and thus avoids intermediate representations of the entire collection (for example, the construction of an intermediate list after map, after filter, and so on). The survey paper by Vincent St-Amour [St-Amour12] explains the motivation and development of this approach.
This optimization is essential in settings dealing with large datasets or in realtime applications where latency is critical. Additionally, because streams operate on values directly, they are agnostic to input and output data structures, making them a good candidate for optimizing Qi flows which, too, operate on values directly.
Qi currently implements this optimization, but it is restricted to list inputs and "linear" list-oriented operations, i.e., those consuming and producing at most one value at each step. Qi’s implementation of stream fusion employs continuation-passing style (CPS) to take advantage of efficient compilation of this pattern in Racket/Chez, where continuing via lambdas in tail position (rather than the usual programming pattern of function invocation and use of return values) effectively compiles to GOTOs (see "LAMBDA: The Ultimate GOTO" [Steele77]). In other words, CPS in tail position carries very little overhead in Racket.
2 Guide
Overview summarizes the semantics of a stream. Static Architecture describes Qi’s stream implementation and the proposed modifications. A Stream Cycle describes the runtime processing of a single value by the stream, including the dynamic of "cycles" and "epicycles." Analysis examines each component of the stream’s operation in isolation. Reference Code provides a minimal and faithful end-to-end code example. Future Work describes extending the paradigm to multi-streams. Finally, Summary recaps the proposal.
3 Overview
A stream is a computation expressed as a series of operations on values. These operations may each accept zero or more input values and produce zero or more output values. Most commonly, they accept and produce single values.
The semantics of each such operation is encapsulated in a stream segment. Any given segment is one of:
Producer
Transformer
Consumer
A producer generates values one at a time given some input parameters (e.g., an input list, which it destructures).
A transformer considers each value, or multiple values in succession, and yields zero or more values, one at a time (e.g., map or filter).
A consumer incorporates values one at a time in constructing the stream result (e.g., converting back to a list, or foldl to produce an aggregate value such as a sum).
Thus, a stream is made up of stream segments: a producer, followed by any number of transformers, followed by a consumer.
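As a minimal sketch of this shape, the following composes one producer, two transformers, and one consumer, using the closure representation that the rest of this document describes. The names from-list, map-segment, and to-list are chosen here for illustration only and are not taken from the Qi source.

```racket
#lang racket

;; Producer: destructures a list, yielding one value per stream cycle.
(define (from-list vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

;; Transformer: applies f to each value and passes the result forward.
(define (map-segment f previous)
  (λ (done yield)
    (previous done
              (λ (v return) (yield (f v) return)))))

;; Consumer: rebuilds a list from the values it receives.
(define (to-list previous)
  (previous (λ () null)
            (λ (v return) (cons v (return)))))

;; producer, then two transformers, then consumer
(define squares-plus-one
  (to-list
   (map-segment add1
                (map-segment (λ (x) (* x x))
                             (from-list (list 1 2 3))))))
;; squares-plus-one is '(2 5 10)
```

No intermediate list of squares is built here: each value passes through both transformers before the next one is produced.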
4 Static Architecture
This section describes the static construction of continuation-passing streams in the Qi compiler. Much of it summarizes Qi’s existing implementation, which is already documented in the Wiki, while the rest describes the new proposal.
4.1 The "Dilated" Driver Loop
A representative example of something we’d like to do with streams is implement operations on lists. Typically, in order to process a list and, say, construct a new list, we would use structural recursion in a simple loop. With continuation-passing streams, we do something analogous, except that this driver loop is "dilated" across the producer and consumer instead of being a single expression in one place.
A simple stream that translates a list to a stream and then back again to a list illustrates this core, "dilated," driver loop:
    (define (list→stream vs)
      (λ (done yield)
        (let go ([vs vs])
          (if (null? vs)
              (done)
              (yield (car vs) (λ () (go (cdr vs))))))))

    (define (stream→list previous)
      (previous (λ () null)
                (λ (v return) (cons v (return)))))

    (stream→list (list→stream (list 1 2 3)))
With a little squinting, it’s not hard to see that this is essentially just a decomposition of:
    (define (list→stream→list vs)
      (if (null? vs)
          null
          (let ([v (car vs)]
                [vs (cdr vs)])
            (cons v (list→stream→list vs)))))

    (list→stream→list (list 1 2 3))
The benefit of "dilation" is that it allows inserting any number of specialized components between the producer and consumer that operate directly on these produced values without building any intermediate data structures, and which seamlessly compose with one another by conforming to a simple interface.
4.2 Stream Segments
Generally speaking, a stream segment is a closure that accepts two arguments: the continuations done (called by any segment to terminate the stream) and yield (called to convey a value forward to the next stream segment). For example, this is the map transformer, which applies a function to each value:
    (define (map f previous)
      (λ (done yield)
        (previous done
                  (λ (v return)
                    (yield (f v) return)))))
Each stream segment closes over additional arguments that parametrize its behavior. These arguments include, at a minimum, the previous stream segment (also a closure). (Note that the Qi source code uses the word "next" for this binding rather than "previous." That is accurate in an order-isomorphic sense, but we avoid it here because the binding refers to the preceding rather than the following segment in the stream.)
There are two exceptions to these rules. First, consumers don’t accept any continuations, since they are the end of the stream and are responsible for defining these ultimate continuations. For example, the consumer we saw earlier that translates the stream into a list:
    (define (stream→list previous)
      (previous (λ () null)
                (λ (v return) (cons v (return)))))
And second, producers don’t close over a previous segment, since they are the beginning of the stream and are responsible for processing the input. For example, the producer we saw earlier that translates a list into a stream:
    (define (list→stream vs)
      (λ (done yield)
        (let go ([vs vs])
          (if (null? vs)
              (done)
              (yield (car vs) (λ () (go (cdr vs))))))))
Aside from these, the other arguments that parametrize each segment are specific to the semantics of the operation being performed. For instance, map closes over, in addition to previous, a function f to apply to each value, while take closes over a number n of values to keep. In the Qi compiler, we also capture and pass source syntax objects among these closed-over arguments, to aid error reporting.
When the stream is constructed, each segment, in turn, binds the previous segment’s done and yield continuations by wrapping the ones it receives from the next segment (which are ultimately defined in the consumer) with its own semantics.
When applied, the stream eagerly evaluates to a result.
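The distinction between constructing a stream and applying it can be sketched as follows. Constructing the segments only builds a nest of closures, and evaluation begins when the outermost closure receives its done and yield continuations. The transformer is named map-transformer here only to avoid shadowing Racket's built-in map, and the ad hoc summing continuations are an illustrative stand-in for a consumer.

```racket
#lang racket

(define (list→stream vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

;; Same as the map transformer above, under a non-shadowing name.
(define (map-transformer f previous)
  (λ (done yield)
    (previous done
              (λ (v return) (yield (f v) return)))))

;; Constructing the segments performs no stream work: the result is
;; just a two-argument closure waiting for its continuations.
(define unapplied (map-transformer add1 (list→stream (list 1 2 3))))

;; Supplying done and yield (normally the consumer's job) runs the
;; whole stream eagerly to a result.
(define total
  (unapplied (λ () 0)
             (λ (v return) (+ v (return)))))
;; total is 9, i.e. 2 + 3 + 4
```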
4.3 Differences from the Existing Implementation
This section discusses some subtle differences from the reference implementation in the Qi compiler.
In the reference implementation, the loop driving stream evaluation is located in the consumer, although parsing and analysis of the stream state is the responsibility of the producer. This requires performing the recursion (for example, structural recursion) on the stream state in the consumer, passing the next state all the way back to the producer for analysis there, and then threading the remaining stream state all the way back to the consumer once again. The proposal moves the driver loop to its natural place in the producer, avoiding the need to thread the stream state through the various segments. This brings the significant benefit that stream components can now be compiled to closures independently and not only as complete sequences, simplifying compilation.
The reference compiler implements segment-local state by consing it onto the stream state shared between segments, and unconsing it before use, entailing significant supporting macro infrastructure. The proposal instead uses mutable lexical variables that are locally managed by each stream segment, as this is simpler and preserves the reference performance characteristics while also trivially generalizing to nonlinear streams such as zip.
In the reference implementation, in addition to the done and yield continuations, there is a skip continuation that is used by any segment to skip the current value and recur to the next stream cycle. It expects one argument: the next stream state, as discussed above. The yield continuation expects two arguments — the current segment’s computed output value and, once again, the next stream state.
In addition to eliminating the need to thread the remaining input state through the stream segments, the proposal introduces one additional argument to yield — a return continuation, which bookmarks points to return to in each stream cycle and thus enables individual segments to yield multiple times (i.e., produce multiple values) before moving on to the next cycle. As the return continuation enables resuming from any earlier point in the stream and not only from the very beginning, it generalizes the capability of the former skip continuation which is therefore eliminated in this proposal.
5 A Stream Cycle
A single stream cycle is defined as the processing of a single value produced from the input state at runtime. This section describes a stream cycle.
As described in the previous section, a stream is a nest of closures, with the producer of the stream being innermost, and transformers forming successive wrapping layers, and the consumer being outermost.
When the stream is evaluated, the consumer defines the previous segment’s done and yield continuations, which in turn does the same, and, in this manner, the stream continuations are defined all the way to the producer, kicking off eager evaluation of the stream.
At this point, the producer first analyzes the input state (which it closes over) to determine whether the stream is exhausted, in which case it signals termination of the stream to the consumer. Otherwise, it produces (1) a value, and (2) a lambda (i.e., the aforementioned "return continuation") that defers recursion on the stream state.
Crucially, in either of these cases, these aren’t conveyed to subsequent (i.e., wrapping) stream segments as return values, but rather, as arguments to the continuations that were received from them at the time of construction of the present segment: either the done continuation (in the former case of stream exhaustion) or the yield continuation (in the latter case of producing a value and a return continuation).
If at any point thereon a segment (typically a transformer) decides that a value should be excluded from the result (e.g., filter), it calls the return continuation instead of calling yield, resuming production of the next value at an earlier point in the stream (often the producer).
These continuation invocations are tail calls, that is, their return values aren’t used in the calling frame, so these frames may be ignored and are in practice eliminated by the Racket/Chez compiler (via tail-call elimination), and we can just follow the computation "forward" all the way through the series of stream segments to the consumer, rather than needing to unwind the stack.
In the case of a yield, the produced value will be processed by all succeeding segments in the stream in the present cycle, yielding zero or more output values or even early termination of the stream (as determined by the semantics of each stream segment), while the return continuation is conveyed unchanged unless a segment intends to yield multiple times. In the common case where all stream segments yield single values, the return continuation is only invoked in the consumer which resumes evaluation at the point in the producer where recursion on the remaining stream state had been deferred. This begins the next stream cycle, which then proceeds in exactly the way described in the present section.
When the producer determines that the stream is exhausted, the stream evaluates to a result determined by the consumer.
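To make the order of events in a cycle concrete, the sketch below instruments a small stream with an event log. The note! helper, the events list, and the segment names keep-if and collect are all hypothetical additions for illustration. The segments themselves follow the shapes described in this document.

```racket
#lang racket

;; Event log, most recent first (illustrative instrumentation only).
(define events '())
(define (note! . xs) (set! events (cons xs events)))

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (cond [(null? vs) (note! 'exhausted) (done)]
            [else (note! 'produce (car vs))
                  (yield (car vs) (λ () (go (cdr vs))))]))))

;; A filter-like transformer: skips by calling return instead of yield.
(define (keep-if pred previous)
  (λ (done yield)
    (previous done
              (λ (v return)
                (cond [(pred v) (yield v return)]
                      [else (note! 'skip v) (return)])))))

;; A tail-recursive consumer that collects values in arrival order.
(define (collect previous)
  (let ([result '()])
    (previous (λ () (reverse result))
              (λ (v return)
                (note! 'consume v)
                (set! result (cons v result))
                (return)))))

(define result (collect (keep-if odd? (producer (list 1 2 3)))))
(define trace (reverse events))
;; result is '(1 3)
;; trace is '((produce 1) (consume 1) (produce 2) (skip 2)
;;            (produce 3) (consume 3) (exhausted))
```

Each cycle begins with a produce event, and the next cycle begins only when some segment (here either the consumer or keep-if) invokes the return continuation.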
5.1 Epicycles
In the reference implementation, control always moves sequentially through segments in each cycle, and returns to the producer upon reaching the consumer, beginning the next cycle. With the introduction of the return continuation, any segment may return to any earlier bookmarked point in the stream, continuing forward through the stream once again from there, producing additional values mid-stream in the current cycle. These recurrences within the current cycle form smaller "epicycles" which may be nested. They facilitate yielding multiple values (e.g., the dup transformer that duplicates each input value), and also, symmetrically, accumulating multiple values (e.g., the pair transformer that collects input values into pairs before yielding).
When all intermediate return points have been fulfilled, evaluation naturally returns to the producer on the final return (whether from the consumer or any other segment), starting the next stream cycle, as before.
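The following sketch traces an epicycle by logging when the producer starts a cycle, when the dup transformer's own return continuation is entered, and when the consumer receives a value. The note! helper and the event names are hypothetical instrumentation, not part of the proposal.

```racket
#lang racket

(define events '())
(define (note! . xs) (set! events (cons xs events)))

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (begin
            (note! 'cycle (car vs))
            (yield (car vs) (λ () (go (cdr vs)))))))))

;; dup bookmarks a point mid-stream: returning there starts an epicycle.
(define (dup previous)
  (λ (done yield)
    (previous done
              (λ (v return)
                (yield v
                       (λ ()
                         (note! 'epicycle v)
                         (yield v return)))))))

(define (collect previous)
  (previous (λ () null)
            (λ (v return)
              (note! 'consume v)
              (cons v (return)))))

(define result (collect (dup (producer (list 1 2)))))
(define trace (reverse events))
;; result is '(1 1 2 2)
;; trace is '((cycle 1) (consume 1) (epicycle 1) (consume 1)
;;            (cycle 2) (consume 2) (epicycle 2) (consume 2))
```

The consumer's first return in each cycle lands in dup rather than in the producer, and only the second return reaches the producer and starts the next cycle.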
6 Analysis
The previous section described the holistic operation of the stream.
In this section, we examine the specific components in isolation, describing their mechanics and how they facilitate overall stream evaluation.
6.1 The yield Continuation
When a stream segment wishes to pass a computed value forward to the next stream segment, it calls the yield continuation. In this invocation, it must forward:
A value (exactly one).
The return continuation for any pending work.
If the segment wishes to yield a single value, it simply does so, and forwards the received return continuation, unchanged (e.g., see map).
As each invocation of yield conveys precisely one value, we need some way to not yield if we wish to skip a value, and to yield multiple times in the present stream cycle if we wish to yield multiple values from a single stream segment. That’s where the new return continuation comes in.
6.2 The return Continuation
The return continuation enables any stream segment to either skip a value (e.g., filter), or yield multiple values (e.g., dup), or accumulate multiple values (e.g., pair) in a single stream cycle.
It "bookmarks" a point in the stream to return to and continue forward from, forming an epicycle within the containing stream cycle. (This dynamic is the reason the approach is referred to as "Pac-Man streams," as it is reminiscent of Pac-Man moving past the edge of the screen on the right and emerging from somewhere on the left while continuing to consume values, in the classic arcade game.)
For example, the filter transformer uses return to skip certain values:
    (define (filter f previous)
      (λ (done yield)
        (previous done
                  (λ (v return)
                    (if (f v)
                        (yield v return)
                        (return))))))
And the dup transformer uses return to yield multiple values:
    (define (dup previous)
      (λ (done yield)
        (previous done
                  (λ (v return)
                    (yield v (λ () (yield v return)))))))
Some general remarks on the return continuation:
The return continuation is a thunk resembling (λ () ... next steps ...).
A segment defines a return continuation when it wishes to yield multiple times. A segment enters this continuation when it wishes to skip the present value or when it wishes to accumulate additional values in local state.
The producer always defines a return continuation (e.g., see list→stream), as evaluation must return here in order to produce each additional stream value from the input state.
If there are multiple return continuations defined, they "stack": a later one shadows an earlier one as long as its segment is still yielding, and that segment then conveys the earlier one forward when yielding for the final time.
The consumer always enters the return continuation after incorporating a value received via yield.
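These remarks can be illustrated with a hypothetical repeat-transformer (not part of the proposal) that yields each value n times. It defines its own return continuation for every yield except the last, forwards the received return on the final yield, and enters the received return directly when there is nothing to yield.

```racket
#lang racket

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

;; Hypothetical transformer: yields each incoming value n times.
(define (repeat-transformer n previous)
  (λ (done yield)
    (previous done
              (λ (v return)
                (let loop ([k n])
                  (cond
                    ;; Nothing to yield: skip by entering the received return.
                    [(<= k 0) (return)]
                    ;; Final yield: convey the earlier return forward.
                    [(= k 1) (yield v return)]
                    ;; Otherwise shadow it with a return that resumes here.
                    [else (yield v (λ () (loop (sub1 k))))]))))))

(define (rconsumer init previous)
  (previous (λ () init)
            (λ (v return) (cons v (return)))))

(define tripled
  (rconsumer null (repeat-transformer 3 (producer (list 1 2)))))
;; tripled is '(1 1 1 2 2 2)

;; Two stacked repeaters: the inner segment's return stays pending
;; while the outer one is still yielding.
(define stacked
  (rconsumer null
             (repeat-transformer 2
                                 (repeat-transformer 2
                                                     (producer (list 'a))))))
;; stacked is '(a a a a)
```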
6.2.1 Virtual Length of the Stream
The return continuation has the effect of "virtually" extending the stream, unrolling recurrence into a longer stream from the perspective of the values flowing through it. If each stream segment i yields at most kᵢ values for each value it receives, then a stream of length N with n such segments appears to be of virtual length bounded above by N × k₁ × k₂ × ... × kₙ.
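As a small check of this bound, the sketch below counts the values reaching the consumer for two streams over a three-element input. The count-consumer and keep-if names are illustrative assumptions. dup has k = 2, while a filter-like segment has k = 1 and so may fall below the bound.

```racket
#lang racket

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

(define (dup previous)
  (λ (done yield)
    (previous done
              (λ (v return)
                (yield v (λ () (yield v return)))))))

(define (keep-if pred previous)
  (λ (done yield)
    (previous done
              (λ (v return)
                (if (pred v) (yield v return) (return))))))

;; Illustrative consumer: counts how many values reach the end.
(define (count-consumer previous)
  (let ([n 0])
    (previous (λ () n)
              (λ (v return) (set! n (add1 n)) (return)))))

;; N = 3, k1 = k2 = 2: the bound 3 * 2 * 2 = 12 is reached exactly.
(define n-dup-dup
  (count-consumer (dup (dup (producer (list 1 2 3))))))

;; N = 3, k1 = 2, k2 = 1: the bound is 6, but only the two copies
;; of the value 2 pass the even? test.
(define n-dup-even
  (count-consumer (keep-if even? (dup (producer (list 1 2 3))))))
```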
6.3 The done Continuation
Any stream segment may signal the conclusion of the stream via the done continuation.
The done continuation is a thunk accepting no arguments.
It must be defined by the consumer, as this is where the result of stream evaluation is determined.
At construction time, stream segments typically forward done unchanged all the way from the consumer back to the producer, so that it short-circuits all the way to the consumer at any point in the stream.
In cases where transformers accumulate multiple values before yielding a result, they may choose to override the done continuation to, for example, flush the pending output by yielding it. In this case, they assume responsibility for (1) defining a return continuation if multiple values are to be yielded, and (2) calling the received done at the end of their operation, as the producer and all preceding stream segments have already concluded their operation and have signed off on the stream being "done," and the consumer still needs, ultimately, to be notified of this.
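A sketch of such an override is given below, using a hypothetical chunk-transformer that groups values into lists of n (assumed to be at least 1). Its replacement done flushes any partial chunk by yielding it, passing the received done as the return continuation so that the consumer's next return concludes the stream.

```racket
#lang racket

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

;; Hypothetical transformer: groups values into lists of length n (n >= 1).
(define (chunk-transformer n previous)
  (let ([buffer '()]   ; pending values, most recent first
        [count 0])     ; number of pending values
    (λ (done yield)
      (previous
       ;; Overridden done: flush a partial chunk, then conclude via
       ;; the received done, which serves as the return continuation.
       (λ ()
         (if (null? buffer)
             (done)
             (let ([chunk (reverse buffer)])
               (set! buffer '())
               (set! count 0)
               (yield chunk done))))
       (λ (v return)
         (set! buffer (cons v buffer))
         (set! count (add1 count))
         (if (= count n)
             (let ([chunk (reverse buffer)])
               (set! buffer '())
               (set! count 0)
               (yield chunk return))
             ;; Keep accumulating: ask for the next value.
             (return)))))))

(define (rconsumer init previous)
  (previous (λ () init)
            (λ (v return) (cons v (return)))))

(define chunks
  (rconsumer null (chunk-transformer 2 (producer (list 1 2 3 4 5)))))
;; chunks is '((1 2) (3 4) (5))
```

Because only one value remains to be flushed here, no additional return continuation needs to be defined. A transformer flushing several values would chain them before finally calling the received done.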
6.4 Styles of Consumer
Consumers like foldr incorporate values from right to left, as each value is combined with the result of processing the rest of the stream, and so they are not tail-recursive in the "dilated" driver loop. But others like foldl are tail-recursive and accumulate the intermediate result in a mutable lexical variable local to the consumer. (As an alternative to local state, we could pass the accumulated result back through the return continuation and cycle it back and forth through the entire stream as an immutable binding, but since it is determined entirely by the consumer, it is simpler to retain it in the consumer than to introduce conventions around (not) handling an unused variable in other stream segments, and has similar performance characteristics.)
As with ordinary recursion, the tail-recursive consumers exhibit better performance as they avoid growing the call stack.
6.4.1 Non-tail-recursive Consumer
The recursive call resembles, e.g., (cons v (return)).
6.4.2 Tail-recursive Consumer
The recursive call resembles, e.g., (begin (set! result (cons v result)) (return)).
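The two styles can be sketched side by side as general fold consumers. The names foldr-consumer and foldl-consumer are illustrative, and the combining function takes the value first and the accumulated result second, following the argument order of Racket's foldr and foldl.

```racket
#lang racket

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (yield (car vs) (λ () (go (cdr vs))))))))

;; Non-tail-recursive: (return) is evaluated inside the call to f,
;; so each value waits on the stack for the rest of the stream.
(define (foldr-consumer f init previous)
  (previous (λ () init)
            (λ (v return) (f v (return)))))

;; Tail-recursive: the value is folded into local state first, and
;; (return) is then called in tail position.
(define (foldl-consumer f init previous)
  (let ([result init])
    (previous (λ () result)
              (λ (v return)
                (set! result (f v result))
                (return)))))

(define r-list (foldr-consumer cons null (producer (list 1 2 3))))
(define l-list (foldl-consumer cons null (producer (list 1 2 3))))
(define l-sum  (foldl-consumer + 0 (producer (list 1 2 3))))
;; r-list is '(1 2 3), l-list is '(3 2 1), l-sum is 6
```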
6.5 State
Each stream segment may retain local state across the handling of values. For instance, take needs to remember how many values remain to be yielded before truncating the rest of the stream. And pair needs to buffer pending values before yielding pairs of them together as a single cons pair or list value.
State of this kind that is local to a stream segment is maintained simply as a mutable lexical variable in the segment itself. For example:
    (define (take n previous)
      (let ([remaining n])
        (λ (done yield)
          (previous done
                    (λ (v return)
                      (if (> remaining 0)
                          (begin
                            (set! remaining (sub1 remaining))
                            (yield v return))
                          (done)))))))
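A usage sketch follows, with the transformer renamed take-transformer to avoid shadowing Racket's built-in take, and with a hypothetical produced counter added to the producer to observe how much of the input is visited. In this formulation the producer hands over one value beyond the limit, at which point the transformer calls done and the rest of the input is never touched.

```racket
#lang racket

;; Illustrative instrumentation: number of values the producer emitted.
(define produced 0)

(define (producer vs)
  (λ (done yield)
    (let go ([vs vs])
      (if (null? vs)
          (done)
          (begin
            (set! produced (add1 produced))
            (yield (car vs) (λ () (go (cdr vs)))))))))

;; Same as take above, under a non-shadowing name.
(define (take-transformer n previous)
  (let ([remaining n])
    (λ (done yield)
      (previous done
                (λ (v return)
                  (if (> remaining 0)
                      (begin
                        (set! remaining (sub1 remaining))
                        (yield v return))
                      (done)))))))

(define (rconsumer init previous)
  (previous (λ () init)
            (λ (v return) (cons v (return)))))

(define first-two
  (rconsumer null (take-transformer 2 (producer (list 1 2 3 4)))))
;; first-two is '(1 2)
;; produced is 3: the third value triggers done, the fourth is never visited
```

Note that remaining is created when the segment is constructed, so a constructed take-transformer carries its count with it and is meant to be applied once.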
7 Reference Code
The code block below contains a runnable end-to-end stream, minimally and faithfully illustrating the approach described by this document.
producer produces a stream from a list.
dup-transformer is a transformer that duplicates each input value, yielding two values for each (thus serving to illustrate a multi-valued transformer).
pair-transformer is a transformer that groups values into pairs, yielding a single list value for every two input values (thus serving to illustrate buffering of values in local state). For an odd number of input values, the last value yields a singleton list.
rconsumer is a non-tail-recursive consumer (like foldr) that simply constructs a list from the stream.
lconsumer is a tail-recursive consumer (like foldl) that simply constructs a list from the stream.
    (define (producer vs)
      (λ (done yield)
        (let go ([vs vs])
          (if (null? vs)
              (done)
              (yield (car vs) (λ () (go (cdr vs))))))))

    (define (dup-transformer previous)
      (λ (done yield)
        (previous done
                  (λ (v return)
                    (yield v (λ () (yield v return)))))))

    ;; Note: #false doubles as the "empty buffer" marker, so this
    ;; minimal version does not support #false as a stream value.
    (define (pair-transformer previous)
      (let ([buffer #false])
        (λ (done yield)
          (previous
           ;; Overridden done: flush a pending value, then conclude.
           (λ ()
             (if buffer
                 (yield (list buffer) done)
                 (done)))
           (λ (v return)
             (if buffer
                 (let ([buffered-v buffer])
                   (set! buffer #false)
                   (yield (list buffered-v v) return))
                 (begin
                   (set! buffer v)
                   (return))))))))

    (define (rconsumer init previous)
      (previous (λ () init)
                (λ (v return) (cons v (return)))))

    (define (lconsumer init previous)
      (let ([result init])
        (previous (λ () result)
                  (λ (v return)
                    (set! result (cons v result))
                    (return)))))

    (rconsumer null (dup-transformer (producer (list 1 2 3))))  ; '(1 1 2 2 3 3)
    (lconsumer null (dup-transformer (producer (list 1 2 3))))  ; '(3 3 2 2 1 1)
    (rconsumer null (pair-transformer (producer (list 1 2 3)))) ; '((1 2) (3))
8 Future Work
The present proposal describes streams where each segment has a single input stream and a single output stream, though these input and output streams may produce multiple values (or no values) from each input value.
There are cases where we may wish to consume multiple input streams and produce multiple output streams. An example of the former is the zip operation which merges two streams into a single stream using a specified binary operation. An example of the latter is unzip, which splits a stream into two streams using a specified function or pair of functions.
We include below a sample implementation of zip and unzip. For the single input and output streams that are the subject of this proposal, our implementation performs comparably to hand-written recursions or specialized for-based implementations. However, for multi-streams, the implementations below are significantly slower than using specialized recursion or for forms, while still being faster than using naive Racket list operations. It will be useful to understand the reasons for this, and whether the current approach could be generalized to achieve efficient multi-streams.
    (define (zip-transformer left right)
      (let ([left-return #false]
            [right-return #false]
            [left-v #false])
        (λ (done yield)
          (left done
                (λ (v return)
                  (set! left-return return)
                  (set! left-v v)
                  (if right-return
                      ;; Right stream already started: resume it.
                      (right-return)
                      ;; First value: start the right stream.
                      (right done
                             (λ (v return)
                               (set! right-return return)
                               (yield (cons left-v v) left-return)))))))))

    (define (unzip-transformer previous)
      (let ([left (λ (done yield)
                    (previous done
                              (λ (v return) (yield (car v) return))))]
            [right (λ (done yield)
                     (previous done
                               (λ (v return) (yield (cdr v) return))))])
        (values left right)))

    ;; Evaluates to '((a . 1) (b . 2) (c . 3))
    (rconsumer null
               (zip-transformer (producer '(a b c))
                                (producer '(1 2 3))))

    ;; Evaluates to two values: '(a b c) and '(1 2 3)
    (call-with-values
     (λ () (unzip-transformer (producer '((a . 1) (b . 2) (c . 3)))))
     (λ (left right)
       (values (rconsumer null left)
               (rconsumer null right))))
These implementations can be generalized to the variadic case by accumulating values from each input stream in succession and using a growable vector to keep track of changing returns in each of them, as seen in the accompanying proof-of-concept code.
9 Summary
We described in detail Qi’s continuation-passing stream fusion implementation, including a natural generalization to support multiple values while preserving the performance characteristics of the single-valued implementation. This covered the stream’s construction as a nest of closures; the use of done and yield continuations to implement each cycle of the stream’s semantics; the addition of the return continuation which introduces "epicycles" within a cycle to support multiple yields; the distinct roles and responsibilities of the producer, each transformer, and the consumer, and how these make use of local state to express diverse semantics including counting, and accumulation of values before yielding; the subtle distinction between tail-recursive and non-tail-recursive consumers and implications for performance; and the stream’s eager evaluation to a result. Finally, we considered further generalization of the approach to support multi-streams.
Bibliography
[St-Amour12] Vincent St-Amour, “Deforestation.” 2012. https://www.ccs.neu.edu/home/amal/course/7480-s12/deforestation-notes.pdf
[Steele77] Guy Lewis Steele Jr., “LAMBDA: The Ultimate GOTO.” 1977. https://www2.cs.sfu.ca/CourseCentral/383/havens/pubs/lambda-the-ultimate-goto.pdf