module Fluxite::PipeOut(T)

Direct including types

Defined in:

fluxite/pipeout.cr

Instance Method Summary

Instance Method Detail

def after(other : IFanout(U)) forall U #

[View source]
def batch(&fn : Array(T), T -> Cut) #

[View source]
def batch(n : Int) #

Emits batches of n incoming objects. Waits until the entire batch is collected, and only then emits it. Then starts fresh from a new empty batch.

The emitted batch array is fully yours. You can read/mutate it however you want.

xs = Fluxite::Port(Int32).new
xs.batch(3).each { |triple| p! triple }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]

# STDOUT:
#   triple # => [1, 2, 3]
#   triple # => [4, 5, 6]

[View source]
def before(other : IFanout(U)) forall U #

[View source]
def blast(&fn : T -> Enumerable(U)) forall U #

Forwards incoming objects to fn, emits each element from the enumerable returned by fn.

xs = Fluxite::Port(String).new
xs.blast(&.chars).squash.each { |ch| p! ch }

Fluxite[xs, "helloo"]

# STDOUT:
#   ch # => 'h'
#   ch # => 'e'
#   ch # => 'l'
#   ch # => 'o'

[View source]
def compact_map(&fn : T -> U | Nil) forall U #

Similar to #map, but skips .nil? return values of fn (so false is still emitted).

xs = Fluxite::Port(Int32).new
xs.compact_map { |x| x.even? ? "#{x} even" : nil }.each { |x| p! x }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]

# STDOUT:
#   x # => "2 even"
#   x # => "4 even"

[View source]
def cord : Cord(T) #

Creates and returns a cord passing objects emitted by self to a new port. You can use Cord#output to obtain this new port, for instance to attach some units to it.

Note that the cord is disabled by default.

xs = Fluxite.port(Int32)

cord = xs.cord
cord.output
  .select(&.even?)
  .each { |n| p! n }

Fluxite[xs, 2, 3, 4]
# Nothing is printed! The cord is disabled by default.

cord.enable

Fluxite[xs, 2, 3, 4]

# STDOUT:
#   n # => 2
#   n # => 4

cord.disable

Fluxite[xs, 2, 3, 4]
# Nothing again, but this time it is because we've disabled the cord manually.

See Cord and its methods to learn more.


[View source]
def cord(*, to other : Port(T)) : Cord(T) #

Similar to #cord, but lets you manually specify the port other that the cord passes objects to. See #cord for details and examples.


[View source]
def during(gate : IFanout(Bool)) #

[View source]
def each(&fn : T -> ) #

Attaches a terminal function fn which consumes data but does not emit any.

xs = Fluxite::Port(Int32).new
xs.each { |x| p! x }

Fluxite[xs, 100]
Fluxite[xs, 200]
Fluxite[xs, 300]

# STDOUT:
#   x # => 100
#   x # => 200
#   x # => 300

[View source]
def forward(cls : U.class, &fn : T, Forward::Feed(U) -> ) forall U #

[View source]
def gate(*, by other : IFanout(Bool)) #

[View source]
def into(other : IFanout(U), as cls : U.class) : self forall U #

[View source]
def into(other : IMailbox(T)) #

[View source]
def into(other : IMailbox(U)) forall U #

[View source]
def into(other : IFanout(U), &fn : T -> U) : self forall U #

[View source]
def into(receiver, &fn : T -> U) : Nil forall T, U #

Transforms incoming objects using fn and appends them to receiver.

receiver must respond to <<(object) where object is the result of fn. So all of IO, Array, Set, Deque etc. are supported by this method.

squares = [] of Int32

xs = Fluxite::Port(Int32).new
xs.select(&.even?).into(squares) { |n| n ** 2 }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]

p! squares

# STDOUT:
#   squares # => [4, 16]

[View source]
def into(&fn : T -> Enumerable(IMailbox(T))) : Nil #

[View source]
def into(receiver) : Nil #

Appends incoming objects to receiver.

receiver must respond to <<(object) where object is the incoming object. So all of IO, Array, Set, Deque etc. are supported by this method.

cubes = [] of Int32

xs = Fluxite::Port(Int32).new
xs.select(&.odd?).map { |n| n ** 3 }.into(cubes)

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
Fluxite[xs, 5]

p! cubes

# STDOUT:
#   cubes # => [1, 27, 125]

[View source]
def map(&fn : T -> U) forall U #

Emits data transformed using fn.

xs = Fluxite::Port(Int32).new
xs.map(&.chr).each { |ch| p! ch }

Fluxite[xs, 100]
Fluxite[xs, 102]
Fluxite[xs, 103]

