module Fluxite::PipeOut(T)
Direct including types
Defined in:
fluxite/pipeout.cr
Instance Method Summary
- #after(other : IFanout(U)) forall U
- #batch(&fn : Array(T), T -> Cut)
- #batch(n : Int)
  Emits batches of n incoming objects.
- #before(other : IFanout(U)) forall U
- #blast(&fn : T -> Enumerable(U)) forall U
  Forwards incoming objects to fn, emits each element from the enumerable returned by fn.
- #compact_map(&fn : T -> U | Nil) forall U
  Similar to #map, but skips .nil? return values of fn (so false is still emitted).
- #cord : Cord(T)
  Creates and returns a cord passing objects emitted by self to a new port.
- #cord(*, to other : Port(T)) : Cord(T)
  Similar to #cord but lets you specify the other port other manually.
- #during(gate : IFanout(Bool))
- #each(&fn : T -> )
  Attaches a terminal function fn which consumes data but does not emit any.
- #forward(cls : U.class, &fn : T, Forward::Feed(U) -> ) forall U
- #gate(*, by other : IFanout(Bool))
- #into(other : IFanout(U), as cls : U.class) : self forall U
- #into(other : IMailbox(T))
- #into(other : IMailbox(U)) forall U
- #into(other : IFanout(U), &fn : T -> U) : self forall U
- #into(receiver, &fn : T -> U) : Nil forall T, U
  Transforms incoming objects using fn and appends them to receiver.
- #into(&fn : T -> Enumerable(IMailbox(T))) : Nil
- #into(receiver) : Nil
  Appends incoming objects to receiver.
- #map(&fn : T -> U) forall U
  Emits data transformed using fn.
- #map(cls : U.class) forall U
  If U.class responds to [] (treated as a smart constructor), emits U[object], otherwise, emits U.new(object), where object is each incoming object.
- #map(*layout : *U) forall U
  Similar to #map(cls : U.class), but performs elementwise conversion as described by layout.
- #only(*args, **kwargs, &fn : T -> Bool)
- #only(*args, **kwargs)
- #or(other : IFanout(U)) forall U
  Combines emission of self and other.
- #partition(&fn : T -> Bool)
  Creates and returns two ports, yay and nay, redirecting those objects for which fn returns true to yay; and those objects for which fn returns false to nay.
- #port : Port(T)
  Creates and returns a new port that emits objects emitted by self.
- #recent(&fn : Array(T), T -> Cut)
- #reject(&fn : T -> Bool)
  Emits only those incoming objects for which fn returns false.
- #reject(pattern)
  Emits only those incoming objects that compare not equal to the given pattern.
- #select(&fn : T -> Bool)
  Emits only those incoming objects for which fn returns true.
- #select(as cls : U.class) forall U
  Emits incoming objects of type U, casting them to type U.
- #select(pattern)
  Emits only those incoming objects that compare equal to the given pattern.
- #squash(initial : T, &fn : T, T -> Bool)
  Emits an incoming object if it is different from the preceding object.
- #squash(&fn : T, T -> Bool)
  Emits an incoming object if it is different from the preceding object.
- #squash(*args, **kwargs)
  Emits an incoming object if it is different from the preceding one.
- #squash_by(initial : U, &fn : T -> U) forall U
  Emits an incoming object if its return value of fn is different from that produced by the preceding object.
- #squash_by(&fn : T -> U) forall U
  Emits an incoming object if its return value of fn is different from that produced by the preceding object.
- #track(other : IFanout(U), default : U) forall U
- #track(other : IFanout(U)) forall U
- #track(*layout : *U) forall U
  Tracks multiple values simultaneously as described by layout.
- #upto(n : Int)
  Emits up to n incoming objects.
Instance Method Detail
Emits batches of n incoming objects. Waits until the entire batch is collected, and only then emits it. Then starts fresh from a new empty batch.
The emitted batch array is fully yours. You can read/mutate it however you want.
xs = Fluxite::Port(Int32).new
xs.batch(3).each { |triple| p! triple }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]
# STDOUT:
# triple # => [1, 2, 3]
# triple # => [4, 5, 6]
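To make the "waits until the entire batch is collected" behaviour concrete, here is a small plain-Crystal sketch that uses only the standard library, not Fluxite. It contrasts the output shown above with Enumerable#each_slice, which also yields the trailing incomplete slice. The variable names are illustrative only.

```crystal
# Standard library only; `incoming` stands in for the objects sent to the port.
incoming = (1..7).to_a

# each_slice also yields the trailing, incomplete slice [7].
slices = incoming.each_slice(3).to_a

# Keeping only the full slices mirrors the batches printed in the example
# above, where 7 is not emitted because its batch is still incomplete.
full_batches = slices.select { |slice| slice.size == 3 }

p! slices       # => [[1, 2, 3], [4, 5, 6], [7]]
p! full_batches # => [[1, 2, 3], [4, 5, 6]]
```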
Forwards incoming objects to fn, emits each element from the enumerable returned by fn.
xs = Fluxite::Port(String).new
xs.blast(&.chars).squash.each { |ch| p! ch }
Fluxite[xs, "helloo"]
# STDOUT:
# ch # => 'h'
# ch # => 'e'
# ch # => 'l'
# ch # => 'o'
Similar to #map, but skips .nil? return values of fn (so false is still emitted).
xs = Fluxite::Port(Int32).new
xs.compact_map { |x| x.even? ? "#{x} even" : nil }.each { |x| p! x }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
# STDOUT:
# x # => "2 even"
# x # => "4 even"
Creates and returns a cord passing objects emitted by self to a new port.
You can use Cord#output to obtain this new port, for instance to attach some units to it.
Note that the cord is disabled by default.
xs = Fluxite.port(Int32)
cord = xs.cord
cord.output
.select(&.even?)
.each { |n| p! n }
Fluxite[xs, 2, 3, 4]
# Nothing is printed! The cord is disabled by default.
cord.enable
Fluxite[xs, 2, 3, 4]
# STDOUT:
# n # => 2
# n # => 4
cord.disable
Fluxite[xs, 2, 3, 4]
# Nothing again, but this time because we disabled the cord manually.
See Cord and its methods to learn more.
Similar to #cord but lets you specify the other port other manually.
See #cord for details and examples.
Attaches a terminal function fn which consumes data but does not emit any.
xs = Fluxite::Port(Int32).new
xs.each { |x| p! x }
Fluxite[xs, 100]
Fluxite[xs, 200]
Fluxite[xs, 300]
# STDOUT:
# x # => 100
# x # => 200
# x # => 300
Transforms incoming objects using fn and appends them to receiver.
receiver must respond to <<(object) where object is the result of fn.
So all of IO, Array, Set, Deque etc. are supported by this method.
squares = [] of Int32
xs = Fluxite::Port(Int32).new
xs.select(&.even?).into(squares) { |n| n ** 2 }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
p! squares
# STDOUT:
# squares # => [4, 16]
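The only requirement on receiver is the `<<` method, which several standard library types provide with slightly different effects. The sketch below uses plain Crystal without Fluxite. `append_all` is a hypothetical helper introduced here for illustration; it is not part of the library.

```crystal
# Hypothetical helper (not part of Fluxite): appends every item to `receiver`
# using only `<<`, the one method the text above requires of a receiver.
def append_all(receiver, items : Array(Int32))
  items.each { |item| receiver << item }
  receiver
end

array = append_all([] of Int32, [4, 16, 4])
set = append_all(Set(Int32).new, [4, 16, 4])
deque = append_all(Deque(Int32).new, [4, 16, 4])
io = append_all(IO::Memory.new, [4, 16, 4])

p! array      # => [4, 16, 4]
p! set.size   # => 2 (a Set drops the duplicate 4)
p! deque.to_a # => [4, 16, 4]
p! io.to_s    # => "4164" (an IO receives each object's string form)
```

The receiver type therefore decides what "append" means: ordered storage, deduplication, or text output.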
Appends incoming objects to receiver.
receiver must respond to <<(object) where object is the incoming object.
So all of IO, Array, Set, Deque etc. are supported by this method.
cubes = [] of Int32
xs = Fluxite::Port(Int32).new
xs.select(&.odd?).map { |n| n ** 3 }.into(cubes)
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
Fluxite[xs, 5]
p! cubes
# STDOUT:
# cubes # => [1, 27, 125]
Emits data transformed using fn.
xs = Fluxite::Port(Int32).new
xs.map(&.chr).each { |ch| p! ch }
Fluxite[xs, 100]
Fluxite[xs, 102]
Fluxite[xs, 103]
# STDOUT:
# ch # => 'd'
# ch # => 'f'
# ch # => 'g'
If U.class responds to [] (treated as a smart constructor), emits U[object], otherwise, emits U.new(object), where object is each incoming object.
record Var, id : UInt32
xs = Fluxite::Port(UInt32).new
xs.each { |x| p! x }
xs.map(Var).each { |var| p! var }
Fluxite[xs, 100u32]
Fluxite[xs, 200u32]
Fluxite[xs, 300u32]
# STDOUT:
# x # => 100
# var # => Var(@id=100)
# x # => 200
# var # => Var(@id=200)
# x # => 300
# var # => Var(@id=300)
Similar to #map(cls : U.class), but performs elementwise conversion as described by layout.
record Foo, x : Int32
record Bar, x : String
record Baz, x : Bool
xs = Fluxite::Port({Int32, String, Bool}).new
xs.map(Foo, Bar, Baz).each { |ys| p! ys }
Fluxite[xs, {100, "hello", true}]
Fluxite[xs, {200, "world", false}]
# STDOUT:
# ys # => {Foo(@x=100), Bar(@x="hello"), Baz(@x=true)}
# ys # => {Foo(@x=200), Bar(@x="world"), Baz(@x=false)}
Combines emission of self and other.
xs = Fluxite::Port(Symbol).new
ys = Fluxite::Port(Int32).new
xs.or(ys).each { |common| p! common }
Fluxite[xs, :foo]
Fluxite[ys, 200]
Fluxite[ys, 300]
Fluxite[xs, :bar]
# STDOUT:
# common # => :foo
# common # => 200
# common # => 300
# common # => :bar
Creates and returns two ports, yay and nay, redirecting those objects for which fn returns true to yay; and those objects for which fn returns false to nay.
xs = Fluxite::Port(Int32).new
even, odd = xs.partition(&.even?)
even.each { |even| p! even }
odd.each { |odd| p! odd }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
# STDOUT:
# odd # => 1
# even # => 2
# odd # => 3
# even # => 4
Creates and returns a new port that emits objects emitted by self. If self is a port already then simply returns self.
xs = Fluxite.port(Int32)
ys = xs.select(&.even?).map { |n| n ** 2 }.port
ys.each { |n| p! n }
ys.select { |n| n < 10 }.each { |b| p! b }
Fluxite[xs, 1, 2, 3, 4]
# STDOUT:
# n # => 4
# n # => 16
# b # => 4
Emits only those incoming objects for which fn returns false.
xs = Fluxite::Port(Int32).new
xs.reject(&.even?).each { |odd| p! odd }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
# STDOUT:
# odd # => 1
# odd # => 3
Emits only those incoming objects that compare not equal to the given pattern. Equality is tested using ===.
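Because the comparison uses `===` rather than `==`, the pattern can be more than a literal value. The following plain-Crystal sketch (standard library only, no Fluxite) illustrates the matching rule on an ordinary array. It assumes the pattern is the receiver of `===`, as in `case ... when`; the text above does not state which side the pattern is on.

```crystal
# Standard library only; `incoming` stands in for the objects sent to a port.
incoming = [1, 2, 3, 4, 5]

# Assumption: the pattern is the receiver of `===`, as in `case ... when`.
# A range pattern matches every value it covers, so 2 and 3 are rejected.
kept = incoming.reject { |object| (2..3) === object }

# For integers `===` falls back to `==`, so only 4 is rejected here.
kept_literal = incoming.reject { |object| 4 === object }

p! kept         # => [1, 4, 5]
p! kept_literal # => [1, 2, 3, 5]
```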
Emits only those incoming objects for which fn returns true.
xs = Fluxite::Port(Int32).new
xs.select(&.even?).each { |even| p! even }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 4]
# STDOUT:
# even # => 2
# even # => 4
See also: #only.
Emits incoming objects of type U, casting them to type U. This method is particularly useful to narrow down a union type. If the type cast is impossible, the incoming object is ignored.
xs = Fluxite::Port(Symbol | String | Int32).new
xs.select(Symbol).each { |sym| p! sym }
xs.select(String).each { |str| p! str }
xs.select(Int32).each { |int| p! int }
Fluxite[xs, 100]
Fluxite[xs, :hello]
Fluxite[xs, "world"]
# STDOUT:
# int # => 100
# sym # => :hello
# str # => "world"
See also: #only.
Emits only those incoming objects that compare equal to the given pattern. Equality is tested using ===.
See also: #only.
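As a rough illustration of what `===` matching allows, the plain-Crystal sketch below (standard library only, no Fluxite) selects strings with a regular expression and with a literal. It assumes the pattern is the receiver of `===`; the names are illustrative.

```crystal
# Standard library only; `incoming` stands in for the objects sent to a port.
incoming = ["hello", "world", "hey"]

# Assumption: the pattern is the receiver of `===`, as in `case ... when`.
# Regex#=== is true when the string matches the expression.
starts_with_h = incoming.select { |object| /\Ah/ === object }

# For strings `===` falls back to `==`, so only the exact value is kept.
exact = incoming.select { |object| "world" === object }

p! starts_with_h # => ["hello", "hey"]
p! exact         # => ["world"]
```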
Emits an incoming object if it is different from the preceding object. Optionally, the initial predecessor may be provided. In that case, the first incoming object is compared with initial. Otherwise, the first object is always emitted.
Equality of two objects is determined using fn.
max = nil
xs = Fluxite::Port(Int32).new
xs.squash { |x, y| x >= y }.each { |n| max = n }
(-100..100).to_a.shuffle!.each do |n|
Fluxite[xs, n]
end
max # => 100
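The example above does not pass initial, so here is a hedged plain-Crystal model of what supplying it changes. `squash_model` is a hypothetical stand-in, not Fluxite code. It assumes fn receives the last emitted object first and the incoming object second, which is inferred from the running-maximum example rather than stated in the text.

```crystal
# Hypothetical model (not Fluxite code). Assumption: `fn` receives the last
# emitted object first and the incoming object second, and a true result
# means "same as before", so the incoming object is skipped.
def squash_model(items : Array(Int32), initial : Int32, &fn : Int32, Int32 -> Bool) : Array(Int32)
  emitted = [] of Int32
  previous = initial
  items.each do |item|
    same = yield previous, item
    next if same
    emitted << item
    previous = item
  end
  emitted
end

# With initial = 0 the first object (-5) is compared against 0 and skipped.
running_max = squash_model([-5, 3, 2, 7], 0) { |x, y| x >= y }
p! running_max # => [3, 7]
```

Without an initial value the first object, -5, would always be emitted, as the description states.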
Emits an incoming object if it is different from the preceding object. Optionally, the initial predecessor may be provided. In that case, the first incoming object is compared with initial. Otherwise, the first object is always emitted.
Equality of two objects is determined using fn.
max = nil
xs = Fluxite::Port(Int32).new
xs.squash { |x, y| x >= y }.each { |n| max = n }
(-100..100).to_a.shuffle!.each do |n|
Fluxite[xs, n]
end
max # => 100
Emits an incoming object if it is different from the preceding one.
The objects are compared using ==.
xs = Fluxite::Port(Int32).new
xs.squash.each { |x| p! x }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 2]
Fluxite[xs, 3]
Fluxite[xs, 2]
# STDOUT:
# x # => 1
# x # => 2
# x # => 3
# x # => 2
Emits an incoming object if its return value of fn is different from that produced by the preceding object. Optionally, the initial value of fn may be provided. In that case, the first incoming object is compared with that value. Otherwise, the first object is always emitted.
# Do not emit consecutive numbers of the same parity.
xs = Fluxite::Port(Int32).new
xs.squash_by(&.even?).each { |x| p! x }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]
# STDOUT:
# x # => 1
# x # => 2
# x # => 5
# x # => 6
# x # => 7
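To show the effect of the optional initial value, which the example above does not use, here is a plain-Crystal model. `squash_by_model` is a hypothetical helper written for illustration and is not part of Fluxite.

```crystal
# Hypothetical model (not Fluxite code): an item is emitted only when the
# value `fn` returns for it differs from the previously remembered value.
def squash_by_model(items : Array(Int32), initial : Bool, &fn : Int32 -> Bool) : Array(Int32)
  emitted = [] of Int32
  previous = initial
  items.each do |item|
    key = yield item
    next if key == previous
    emitted << item
    previous = key
  end
  emitted
end

# initial = true means "pretend an even number came first": the leading
# even numbers 2 and 4 are skipped, and 5 is the first object emitted.
with_initial = squash_by_model([2, 4, 5, 6]) { |n| n.even? } rescue nil
with_initial = squash_by_model([2, 4, 5, 6], true) { |n| n.even? }
p! with_initial # => [5, 6]
```

Without an initial value the first object, 2, would always be emitted, as the description states.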
Emits an incoming object if its return value of fn is different from that produced by the preceding object. Optionally, the initial value of fn may be provided. In that case, the first incoming object is compared with that value. Otherwise, the first object is always emitted.
# Do not emit consecutive numbers of the same parity.
xs = Fluxite::Port(Int32).new
xs.squash_by(&.even?).each { |x| p! x }
Fluxite[xs, 1]
Fluxite[xs, 2]
Fluxite[xs, 4]
Fluxite[xs, 5]
Fluxite[xs, 6]
Fluxite[xs, 7]
# STDOUT:
# x # => 1
# x # => 2
# x # => 5
# x # => 6
# x # => 7
Tracks multiple values simultaneously as described by layout.
Remember: tracking is for when you have a master unit and a few units that the master's emission should be combined with, and you want to know their most up to date values. In other words, #track quietly tracks the units, and emits when the master unit emits.
Layout can feature any combination of the following:
- A unit of any type (e.g. #track(xs.select(&.even?), ys.reject(&.odd?)))
- A spec for a unit with a default value (e.g. #track({ from: xs.select(&.even?), default: 2 }, { from: ys.select(&.odd?), default: 3 }))
- A spec for a unit without a default value (e.g. #track({ from: xs.select(&.even?) }, { from: ys.select(&.odd?) })), allowed mostly for consistency (when some units have defaults and some don't, it's recommended to use this form).
Using raw units (nothing is emitted until both an age and a profession have arrived):
names = Fluxite::Port(String).new
ages = Fluxite::Port(Int32).new
professions = Fluxite::Port(String).new
names
.track(ages, professions)
.each { |name, age, profession| p!({name, age, profession}) }
Fluxite[ages, 25]
Fluxite[names, "John Doe"]
Fluxite[professions, "programmer"]
# STDOUT:
# {name, age, profession} # => {"John Doe", 25, "programmer"}
Fluxite[professions, "gardener"]
Fluxite[ages, 32]
# Prints nothing. `ages` and `professions` are quietly tracked by `names`,
# to get their freshest values when `names` emits.
Fluxite[names, "Susan Doe"]
# STDOUT:
# {name, age, profession} # => {"Susan Doe", 32, "gardener"}
Specifying a default value to be used before a tracked unit emits:
names = Fluxite::Port(String).new
ages = Fluxite::Port(Int32).new
professions = Fluxite::Port(String).new
names
.track(ages, {from: professions, default: "unspecified"})
.each { |name, age, profession| p!({name, age, profession}) }
# For consistency you may write the above as:
# names
# .track(
# { from: ages },
# { from: professions, default: "unspecified" })
# .each { |name, age, profession| p!({ name, age, profession }) }
Fluxite[ages, 25]
Fluxite[names, "John Doe"]
# STDOUT:
# {name, age, profession} # => {"John Doe", 25, "unspecified"}
Fluxite[professions, "writer"]
# Again, prints nothing. We've used the default value. Now a new `name`
# must arrive before we register the new profession.
Fluxite[names, "Mark Stephenson"]
# STDOUT:
# {name, age, profession} # => {"Mark Stephenson", 25, "writer"}