module Concurrent::Stream
Overview
Influenced by Ruby's parallel gem and Java streams.
Creating a stream:
- Channel#parallel creates a Stream::Source reading from the Channel.
- Enumerable#parallel creates a Channel and Stream::Source reading from it.
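The next sketch shows the general idea behind turning an Enumerable into a stream source: a producer fiber copies the items into a Channel and closes it when done, so readers can tell when the input has ended. It uses only the Crystal standard library. `to_channel` is a hypothetical helper, not the shard's implementation of Enumerable#parallel.

```crystal
# Stdlib-only sketch: `to_channel` is a hypothetical helper that mimics the
# idea behind Enumerable#parallel by copying items into a Channel from a fiber.
def to_channel(items : Enumerable(Int32)) : Channel(Int32)
  ch = Channel(Int32).new
  spawn do
    items.each { |item| ch.send(item) }
    ch.close # tells readers that no more items are coming
  end
  ch
end

source = to_channel(1..5)
received = [] of Int32
# receive? returns nil once the channel is closed and drained.
while item = source.receive?
  received << item
end
p received # => [1, 2, 3, 4, 5]
```

With a single producer and a single reader the order is preserved. Once several workers read from the same channel, that is no longer true.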
Stream operations:
- #map { } - Same as Enumerable#map but runs in a fiber pool.
- #select { } - Same as Enumerable#select but runs in a fiber pool.
- #batch(size) { } - Groups results into chunks of up to the given size.
- #flatten() - TODO
- #run { } - Runs the block in a fiber pool. No further processing is possible except for #wait.
- #tee { } - Runs the block in a fiber pool, passing the original message to the next Stream.
- #errors { } - Runs accumulated errors through the block in a fiber pool.
- #serial - Returns an Enumerable collecting results from a parallel Stream.
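"Runs in a fiber pool" can be pictured as follows: a fixed number of worker fibers pull items from a shared input channel, apply the block, and push results to an output channel. This is a standard-library sketch of that pattern, not the shard's code. `pool_map` and its parameters are hypothetical, and results are sorted because arrival order is not guaranteed here.

```crystal
# Stdlib-only sketch of "map in a fiber pool". `pool_map` is a hypothetical
# helper for illustration, not part of Concurrent::Stream.
def pool_map(items : Array(Int32), workers : Int32, fn : Int32 -> Int32) : Array(Int32)
  input = Channel(Int32).new
  output = Channel(Int32).new

  # Producer fiber: feeds every item into the pool, then closes the input.
  spawn do
    items.each { |item| input.send(item) }
    input.close
  end

  # Worker fibers: each pulls items until the input channel is closed.
  workers.times do
    spawn do
      while item = input.receive?
        output.send(fn.call(item))
      end
    end
  end

  # Collect one result per input item; arrival order is not guaranteed.
  Array(Int32).new(items.size) { output.receive }
end

squares = pool_map([1, 2, 3, 4, 5], 3, ->(n : Int32) { n * n })
p squares.sort # => [1, 4, 9, 16, 25]
```

The worker count bounds how many items are in flight at once. The unbuffered channels also apply backpressure: the producer cannot run ahead of the workers.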
Final results and error handling
All method chains should end with #wait, #serial, or #to_a, all of which gather errors and end parallel processing. You may omit #wait when using #run for background tasks whose completion is not guaranteed. When #run is used this way, catch all exceptions inside the run block; otherwise the internal exception channel may fill up, causing the entire pipeline to stop.
Error behavior
Errors are passed through to the end of the stream, or until the first #errors call.
#serial, #to_a, or #run followed by #wait will raise on the first error encountered, closing the pipeline behind it recursively. This may cause Channel::ClosedError to be raised in your source.
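The behavior described above can be modeled with the standard library alone. In this model, a middle stage forwards failures downstream as values instead of raising. The terminal step behaves like #to_a: it collects values, but on the first error it closes the upstream channel and raises. The source fiber is still sending, so it then sees Channel::ClosedError. `Item`, `collect`, and `source_outcome` are hypothetical names, and this is an illustration of the described semantics, not the shard's implementation.

```crystal
# Stdlib-only model (hypothetical names, not the shard's implementation).
alias Item = Int32 | Exception

source = Channel(String).new
stage = Channel(Item).new
source_outcome = Channel(String).new(1)

# Source fiber: the user code that feeds the pipeline.
spawn do
  begin
    %w(1 2 x 4 5).each { |s| source.send(s) }
    source.close
    source_outcome.send("finished")
  rescue Channel::ClosedError
    source_outcome.send("Channel::ClosedError")
  end
end

# Stage fiber: turns a parse failure into a value instead of raising.
spawn do
  while s = source.receive?
    stage.send(s.to_i? || ArgumentError.new("not a number: #{s}"))
  end
  stage.close
end

# Terminal step: collect values, but raise on the first error after
# closing the pipeline behind it.
def collect(stage : Channel(Item), upstream : Channel(String)) : Array(Int32)
  results = [] of Int32
  while item = stage.receive?
    if item.is_a?(Exception)
      upstream.close
      raise item
    end
    results << item
  end
  results
end

message = begin
  collect(stage, source)
  "no error"
rescue ex
  ex.message
end
outcome = source_outcome.receive
puts message # => not a number: x
puts outcome # => Channel::ClosedError
```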
Error handling
Option 1
If the entire pipeline must succeed or fail early, end it with #serial, #to_a, or #run followed by #wait to receive the first error.
Option 2
Make sure there are no errors by rescuing exceptions within every block.
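In the sketch below, the work wrapped in each block rescues its own exceptions and returns a fallback value. Nothing ever reaches an error path. It uses plain Crystal fibers in place of a stream stage. `parse` and `safe_parse` are hypothetical helpers, and `nil` as the fallback is an arbitrary choice for illustration.

```crystal
# Hypothetical stand-in for a block body that can raise.
def parse(s : String) : Int32
  s.to_i # raises ArgumentError for input such as "oops"
end

# The rescue lives inside the work itself: a failure becomes a fallback
# value (nil here), so no exception ever leaves the fiber.
def safe_parse(s : String) : Int32?
  parse(s)
rescue ArgumentError
  nil
end

inputs = %w(1 2 oops 4)
results = Channel(Int32?).new

# One fiber per item, standing in for a fiber-pool stage.
inputs.each do |s|
  spawn { results.send(safe_parse(s)) }
end

values = Array(Int32?).new(inputs.size) { results.receive }
p values.compact.sort # => [1, 2, 4]
```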
Option 3
src.parallel.select { raise_randomly }.map { raise_randomly }.errors { |ex, obj| }.map { raise_randomly }.errors { }.to_a
- The first #errors call receives errors from the first #select and #map.
- The second #errors call receives errors from the second #map.
- #to_a is guaranteed to succeed (although it may be empty) because all errors were handled.
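The block parameters `|ex, obj|` suggest that an #errors handler receives both the exception and the object being processed. Assuming that reading, this standard-library sketch models the idea: successes continue down the main channel, while failures travel on a separate channel paired with their input, so the final collection only sees values. `Failure` and the channel wiring are hypothetical and do not reproduce the shard's internals.

```crystal
# Hypothetical record pairing an exception with the object that caused it,
# mirroring the |ex, obj| parameters of an #errors block.
record Failure, error : Exception, input : String

inputs = %w(10 x 30 y)
values = Channel(Int32).new
failures = Channel(Failure).new

# Stage: successes go downstream, failures go to a separate error channel.
inputs.each do |s|
  spawn do
    begin
      values.send(s.to_i)
    rescue ex
      failures.send(Failure.new(ex, s))
    end
  end
end

ok = [] of Int32
handled = [] of String
# Each input produces exactly one message on one of the two channels.
inputs.size.times do
  select
  when v = values.receive
    ok << v
  when f = failures.receive
    handled << f.input # the error handler sees the failing object
  end
end

p ok.sort      # => [10, 30]
p handled.sort # => ["x", "y"]
```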
Option 4
Use a third-party Result type and never raise in any block.
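The following sketch shows the general shape of this option, using a minimal hand-written Result struct in place of a third-party one. The block body returns either a value or an error and never raises. `Result` and `parse_result` are hypothetical. For brevity it runs through a plain Array#map; presumably the same block could be handed to a stream's #map.

```crystal
# Hypothetical minimal Result type (not taken from any particular shard):
# each step returns a value or an error instead of raising.
struct Result
  getter value : Int32?
  getter error : String?

  def initialize(@value = nil, @error = nil)
  end

  def ok? : Bool
    @error.nil?
  end
end

# A block body written so it never raises: bad input becomes an error Result.
def parse_result(s : String) : Result
  if n = s.to_i?
    Result.new(value: n)
  else
    Result.new(error: "not a number: #{s}")
  end
end

results = %w(1 x 3).map { |s| parse_result(s) }
oks = results.select(&.ok?).compact_map(&.value)
errs = results.reject(&.ok?).compact_map(&.error)
p oks  # => [1, 3]
p errs # => ["not a number: x"]
```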
EXPERIMENTAL
Defined in: concurrent/stream.cr
Constant Summary
Log = ::Log.for(self)