# STDOUT:
#   ch # => 'd'
#   ch # => 'f'
#   ch # => 'g'

[View source]
def map(cls : U.class) forall U #

If U.class responds to [] (treated as a smart constructor), emits U[object]; otherwise, emits U.new(object). Here object is each incoming object.

record Var, id : UInt32

xs = Fluxite::Port(UInt32).new
xs.each { |x| p! x }
xs.map(Var).each { |var| p! var }

Fluxite[xs, 100u32]
Fluxite[xs, 200u32]
Fluxite[xs, 300u32]

# STDOUT:
#   x   # => 100
#   var # => Var(@id=100)
#   x   # => 200
#   var # => Var(@id=200)
#   x   # => 300
#   var # => Var(@id=300)

[View source]
def map(*layout : *U) forall U #

Similar to #map(cls : U.class), but performs elementwise conversion as described by layout.

record Foo, x : Int32
record Bar, x : String
record Baz, x : Bool

xs = Fluxite::Port({Int32, String, Bool}).new
xs.map(Foo, Bar, Baz).each { |ys| p! ys }

Fluxite[xs, {100, "hello", true}]
Fluxite[xs, {200, "world", false}]

# STDOUT:
#   ys # => {Foo(@x=100), Bar(@x="hello"), Baz(@x=true)}
#   ys # => {Foo(@x=200), Bar(@x="world"), Baz(@x=false)}

[View source]
def only(*args, **kwargs, &fn : T -> Bool) #

Alias of #select for when you cannot use #select (as it is a Crystal keyword).


[View source]
def only(*args, **kwargs) #

Alias of #select for when you cannot use #select (as it is a Crystal keyword).


[View source]
def or(other : IFanout(U)) forall U #

Combines emission of self and other.

xs = Fluxite::Port(Symbol).new
ys = Fluxite::Port(Int32).new

xs.or(ys).each { |common| p! common }

Fluxite[xs, :foo]
Fluxite[ys, 200]
Fluxite[ys, 300]
Fluxite[xs, :bar]

# STDOUT:
#   common # => :foo
#   common # => 200
#   common # => 300
#   common # => :bar

[View source]
def partition(&fn : T -> Bool) #

Creates and returns two ports, yay and nay, redirecting those objects for which fn returns true to yay, and those objects for which fn returns false to nay.

xs = Fluxite::Port(Int32).new
even, odd = xs.partition(&.even?)
even.each { |even| p! even }
odd.each { |odd| p! odd }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]

# STDOUT:
#   odd # => 1
#   even # => 2
#   odd # => 3
#   even # => 4

[View source]
def port : Port(T) #

Creates and returns a new port that emits objects emitted by self. If self is a port already then simply returns self.

xs = Fluxite.port(Int32)
ys = xs.select(&.even?).map { |n| n ** 2 }.port
ys.each { |n| p! n }
ys.select { |n| n < 10 }.each { |b| p! b }

Fluxite[xs, 1, 2, 3, 4]

# STDOUT:
#   n # => 4
#   n # => 16
#   b # => 4

[View source]
def recent(&fn : Array(T), T -> Cut) #

[View source]
def reject(&fn : T -> Bool) #

Emits only those incoming objects for which fn returns false.

xs = Fluxite::Port(Int32).new
xs.reject(&.even?).each { |odd| p! odd }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]

# STDOUT:
#   odd # => 1
#   odd # => 3

[View source]
def reject(pattern) #

Emits only those incoming objects that do not compare equal to the given pattern. Equality is tested using ===.


[View source]
def select(&fn : T -> Bool) #

Emits only those incoming objects for which fn returns true.

xs = Fluxite::Port(Int32).new
xs.select(&.even?).each { |even| p! even }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]

# STDOUT:
#   even # => 2
#   even # => 4

See also: #only.


[View source]
def select(as cls : U.class) forall U #

Emits incoming objects of type U, casting them to type U. This method is particularly useful to narrow down a union type. If type cast is impossible the incoming object is ignored.

xs = Fluxite::Port(Symbol | String | Int32).new
xs.select(Symbol).each { |sym| p! sym }
xs.select(String).each { |str| p! str }
xs.select(Int32).each { |int| p! int }

Fluxite[xs, 100]
Fluxite[xs, :hello]
Fluxite[xs, "world"]

# STDOUT:
#   int # => 100
#   sym # => :hello
#   str # => "world"

See also: #only.


[View source]
def select(pattern) #

Emits only those incoming objects that compare equal to the given pattern. Equality is tested using ===.

See also: #only.


[View source]
def squash(initial : T, &fn : T, T -> Bool) #

