module Concurrent::Stream

Overview

Influenced by Ruby parallel and Java streams.

Creating a stream:

Stream operations:

Final results and error handling

End every method chain with #wait, #serial, or #to_a, all of which gather errors and end parallel processing. You may omit #wait when using #run for background tasks where completion is not guaranteed. In that case, catch all exceptions inside the run block; otherwise the internal exception channel may fill, causing the entire pipeline to stop.
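As a rough sketch of a chain that ends with a terminal call: the require path below is an assumption (this page does not state it), and the order of results is treated as unspecified, so the example sorts before using them.

```crystal
require "concurrent/enumerable" # assumed require path providing Enumerable#parallel

# #to_a ends parallel processing and gathers the results (and any errors).
doubled = (1..10).parallel.select(&.even?).map { |n| n * 2 }.to_a

# Result order is not assumed to match input order, so sort before use.
sorted = doubled.sort
puts sorted
```

Ending with #serial instead would hand back an Enumerable for further sequential processing rather than an Array.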

Error behavior

Errors are passed through to the end of the stream or until the first #errors call. #serial, #to_a, or #run + #wait will raise on the first error encountered, recursively closing the pipeline behind it. This may raise Channel::ClosedError in your source.

Error handling

Option 1

If the entire pipeline must succeed or fail early, use #serial, #to_a, or #run + #wait at the end of the pipeline to receive the first error.
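A minimal sketch of the fail-early approach, assuming the require path shown (not stated in this page) and rescuing a generic Exception because the exact class re-raised by the terminal call is not documented here:

```crystal
require "concurrent/enumerable" # assumed require path providing Enumerable#parallel

failed = false
begin
  (1..10).parallel.map { |n|
    # Illustrative failure for a single input.
    raise ArgumentError.new("bad input: #{n}") if n == 5
    n
  }.to_a
rescue ex
  # The terminal call surfaces the first error from the pipeline.
  failed = true
  puts "pipeline failed: #{ex.message}"
end
```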

Option 2

Prevent errors from entering the pipeline by rescuing inside every block.
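One way this could look, as a sketch only: the require path is an assumption, and mapping failures to nil is just one possible convention for marking items that could not be processed.

```crystal
require "concurrent/enumerable" # assumed require path providing Array#parallel

inputs = ["1", "x", "3"]

parsed = inputs.parallel.map { |s|
  begin
    s.to_i # String#to_i raises ArgumentError on invalid input
  rescue ArgumentError
    nil # swallow the error so nothing is raised out of the block
  end
}.to_a

# Drop the failed items; sort because result order is not assumed.
numbers = parsed.compact.sort
puts numbers
```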

Option 3

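Handle errors mid-pipeline with #errors, whose block receives the exception (ex) and the associated object (obj):

src.parallel.select { raise_randomly }.map { raise_randomly }.errors { |ex, obj| }.map { raise_randomly }.errors { }.to_a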

Option 4

Use a third-party Result type and never raise in any block.
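The sketch below illustrates the idea with a hand-rolled stand-in rather than a real third-party shard. Success, Failure, and attempt are hypothetical names introduced for illustration, and the require path is an assumption.

```crystal
require "concurrent/enumerable" # assumed require path providing Array#parallel

# Hypothetical stand-ins for a third-party Result type.
struct Success(T)
  getter value : T

  def initialize(@value : T)
  end
end

struct Failure
  getter error : Exception

  def initialize(@error : Exception)
  end
end

# Hypothetical helper: runs the block and returns any exception as a value.
def attempt(&)
  Success.new(yield)
rescue ex
  Failure.new(ex)
end

results = ["1", "x", "3"].parallel.map { |s| attempt { s.to_i } }.to_a

# Separate successes from failures after the pipeline has finished.
values = results.compact_map { |r| r.value if r.is_a?(Success) }.sort
failures = results.select(&.is_a?(Failure))
puts "ok: #{values}, failed: #{failures.size}"
```

Because every block returns a value instead of raising, the stream's own error channel is never used and all failures stay attached to the items that caused them.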

EXPERIMENTAL

Defined in:

concurrent/stream.cr

Constant Summary

Log = ::Log.for(self)