concurrent.cr
New opportunities for concurrency tools in Crystal.
Large spacious directories available to build your dream algorithm!
Space is filling up at (24k code bytes / 2 months) 0.004 bytes per second. Register your PR today!
©️ Real estate marketing association
Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns which inspired Ruby, which inspired this library.
Available classes:
- Concurrent::Enumerable
- Concurrent::Channel
- Concurrent::CountDownLatch
- Concurrent::CyclicBarrier
- Concurrent::Semaphore
TODO
- [ ] Change Enumerable/Channel into generic stream processing.
- [ ] Enumerable/Channel custom error handling.
More algorithms are coming. Contributions welcome.
Installation
- Add the dependency to your shard.yml:
  dependencies:
    concurrent:
      github: didactic-drunk/concurrent.cr
- Run shards install
Usage
Parallel map (experimental)
require "concurrent/enumerable"
(1..50).parallel.select(&.even?).map { |n| n + 1 }.serial.sum
#                ^               ^                 ^ Results joined.
#                |               | Spawns separate fiber pool
#                | Spawns fiber pool
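For intuition about what a fiber pool stage does, here is a minimal sketch built only on Crystal's standard library (`Channel` and `spawn`). It is not this library's implementation: `pool_map`, its parameters, and the worker count of 4 are hypothetical names and values chosen for illustration.

```crystal
# Hypothetical helper, not part of concurrent.cr: maps `input` through a
# fixed number of worker fibers and returns results in completion order.
def pool_map(input : Enumerable(Int32), workers : Int32, &block : Int32 -> Int32) : Array(Int32)
  jobs = Channel(Int32).new
  results = Channel(Int32).new
  items = input.to_a

  # Worker fibers pull jobs until the jobs channel is closed.
  workers.times do
    spawn do
      while n = jobs.receive?
        results.send block.call(n)
      end
    end
  end

  # A producer fiber feeds the pool, then closes the channel.
  spawn do
    items.each { |n| jobs.send n }
    jobs.close
  end

  # Join: collect exactly one result per input item.
  Array(Int32).new(items.size) { results.receive }
end

evens_plus_one = pool_map((1..50).select(&.even?), workers: 4) { |n| n + 1 }
puts evens_plus_one.sum # => 675
```

The sketch runs the `select` serially and only the `map` in the pool; the result order depends on which worker finishes first, which does not matter for a sum.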
Batches
(1..50).parallel.map { |n|
  # Parallel processing in a fiber pool
  Choose::A::ORM.new(id: n)
}.batch(10).run { |array_of_records|
  # Run 10 inserts inside a transaction for faster db writes
  # Real applications should choose roughly 100-100000 depending on the database, schema, data & hardware
  ORM.transaction { array_of_records.each &.save! }
}.wait
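The effect of batching on the number of transactions can be seen with a serial sketch that uses the standard library's `Enumerable#each_slice`. The `transactions` array is a hypothetical stand-in for a database, and 25 records is an arbitrary example size. `each_slice` yields a final partial batch; whether the library's `batch` behaves the same way is not stated here.

```crystal
# Stand-in for a database: each "transaction" records the batch it wrote.
transactions = [] of Array(Int32)

(1..25).each_slice(10) do |batch|
  # One simulated transaction per batch of up to 10 records.
  transactions << batch
end

p transactions.size         # => 3
p transactions.map(&.size)  # => [10, 10, 5]
```

Twenty-five records cost three transactions instead of twenty-five, which is the saving the batch size controls.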
Stream processing from a Channel (experimental)
require "concurrent/channel"
# Same interface and restrictions as concurrent/enumerable.
ch = Channel(Int32).new
spawn do
  10.times { |i| ch.send 1 }
  ch.close
end
# map is processed in a Fiber pool.
# All other fibers will shut down after all messages are processed.
# Any errors in processing are raised here.
ch.parallel.map { |n| n + 1 }.serial.sum
Open ended stream processing aka simplified fiber pools (experimental)
require "concurrent/channel"
# Same interface and restrictions as concurrent/enumerable.
ch = Channel(Int32).new
# Messages may be processed in parallel within each `tee` and `run`.
# Make sure to use immutable objects or concurrency safe data structures.
run = ch.parallel.tee { |n| Log.info { "n=#{n}" } }.batch(2).run { |batch| p batch }
10.times { |i| ch.send 1 }
ch.close
# Wait until all messages/errors are processed.
run.wait
Stream error handling
ary = (1..10).to_a.parallel.select { |i|
  raise "select error" if i == 2
  true
}.parallel.map { |i|
  raise "map error" if i.even?
  i.to_s
  # All errors in prior blocks handled here
}.errors { |ex, obj|
  puts "#{obj} #{ex}"
}.map { |s|
  s.to_i
}.to_a
p ary # => [1, 3, 5, 7, 9]
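The idea of routing failures to one place while successful items continue downstream can be sketched with the standard library alone. This is an illustration under stated assumptions, not how the library implements `errors`: the union-typed `results` channel and the `values`/`errors` arrays are hypothetical names.

```crystal
# Each fiber reports either a result or the exception it hit.
results = Channel(String | Exception).new
inputs = (1..10).to_a

inputs.each do |i|
  spawn do
    begin
      raise "map error" if i.even?
      results.send i.to_s
    rescue ex
      results.send ex
    end
  end
end

values = [] of Int32
errors = [] of String
inputs.size.times do
  case r = results.receive
  when Exception then errors << (r.message || "")
  when String    then values << r.to_i
  end
end

p values.sort # => [1, 3, 5, 7, 9]
p errors.size # => 5
```

Sending the exception as a value keeps the receiving loop simple: it always expects exactly one message per input, whether that item succeeded or failed.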
CountDownLatch
require "concurrent/count_down_latch"
fiber_count = 10
latch = Concurrent::CountDownLatch.new
fiber_count.times do
  spawn do
    # Do work
    latch.count_down
  end
end
latch.wait_count = fiber_count
latch.wait
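For comparison, the same "wait for N fibers" pattern can be written by hand with a standard library `Channel`. This is a sketch of the pattern a latch packages up, not the library's implementation; the `done` channel and the squared-index "work" are hypothetical.

```crystal
fiber_count = 10
done = Channel(Nil).new
results = Array(Int32).new(fiber_count, 0)

fiber_count.times do |i|
  spawn do
    results[i] = i * i # Do work
    done.send nil      # Plays the role of count_down
  end
end

# Plays the role of wait: block until every fiber has reported.
fiber_count.times { done.receive }
puts results.sum # => 285
```

The hand-written version needs the count up front at the receive loop, which is the bookkeeping a latch hides.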
Semaphore
require "concurrent/semaphore"
sem = Concurrent::Semaphore.new n # n is the concurrency limit you choose
# spawn a lot of fibers
2000.times do
  spawn do
    sem.acquire do
      ...
    end
  end
end
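The limiting behaviour of a semaphore can be illustrated with a buffered standard library `Channel` acting as a pool of permits. This is a sketch of the general technique, not the library's implementation; `limit`, `permits`, `running`, and `peak` are hypothetical names, and the plain counters assume Crystal's default single-threaded scheduler.

```crystal
limit = 3 # hypothetical permit count
permits = Channel(Nil).new(limit)
done = Channel(Nil).new
running = 0
peak = 0

20.times do
  spawn do
    permits.send nil # acquire: blocks while `limit` fibers hold permits
    begin
      running += 1
      peak = running if running > peak
      Fiber.yield # simulate work, letting other fibers run
      running -= 1
    ensure
      permits.receive # release
    end
    done.send nil
  end
end

20.times { done.receive }
puts peak <= limit # => true
```

Because the channel buffer holds at most `limit` entries, no more than `limit` fibers can be between acquire and release at once.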
Development
TODO Write development instructions here
Contributing
- Fork it (https://github.com/didactic-drunk/concurrent.cr/fork)
- Install a formatting check git hook (ln -sf ../../scripts/git/pre-commit .git/hooks)
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
Contributors
- Run git shortlog --summary --numbered --email to list contributors