Emits an incoming object if it is different from the preceding object. Optionally, the initial predecessor may be provided. In such case, the first incoming object is compared with initial. Otherwise, the first object is always emitted.

Equality of two objects is determined using fn.

max = nil

xs = Fluxite::Port(Int32).new
xs.squash { |x, y| x >= y }.each { |n| max = n }

(-100..100).to_a.shuffle!.each do |n|
  Fluxite[xs, n]
end

max # => 100

[View source]
def squash(&fn : T, T -> Bool) #

Emits an incoming object if it is different from the preceding object. Optionally, the initial predecessor may be provided. In such case, the first incoming object is compared with initial. Otherwise, the first object is always emitted.

Equality of two objects is determined using fn.

max = nil

xs = Fluxite::Port(Int32).new
xs.squash { |x, y| x >= y }.each { |n| max = n }

(-100..100).to_a.shuffle!.each do |n|
  Fluxite[xs, n]
end

max # => 100

[View source]
def squash(*args, **kwargs) #

Emits an incoming object if it is different from the preceding one.

The objects are compared using ==.

xs = Fluxite::Port(Int32).new
xs.squash.each { |x| p! x }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 2]

# STDOUT:
#   x # => 1
#   x # => 2
#   x # => 3
#   x # => 2

[View source]
def squash_by(initial : U, &fn : T -> U) forall U #

Emits an incoming object if the value fn returns for it is different from the value fn returned for the preceding object. Optionally, an initial value to compare against may be provided. In such case, the value fn returns for the first incoming object is compared with initial. Otherwise, the first object is always emitted.

# Do not emit consecutive even numbers.
xs = Fluxite::Port(Int32).new
xs.squash_by(&.even?).each { |x| p! x }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]

# STDOUT:
#   x # => 1
#   x # => 2
#   x # => 5
#   x # => 6
#   x # => 7

[View source]
def squash_by(&fn : T -> U) forall U #

Emits an incoming object if the value fn returns for it is different from the value fn returned for the preceding object. Optionally, an initial value to compare against may be provided. In such case, the value fn returns for the first incoming object is compared with initial. Otherwise, the first object is always emitted.

# Do not emit consecutive even numbers.
xs = Fluxite::Port(Int32).new
xs.squash_by(&.even?).each { |x| p! x }

Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]

# STDOUT:
#   x # => 1
#   x # => 2
#   x # => 5
#   x # => 6
#   x # => 7

[View source]
def track(other : IFanout(U), default : U) forall U #

[View source]
def track(other : IFanout(U)) forall U #

[View source]
def track(*layout : *U) forall U #

Tracks multiple values simultaneously as described by layout.

Remember: tracking is for when you have a master unit and a few units that the master's emission should be combined with, and you want to know their most up to date values. In other words, #track quietly tracks the units, and emits when the master unit emits.

Layout can feature any combination of the following:

Using raw units (will have to wait until both ages and professions have emitted):

names = Fluxite::Port(String).new
ages = Fluxite::Port(Int32).new
professions = Fluxite::Port(String).new

names
  .track(ages, professions)
  .each { |name, age, profession| p!({name, age, profession}) }

Fluxite[ages, 25]
Fluxite[names, "John Doe"]
Fluxite[professions, "programmer"]

# STDOUT:
#   {name, age, profession} # => {"John Doe", 25, "programmer"}

Fluxite[professions, "gardener"]
Fluxite[ages, 32]

# Prints nothing. `ages` and `professions` are quietly tracked by `names`,
# to get their freshest values when `names` emits.

Fluxite[names, "Susan Doe"]

# STDOUT:
#   {name, age, profession} # => {"Susan Doe", 32, "gardener"}

Specifying a default value to be used before a tracked unit emits:

names = Fluxite::Port(String).new
ages = Fluxite::Port(Int32).new
professions = Fluxite::Port(String).new

names
  .track(ages, {from: professions, default: "unspecified"})
  .each { |name, age, profession| p!({name, age, profession}) }

# For consistency you may write the above as:
# names
#   .track(
#     { from: ages },
#     { from: professions, default: "unspecified" })
#   .each { |name, age, profession| p!({ name, age, profession }) }

Fluxite[ages, 25]
Fluxite[names, "John Doe"]

# STDOUT:
#   {name, age, profession} # => {"John Doe", 25, "unspecified"}

Fluxite[professions, "writer"]

# Again, prints nothing. We've used the default value. Now a new `name`
# must arrive before we register the new profession.

Fluxite[names, "Mark Stephenson"]

# STDOUT:
#   {name, age, profession} # => {"Mark Stephenson", 25, "writer"}

[View source]
def upto(n : Int) #

Emits up to n incoming objects.


[View source]