concurrent.cr

Crystal CI GitHub release Docs

Modern Adequate Any New opportunities for concurrency tools in Crystal.
Large empty lots spacious directories available to build your dream home algorithm!
Space is filling up at (24k code bytes / 2 months) 0.004 bytes per second. Register your PR today!
©️ Real estate marketing association

Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns which inspired Ruby, which inspired this library.

Available classes:

TODO

More algorithms are coming. Contributions welcome.

Installation

  1. Add the dependency to your shard.yml:

    dependencies:
      concurrent:
        github: didactic-drunk/concurrent.cr
  2. Run shards install

Usage

Parallel map (experimental)

require "concurrent/enumerable"

(1..50).parallel.select(&.even?).map { |n| n + 1 }.serial.sum
                 ^               ^                 ^ Results joined.
                 |               | Spawns separate fiber pool
                 | Spawns fiber pool

Batches

(1..50).parallel.map { |n|
  # Parallel processing in a fiber pool
  Choose::A::ORM.new(id: n)
}.batch(10).run { |array_of_records|
  # Run 10 Inserts inside a transaction for faster db writes
  # Real applications should choose ~~~100-100000 depending on the database, schema, data & hardware
  ORM.transaction { array_of_records.each &.save! }
}.wait

Stream processing from a Channel (experimental).

require "concurrent/channel"

# Same interface and restrictions as concurrent/enumerable.

ch = Channel(Int32).new

spawn do
  10.times { |i| ch.send 1 }
  ch.close
end

# map is processed in a Fiber pool.
# All other fibers will shut down after all messages are processed.
# Any errors in processing are raised here.
ch.parallel.map { |n| n + 1 }.serial.sum

Open ended stream processing aka simplified fiber pools (experimental)

require "concurrent/channel"

# Same interface and restrictions as concurrent/enumerable.

ch = Channel(Int32).new
# Messages may be processed in parallel within each `tee` and `run`.
# Make sure to use immutable objects or concurrency safe data structures.
run = ch.parallel.tee { |n| Log.info { "n=#{n}" } }.batch(2).run { |n| p n }

10.times { |i| ch.send 1 }
ch.close

# Wait until all messages/errors are processed.
run.wait

Stream error handling

ary = (1..10).to_a.parallel.select { |i|
  raise "select error" if i == 2
  true
}.parallel.map { |i|
  raise "map error" if i.even?
  i.to_s
# All errors in prior blocks handled here
}.errors { |ex, obj|
  puts "#{obj} #{ex}"
}.map { |s|
  s.to_i
}.to_a

p ary => [1, 3, 5, 7]

CountDownLatch

require "concurrent/count_down_latch"

fiber_count = 10
latch = Concurrent::CountDownLatch.new
10.times do
  spawn do
    # Do work
    latch.count_down
  end
end

latch.wait_count = fiber_count
latch.wait

Semaphore

require "concurrent/semaphore"

sem = Concurrent::Semaphore.new n

# spawn a lot of fibers
2000.times do
  spawn do
    sem.acquire do
      ...
    end
  end
end

Development

TODO Write development instructions here

Contributing

  1. Fork it (https://github.com/didactic-drunk/concurrent.cr/fork)
  2. Install a formatting check git hook (ln -sf ../../scripts/git/pre-commit .git/hooks)
  3. Create your feature branch (git checkout -b my-new-feature)
  4. Commit your changes (git commit -am 'Add some feature')
  5. Push to the branch (git push origin my-new-feature)
  6. Create a new Pull Request

Contributors