A purely functional library to build distributed and event-driven systems
It's no secret that writing distributed systems is a challenging task that can be logically broken into two main aspects: implementing distributed algorithms and running them. Parapet plays the role of an execution framework for distributed algorithms - it can be viewed as an intermediate layer between a low-level effect library and high-level operations exposed in the form of a DSL. Engineers who mainly focus on designing and implementing distributed algorithms don't need to worry about low-level abstractions such as IO or have deep knowledge of certain computer science subjects, for instance, concurrency. All they need to know is what properties the library satisfies and what guarantees it provides. On the other hand, engineers who specialize in writing low-level libraries can concentrate on implementing core abstractions such as IO or Task, working on performance optimizations, and implementing new features. Parapet is a modular library in which almost any component can be replaced with a custom implementation.
Before starting to use Parapet, it's recommended to get familiar with the following topics: Monad, Free Monad, Functor. However, understanding the underlying monad theory is not necessary to use the library. Links: the Scala with Cats book is a very good start, A Gentle Introduction to I/O, Free Monad, Data types a la carte + InjectK in Cats.
Other relevant topics: Synchronous/Asynchronous, FIFO/non-FIFO, Liveness and Safety.
The first thing you need to do is add two dependencies to your project: parapet-core and interop-{effect_library} for a specific effect library. You can find the latest version on Maven Central.
libraryDependencies += "io.parapet" %% "core" % version
libraryDependencies += "io.parapet" %% "interop-cats" % version
libraryDependencies += "io.parapet" %% "interop-monix" % version"
libraryDependencies += "io.parapet" %% "interop-scalaz-zio" % version
Once you've added the library, you can start writing your first program. However, it's worth taking a few minutes to get familiar with the two main approaches to writing processes: Generic and Effect Specific. I'll describe both in a minute. For those who aren't familiar with effect systems like Cats Effect, I'd recommend reading the Wikipedia page about effect systems or the official Cats Effect documentation. Fortunately, you don't need to be an expert in Cats Effect or any other effect system library to use Parapet.
The first approach we'll consider is Generic. It's recommended to stick to this style when writing processes. Let's develop a simple Printer process that prints user requests to the system output.
import io.parapet.core.{Event, Process}

class Printer[F[_]] extends Process[F] {

  import Printer._ // import Printer API
  import dsl._     // import DSL operations

  override def handle: Receive = {
    case Print(data) => eval(println(data))
  }
}

object Printer {
  case class Print(data: Any) extends Event
}

Let's walk through this code. You start writing your processes by extending the Process trait and parameterizing it with an effect type. In this example, we left a so-called hole F[_] in our Printer type, which can be any type constructor with a single argument; e.g. F[_] is a generic type constructor, cats-effect IO is a specific type constructor, and IO[Unit] is a concrete type. From this moment on, it should become clear what it means for a process to be generic: simply speaking, it means that a process doesn't depend on any specific effect type, e.g. IO. Thus we can claim that our Printer process is surely generic.
The next step is to define a process API, or contract, that defines the set of events the process can send and receive. A process contract is an important part of any process specification: it defines the protocol that other processes will use to communicate with your process, so take it seriously.
Then we need to import the DSL smart constructors. The Parapet DSL is a small set of operations that we will consider in detail in the next chapters. In this example, we only need the eval operator, which suspends a side effect in F; in our Printer process we suspend the println effectful computation.
Finally, every process should override the handle function defined in the Process trait. handle is a partial function that matches input events and produces executable flows. If you have ever tried the Akka framework, you may find this approach familiar (for the curious, Receive is simply a type alias for PartialFunction[Event, DslF[F, Unit]]). In our Printer process, we match on the Print event using Scala's well-known pattern-matching feature. If you are new to functional programming, I'd recommend reading about pattern matching - it's a very powerful instrument.
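For instance, since Receive is just a partial function, the usual Scala pattern-matching features (type patterns, guards, wildcards) are all available in a handler. A small sketch reusing the Printer API from above:

override def handle: Receive = {
  case Print(data: String) if data.nonEmpty => eval(println(data)) // type pattern with a guard
  case Print(_)                             => unit                // ignore everything else
}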
That's it. We have considered every important aspect of our Printer
process.
Let's move forward and write a simple client process that will talk to our Printer
.
import io.parapet.core.Event.Start
import io.parapet.core.{Process, ProcessRef}
import io.parapet.examples.Printer._ // import Printer API

class PrinterClient[F[_]](printer: ProcessRef) extends Process[F] {

  import dsl._

  override def handle: Receive = {
    // Start is a lifecycle event that gets delivered when a process starts
    case Start => Print("hello world") ~> printer
  }
}
As you might have already noticed, we are repeating the same steps we made when we were writing our Printer process:

- leave a hole F[_] in its type definition
- extend the io.parapet.core.Process trait, parametrizing it with the generic effect type F
- override the handle partial function
Let's consider some new types and operators we have used to write our client: ProcessRef, the Start lifecycle event, and the ~> (send) infix operator. Let's start with ProcessRef. ProcessRef is a unique process identifier (a UUID by default). It represents a process address in the Parapet system and must be unique. It's recommended to use a ProcessRef instead of a Process object directly unless you are sure you want otherwise. It's not prohibited to use a Process object directly; however, using a process reference may be useful in some scenarios. Let's consider one such case. Imagine we want to dynamically change the current Printer process in our client so that it will store data in a file on disk instead of printing it to the console. We can add a new event ChangePrinter:
case class ChangePrinter(printer: ProcessRef) extends Event
Then our client will look like this:
class PrinterClient[F[_]](private var printer: ProcessRef) extends Process[F] {

  import PrinterClient._
  import dsl._

  override def handle: Receive = {
    case Start                     => Print("hello world") ~> printer
    case ChangePrinter(newPrinter) => eval(printer = newPrinter)
  }
}

object PrinterClient {
  case class ChangePrinter(printer: ProcessRef) extends Event
}
This design cannot be achieved by using processes directly, because it's not possible to send `Process` objects - processes are not serializable in general. One more thing: you can override the Process#ref field, just make sure it's unique; otherwise the Parapet system will return an error during startup.
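As a sketch (assuming ref is declared as an overridable member of Process, which the Process#ref mention above suggests), overriding the reference could look like this:

class Printer[F[_]] extends Process[F] {

  import dsl._

  // assumption: ref can be overridden as a val; the value must be unique across the system
  override val ref: ProcessRef = ProcessRef("printer")

  override def handle: Receive = {
    case Printer.Print(data) => eval(println(data))
  }
}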
Ok, we are almost done! There are a few more things left to cover: the Start lifecycle event and the ~> operator - and there is nothing special about these two. Parapet has two lifecycle events:

- Start is sent to a process once it's created in the Parapet system.
- Stop is sent to a process when an application is interrupted with Ctrl-C or when some other process sends a Stop or Kill event to that process.

The main difference between Stop and Kill is that in the former case a process can finish processing all pending events before it receives the Stop event, whereas Kill will interrupt a process and then deliver the Stop event; all pending events will be discarded. If you are familiar with Java's ExecutorService, you can think of Stop as shutdown and Kill as shutdownNow.
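A small sketch of the difference (Shutdown, workerA and workerB are hypothetical application names; Stop and Kill are assumed to be importable from io.parapet.core.Event, like Start):

import io.parapet.core.Event.{Kill, Stop} // assumption: Stop and Kill live next to Start

case object Shutdown extends Event // hypothetical application event

// workerA and workerB are the ProcessRefs of two previously created processes
val supervisor = Process[IO](_ => {
  case Shutdown =>
    Stop ~> workerA ++ // workerA drains its pending events before it receives Stop
      Kill ~> workerB  // workerB is interrupted; its pending events are discarded, then Stop is delivered
})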
Finally, ~> is the most frequently used operator; it's defined for any type that extends the Event trait. ~> is just a symbolic name for the send(event, processRef) operator.
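For instance, the two lines below describe the same flow:

Print("hello world") ~> printer.ref
send(Print("hello world"), printer.ref)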
At this point we have two processes: Printer and PrinterClient - nice! But wait, we need to run them somehow, right? Fortunately, it's extremely easy to do: all we need is to create a PrinterApp object which represents our application and extend it from the CatsApp abstract class. CatsApp extends ParApp by specifying the concrete effect type IO:
abstract class CatsApp extends ParApp[IO]
CatsApp comes from the interop-cats library.
import cats.effect.IO
import io.parapet.CatsApp
import io.parapet.core.Process

object PrinterApp extends CatsApp {
  override def processes: IO[Seq[Process[IO]]] = IO {
    val printer = new Printer[IO]
    val printerClient = new PrinterClient[IO](printer.ref)
    Seq(printer, printerClient)
  }
}
This is a Cats Effect specific application, meaning it uses the IO type under the hood. If you run your program you should see hello world printed to the console. Also notice that we are using the concrete effect type IO to fill the hole in our Printer type, e.g. new Printer[IO]; in practice it can be any other effect type, like Task.
In our example, we created a PrinterClient which does nothing but send a Print event at startup. In my opinion, it doesn't deserve to be a standalone process; it would be better to create a process in place:
object PrinterApp extends CatsApp {
  override def processes: IO[Seq[Process[IO]]] = IO {
    val printer = new Printer[IO]
    val start = Process[IO](_ => {
      case Start => Printer.Print("hello world") ~> printer.ref
    })
    Seq(start, printer)
  }
}
That said, it's a matter of taste - there is no hard rule.
This chapter describes each DSL operator in detail. Let's get started.

unit - semantically this operator is equivalent to Monad.unit and obeys the same laws. In other words, the following expressions are equivalent:
event ~> process <-> unit ++ event ~> process
event ~> process <-> event ~> process ++ unit
This operator can be used in fold
operator to combine multiple flows. Example:
processes.map(event ~> _).fold(unit)(_ ++ _)
It can also be used to represent an empty flow:

{
  case Start => unit // do nothing
  case Stop  => unit // do nothing
}
flow - suspends a thunk that produces a flow. Semantically this operator is equivalent to suspend for effects; however, it is strongly discouraged to perform any side effects within flow.
Not recommended:
def print(str: String) = flow {
  println(str)
  unit
}
Recommended:
def print(str: String) = flow {
  eval(println(str))
}
flow
may be useful to implement recursive flows. Example:
def times[F[_]](n: Int) = {
  def step(remaining: Int): DslF[F, Unit] = flow {
    if (remaining == 0) unit
    else eval(print(remaining)) ++ step(remaining - 1)
  }

  step(n)
}
If you try to remove flow, you will get a StackOverflowError. Another useful application is using lazy values inside flow. Example:
lazy val lazyValue: String = {
  println("evaluated")
  "hello"
}

val useLazyValue = flow {
  val tmp = lazyValue + " world"
  eval(println(tmp))
}
send - sends an event to one or more receivers. The event will be delivered to all receivers in the specified order. Parapet provides a symbolic name for this operator, ~>, although in the current implementation it doesn't allow sending an event to multiple receivers; this will be added in future releases.
Examples:
send(Ping, processA, processB, processC)
The Ping event will be sent to processA, then processB, and finally processC. However, it's not guaranteed that processA will receive the Ping event before processC, as that depends on each process's processing speed and current workload.
Ping ~> processA
Not supported:
Ping ~> Seq(processA, processB, processC)
Possible workaround:
Seq(processA, processB, processC).map(Ping ~> _).fold(unit)(_ ++ _)
Send multiple events to a process:
Seq(e1, e2, e3) ~> process
forward
- sends an event to the receiver using the original sender reference. This may be useful for implementing a proxy process.
Example:
val server = Process[IO](_ => {
  case Request(body) => withSender(sender => eval(println(s"$sender-$body")))
})

val proxy = Process[IO](_ => {
  case Request(body) => forward(Request(s"proxy-$body"), server.ref)
})

val client = Process.builder[IO](_ => {
  case Start => Request("ping") ~> proxy
}).ref(ProcessRef("client")).build
The code above will print:
client-proxy-ping
par
- executes operations from the given flow in parallel.
Example:
par(eval(print(1)) ++ eval(print(2)))
Possible outputs:
12 or 21
delay
- delays every operation in the given flow for the given duration.
For sequential flows the following expressions are semantically equivalent:
delay(duration, x~>p ++ y~>p) <-> delay(duration, x~>p) ++ delay(duration, y~>p) delay(duration, x~>p ++ y~>p) <-> delay(duration) ++ x~>p ++ delay(duration) ++ y~>p
For parallel flows:
delay(duration, par(x~>p ++ y~>p)) <-> delay(duration) ++ par(x~>p ++ y~>p)
Note: since the flow below will be executed in parallel the second operation won't be delayed:
par(delay(duration) ++ eval(print(1)))
instead, use:
par(delay(duration, eval(print(1))))
withSender
- accepts a callback function that takes a sender reference and produces a new flow.
Example:
val server = Process[IO](_ => {
  case Request(data) => withSender(sender => eval(print(s"$sender says $data")))
})

val client = Process.builder[IO](_ => {
  case Start => Request("hello") ~> server
}).ref(ProcessRef("client")).build
The code above will print:
client says hello
fork - does exactly what the name says: it executes the given flow concurrently.
Example:
val process = Process[IO](_ => {
  case Start => fork(eval(print(1))) ++ fork(eval(print(2)))
})
Possible outputs:
12 or 21
register
- registers a child process in the Parapet context. It's guaranteed that a child process will receive Stop
event before its parent.
Example:
val server = Process[IO](ref => {
  case Start => register(ref, Process[IO](_ => {
    case Stop => eval(println("stop worker"))
  }))
  case Stop => eval(println("stop server"))
})
The code above will print:
stop worker
stop server
race
- runs two flows concurrently. The loser of the race is canceled.
Example:
val forever = eval(while (true) {})

val process: Process[IO] = Process[IO](_ => {
  case Start => race(forever, eval(println("winner")))
})
Output:
winner
suspend
- adds an effect which produces `F` to the current flow.
Example:
suspend(IO(print("hello world")))
Output:
hello world
Not recommended:
suspend {
  println("hello world")
  IO.unit
}
suspendWith - suspends an effect which produces F and then feeds the result into a function that takes a normal value and returns a new flow. All operations from the produced flow are added to the current flow.
Example:
suspendWith(IO.pure(1)) { i => eval(print(i)) }
Output:
1
eval
- suspends a side effect in F
and then adds that to the current flow.
Example:
eval(println("hello world"))
Output:
hello world
evalWith - suspends a side effect in F and then feeds the result into a function that takes a normal value and returns a new flow. All operations from the produced flow will be added to the current flow.
Example:
evalWith("hello world")(a => eval(println(a)))
Output:
hello world
blocking
- marks the given flow as blocking. All blocking operations should be wrapped using this operator to not block a scheduler worker. An example of a blocking operation is a long-running IO
operation, e.g. reading data from the network socket.
Example:
class Service {
  def blockingCall: IO[String] = IO.sleep(1.second) >> IO("data")
}

class Client extends Process[IO] {

  import dsl._

  private lazy val service = new Service()

  override def handle: Receive = {
    case Start => blocking {
      suspendWith(service.blockingCall)(data => eval(println(data)))
    }
  }
}
Output:
data
Parapet wouldn't be a truly distributed library without network support. Starting with 0.0.1-RC5, Parapet comes with basic networking support. Cluster is in early development; however, it's ready to use for basic over-network communication. In this section you will find information on how to install Cluster and exchange messages between two processes over the network.
Rename node.properties.template to node.properties:
node.id=<unique_node_id>
node.address=<node_address>
node.peers=[<node_id:node_address>]
node.election-delay=10
node.heartbeat-delay=5
node.monitor-delay=10
node.peer-timeout=10
node.leader-election-threshold=0.7
Node 1:

node.id=node-1
node.address=localhost:5555
node.peers=[node-2:localhost:6666]
# ...

Node 2:

node.id=node-2
node.address=localhost:6666
node.peers=[node-1:localhost:5555]
# ...
Go to the parapet-cluster-{version} folder and run ./bin/cluster.
You can provide a custom path to node.properties
using
./bin/cluster --config=<path>
2021-05-12 01:07:57 DEBUG RouletteLeaderElection:329 - received Heartbeat(addr=localhost:5555, leader=Some(localhost:5555))
2021-05-12 01:07:57 DEBUG RouletteLeaderElection:329 - current leader: 'localhost:5555' is healthy

In this case the first node was elected.
parapet-node
import cats.effect.IO
import io.parapet.cluster.node.{MessageHandler, Node, Req}
import io.parapet.core.Event._
import io.parapet.core.{DslInterpreter, Process, ProcessRef}

class NodeProcess(id: String, port: Int, sink: ProcessRef) extends Process[IO] {

  import dsl._

  private val msgHandler = new MessageHandler() {
    override def handle(req: Req): Unit = {
      (req ~> sink).foldMap(DslInterpreter.instance.interpret(ref, sink)).unsafeRunSync() // temporary workaround
    }
  }

  private val node = new Node(
    host = "localhost",
    port = port,
    id = id,
    servers = Array("localhost:5555", "localhost:6666"),
    msgHandler = msgHandler)

  override def handle: Receive = {
    case Start => eval {
      node.connect()
      node.join("test")
    }
    case req: Req => eval(node.send(req))
  }
}

Create two node apps:
import cats.effect.IO
import io.parapet.cluster.node.Req
import io.parapet.core.Event._
import io.parapet.core.{Process, ProcessRef}
import io.parapet.{CatsApp, core}

import scala.concurrent.duration._

object NodeApp1 extends CatsApp {

  import dsl._

  val echoRef = ProcessRef("echo")
  val nodeProcess = new NodeProcess("node-1", 8001, echoRef)

  private val echo = Process.builder[IO](_ => {
    case Start => Req("node-2", "ping".getBytes()) ~> nodeProcess.ref
    case req: Req =>
      val rep = "echo: " + new String(req.data)
      delay(3.seconds) ++ Req("node-2", rep.getBytes()) ~> nodeProcess.ref
  }).ref(echoRef).build

  override def processes(args: Array[String]): IO[Seq[core.Process[IO]]] = {
    IO(Seq(nodeProcess, echo))
  }
}

Similarly, Node 2:
import cats.effect.IO import io.parapet.cluster.node.Req import io.parapet.core.Event.Start import io.parapet.core.{Process, ProcessRef} import scala.concurrent.duration._ object NodeApp2 extends CatsApp { import dsl._ val echoRef = ProcessRef("echo") val nodeProcess = new NodeProcess("node-2", 8002, echoRef) private val echo = Process.builder[IO](_ => { case Start => Req("node-1", "ping".getBytes()) ~> nodeProcess.ref case req: Req => val rep = "echo: " + new String(req.data) delay(3.seconds) ++ Req("node-1", rep.getBytes()) ~> nodeProcess.ref }).ref(echoRef).build override def processes(args: Array[String]): IO[Seq[core.Process[IO]]] = { IO(Seq(nodeProcess, echo)) } }
First node's logs:

2021-05-12 01:55:32 DEBUG Node:118 - req echo: ping to node-2 has been sent
2021-05-12 01:55:38 DEBUG Node:118 - req echo: echo: echo: ping to node-2 has been sent
2021-05-12 01:55:44 DEBUG Node:118 - req echo: echo: echo: echo: echo: ping to node-2 has been sent
2021-05-12 01:55:50 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: ping to node-2 has been sent
2021-05-12 01:55:56 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: echo: ping to node-2 has been sent
2021-05-12 01:56:02 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: ping to node-2 has been sent
...

Second node's logs:
2021-05-12 01:55:35 DEBUG Node:118 - req echo: echo: ping to node-1 has been sent
2021-05-12 01:55:41 DEBUG Node:118 - req echo: echo: echo: echo: ping to node-1 has been sent
2021-05-12 01:55:47 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: ping to node-1 has been sent
2021-05-12 01:55:53 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: ping to node-1 has been sent
2021-05-12 01:55:59 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: ping to node-1 has been sent
2021-05-12 01:56:05 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: ping to node-1 has been sent
2021-05-12 01:56:11 DEBUG Node:118 - req echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: echo: ping to node-1 has been sent
...
Process is a key abstraction in Parapet; any application must have at least one process. If you try to run an application without processes, you will get an error saying that at least one process is required. This section covers some useful features that we haven't seen yet, including the and and or composition operators.

There are several ways to create a Process.
class GenericProcess[F[_]] extends Process[F] {

  import dsl._

  override def handle: Receive = {
    case Start => unit // TODO
  }
}
or using a specific effect type, e.g. IO
:
class IOProcess extends Process[IO] {

  import dsl._

  override def handle: Receive = {
    case Start => unit // TODO
  }
}
You can use Process Builder to create a stateless process:
Process.Builder setters:

name - sets a process name; undefined is the default value
ref - sets a process reference; a JDK UUID is the default value
bufferSize - sets a process queue size; -1 - unbounded (the default) unless it's set globally using ParConfig#processBufferSize; valid values: -1, [1, Int.MaxValue)
Example:
val process: Process[IO] = Process.builder[IO](ref => {
  case Start => dsl.unit
}).ref(ProcessRef("process")) // sets a process reference
  .name("process")            // sets a process name
  .bufferSize(1000)           // sets a process queue size limit
  .build
If you don't need to override any values then you can use Process#apply
. Example:
val process: Process[IO] = Process[IO](_ => { case Start => dsl.unit })
Parapet has some reserved process references, e.g. KernelRef(parapet-kernel), SystemRef(parapet-system), DeadLetterRef(parapet-deadletter), UndefinedRef(parapet-undefined). The general rule is that any reference that starts with the parapet- prefix can be used by the platform code for any purpose.
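In practice this just means picking your own names (or keeping the default UUIDs) and staying away from the reserved prefix:

val clientRef  = ProcessRef("client")          // fine: application-defined reference
val printerRef = ProcessRef("printer")         // fine

val badRef = ProcessRef("parapet-my-process")  // avoid: the parapet- prefix is reserved for the platform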
Parapet has a SystemProcess that cannot be overridden by users. SystemProcess is the starting point, i.e. it's created before any other process. The Start lifecycle event is sent by SystemProcess. Any event sent to SystemProcess will be ignored and dropped; don't send events to SystemProcess because it can lead to unpredictable errors.
DeadLetterProcess is another process that is created by default, although it can be overridden; for more details check the DeadLetterProcess section under Event Handling.
Sometimes it might be useful to dynamically switch a process's behavior, e.g. from an uninitialized to a ready state. Thankfully, Process provides a switch method that does exactly that.
Example (lazy server):
// for some effect `F[_]`
val server = new Process[F] {

  val init = eval(println("acquire resources: create socket and etc."))

  def ready: Receive = {
    case Request(data) => withSender(Success(data) ~> _)
    case Stop          => eval(println("release resources: close socket and etc."))
  }

  def uninitialized: Receive = {
    case Start => unit // ignore Start event, wait for Init
    case Stop  => unit // process is not initialized, do nothing
    case Init  => init ++ switch(ready)
    case _     => withSender(Failure("process is not initialized", ErrorCodes.ProcessUninitialized) ~> _)
  }

  override def handle: Receive = uninitialized
}

// API
object Init extends Event
case class Request(data: Any) extends Event

sealed trait Response extends Event
case class Success(data: Any) extends Event
case class Failure(data: Any, errorCode: Int) extends Event

object ErrorCodes {
  val ProcessUninitialized = 0
}
A client which sends Request
event w/o sending Init
:
val impatientClient = Process[F](_ => {
  case Start      => Request("PING") ~> server
  case Success(_) => eval(println("that is not going to happen"))
  case f: Failure => eval(println(f))
})
The code above will print:
Failure(process is not initialized,0)
A client which sends Init
first and then Request
:
val humbleClient = Process[F](_ => {
  case Start         => Seq(Init, Request("PING")) ~> server
  case Success(data) => eval(println(s"client receive response from server: $data"))
  case _: Failure    => eval(println("that is not going to happen"))
})
The code above will print:
acquire resources: create socket and etc.
client receive response from server: PING
release resources: close socket and etc.
switch is NOT an atomic operation; avoid using switch in concurrent flows because it may result in an error or lead to unpredictable behavior.
Not recommended:
val process = new Process[F] {

  def ready: Receive = ??? // the ready-state behaviour (elided)

  override def handle: Receive = {
    case Init => fork(switch(ready)) // bad, may lead to unpredictable behaviour
  }
}
If you need to switch behavior from a concurrent flow, just send an event, e.g. Switch(Ready), to the process itself. The process will eventually switch its behavior.
val process = new Process[F] {

  def ready: Receive = ??? // the ready-state behaviour (elided)

  override def handle: Receive = {
    case Init => fork {
      eval(println("do some work in parallel")) ++
        Switch(Ready) ~> ref // notify the process that it's time to switch its behaviour
    }
    case Switch(Ready) => switch(ready)
  }
}

sealed trait State
object Ready extends State

case class Switch(next: State) extends Event
Sometimes it may be useful to call a process directly. This is especially common for short-lived processes: you may want to create a process, call it, and then abandon it - the garbage collector will do its job. However, if you try to send an event to a process that doesn't exist in the system, you will receive a Failure event with UnknownProcessException. This is where a direct call comes to the rescue.
Example:
// API
case class Sum(a: Int, b: Int) extends Event
case class Result(value: Int) extends Event

class Calculator[F[_]] extends Process[F] {
  override def handle: Receive = {
    case Sum(a, b) => withSender(Result(a + b - 1) ~> _) // yes, very poor calculator
  }
}

val student = Process[F](ref => {
  case Start         => new Calculator().apply(ref, Sum(2, 2))
  case Result(value) => eval(println(s"2 + 2 = $value"))
})
Output: 2 + 2 = 3
Note that the apply method doesn't return a normal value; rather, it returns a program which will be executed as a normal flow. In other words, the following expressions are equivalent:
Sum(2, 2) ~> calculator <-> new Calculator().apply(ref, Sum(2, 2)) // where ref belongs to the same process in both cases
Also, you may be wondering why we need to pass the process ref as an argument to the apply method. The reason is that the library needs to know the address of the sender so that it can send a reply to it.
Processes can be combined using two logical operators: or and and.

and - combines two processes, producing a new process with the ref of the first process; the flows are combined iff the handle function is defined for the given event in both processes. An error is sent to the sender if either of the two processes isn't defined for the given event.
Example:
import cats.effect.IO
import io.parapet.CatsApp
import io.parapet.core.Event.Start
import io.parapet.core.{Event, Process}

object Example extends CatsApp {

  import dsl._

  case class Print(data: Any) extends Event

  override def processes: IO[Seq[Process[IO]]] = for {
    printerA <- IO.pure(Process[IO](_ => {
      case Print(data) => eval(println(s"printerA: $data"))
    }))
    printerB <- IO.pure(Process[IO](_ => {
      case Print(data) => eval(println(s"printerB: $data"))
    }))
    client <- IO.pure(Process[IO](ref => {
      case Start => printerA.and(printerB).apply(ref, Print("test"))
    }))
  } yield Seq(printerA, printerB, client)
}

Output:
printerA: test
printerB: test
If you want to register a combined process then you don't need to register printerA
.
Example:
import cats.effect.IO import io.parapet.CatsApp import io.parapet.core.Event.Start import io.parapet.core.{Event, Process} object Example extends CatsApp { import dsl._ case class Print(data: Any) extends Event override def processes: IO[Seq[Process[IO]]] = for { printerA <- IO.pure(Process[IO](_ => { case Print(data) => eval(println(s"printerA: $data")) })) printerB <- IO.pure(Process[IO](_ => { case Print(data) => eval(println(s"printerB: $data")) })) combined <- IO.pure(printerA.and(printerB)) client <- IO.pure(Process[IO](_ => { case Start => Print("test") ~> combined })) } yield Seq(combined, printerB, client) }
or
- creates a new process with ref
of the first process. A combined process refers to the first process if its handle
is defined for the given event, otherwise, to the second process. Sends an error to the sender if neither process is defined for the given event.
Example:
import cats.effect.IO
import io.parapet.CatsApp
import io.parapet.core.Event.Start
import io.parapet.core.{Event, Process}

object Example extends CatsApp {

  import dsl._

  case class Print(data: Any) extends Event

  override def processes: IO[Seq[Process[IO]]] = for {
    printerA <- IO.pure(Process[IO](_ => {
      case Print(data: Int) => eval(println(s"printerA: $data"))
    }))
    printerB <- IO.pure(Process[IO](_ => {
      case Print(data: String) => eval(println(s"printerB: $data"))
    }))
    combined <- IO.pure(printerA.or(printerB))
    client <- IO.pure(Process[IO](_ => {
      case Start => Print("test") ~> combined ++ Print(1) ~> combined
    }))
  } yield Seq(combined, printerB, client)
}

Output:
printerB: test
printerA: 1
Integration tests in Parapet are written in the generic style we discussed before, so that the same tests can be run against any effect system. Let's try to write a simple test for a proxy process. The first thing you need to do is add the test-utils library to your project:
libraryDependencies += "io.parapet" %% "test-utils" % version
A simple proxy process that receives requests and forwards them to a service:

class Proxy[F[_]](service: ProcessRef) extends Process[F] {

  import dsl._

  override def handle: Receive = {
    case Request(data) => Request(s"proxy-$data") ~> service
  }
}
Test for our Proxy
:
import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.ProxySpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} import org.scalatest.FunSuite import org.scalatest.Matchers._ import org.scalatest.OptionValues._ abstract class ProxySpec[F[_]] extends FunSuite with IntegrationSpec[F] { import dsl._ test("proxy") { val eventStore = new EventStore[F, Event] val testService = Process(ref => { case req: Request => eval(eventStore.add(ref, req)) }) val proxy = new Proxy[F](testService.ref) val init = onStart(Request("req") ~> proxy) unsafeRun(eventStore.await(1, createApp(ct.pure(Seq(init, testService, proxy))).run)) eventStore.get(testService.ref).headOption.value shouldBe Request("proxy-req") } }
In order to run this test against Cats Effect IO
you need to extend BasicCatsIOSpec
:
import cats.effect.IO
import io.parapet.testutils.BasicCatsIOSpec

class ProxySpec extends io.parapet.tests.intg.ProxySpec[IO] with BasicCatsIOSpec
Channel is a process that implements a strictly synchronous request-reply dialog. The channel sends an event to a receiver and then waits for a response in one step, i.e. it blocks asynchronously until it receives a response. Performing any other sequence, e.g. sending two request or reply events in a row, will return a failure to the sender.
Example for some F[_]
:
val server = new Process[F] {
  override def handle: Receive = {
    case Request(data) => withSender(sender => Response(s"echo: $data") ~> sender)
  }
}

val client = new Process[F] {

  lazy val ch = Channel[F]

  override def handle: Receive = {
    case Start =>
      register(ref, ch) ++
        ch.send(Request("PING"), server.ref, {
          case scala.util.Success(Response(data)) => eval(println(data))
          case scala.util.Failure(err) => eval(println(s"server failed to process request. err: ${err.getMessage}"))
        })
  }
}

case class Request(data: Any) extends Event
case class Response(data: Any) extends Event

Output:
echo: PING
There are several scenarios in which a process may receive a Failure event; each of them is illustrated below.

Example (a receiver process throws an exception while handling an event):
// for some effect F[_] val faultyServer = Process.builder[F](_ => { case Request(_) => eval(throw new RuntimeException("server is down")) }).ref(ProcessRef("server")).build val client = Process.builder[F](_ => { case Start => Request("PING") ~> faultyServer case Failure(Envelope(me, event, receiver), EventHandlingException(errMsg, cause)) => eval { println(s"self: $me") println(s"event: $event") println(s"receiver: $receiver") println(s"errMsg: $errMsg") println(s"cause: ${cause.getMessage}") } }).ref(ProcessRef("client")).build
The code above will output:
self: client
event: Request(PING)
receiver: server
errMsg: process [name=undefined, ref=server] has failed to handle event: Request(PING)
cause: server is down
EventHandlingException
indicates that a receiver process failed to handle an event.
A Failure can also occur when a process is experiencing performance degradation due to heavy load and its event queue fills up.

Example:
For this example we need to tweak SchedulerConfig
:
queueSize = 10000
processQueueSize = 100
// for some effect F[_] val slowServer = Process.builder[F](_ => { case Request(_) => eval(while (true) {}) // very slow process... NEVER WRITE SUCH CODE }).ref(ProcessRef("server")).build val client = Process.builder[F](_ => { case Start => generateRequests(1000) ~> slowServer case Failure(Envelope(me, event, receiver), EventDeliveryException(errMsg, cause)) => eval { println(s"self: $me") println(s"event: $event") println(s"receiver: $receiver") println(s"errMsg: $errMsg") println(s"cause: ${cause.getMessage}") println("=====================================================") } }).ref(ProcessRef("client")).build def generateRequests(n: Int): Seq[Event] = { (0 until n).map(Request) }
The code above will print dozens of lines - several lines per Failure event:
client sent events self: client event: Request(101) receiver: server errMsg: System failed to deliver an event to process [name=undefined, ref=server] cause: process [name=undefined, ref=server] event queue is full ===================================================== self: client event: Request(102) receiver: server errMsg: System failed to deliver an event to process [name=undefined, ref=server] cause: process [name=undefined, ref=server] event queue is full ===================================================== self: client event: Request(103) receiver: server errMsg: System failed to deliver an event to process [name=undefined, ref=server] cause: process [name=undefined, ref=server] event queue is full =====================================================
EventDeliveryException indicates that the system failed to deliver an event. Handling such errors may be useful for runtime analysis: e.g. a sender process might consider lowering its event send rate, or even stop sending events to let the target process finish processing pending events. It's worth noting that you should avoid any long-running computations when processing Failure events, because that could lead to cascading failures.
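As an illustration of that advice, here is a hedged sketch of a client that reacts to a delivery failure by backing off and retrying once; the retry policy itself is just an example, not a library feature (it reuses generateRequests and slowServer from the example above):

import scala.concurrent.duration._

val retryingClient = Process.builder[F](_ => {
  case Start => generateRequests(1000) ~> slowServer
  case Failure(Envelope(_, event, receiver), EventDeliveryException(_, _)) =>
    delay(1.second) ++ event ~> receiver // back off, then re-send the failed event
}).ref(ProcessRef("retrying-client")).build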
Example (the receiver's handler is not defined for the given event):
// for some effect F[_] val uselessService = Process.builder[F](_ => { case Start => unit case Stop => unit }).ref(ProcessRef("server")).build val client = Process.builder[F](_ => { case Start => Request("PING") ~> uselessService case Failure(Envelope(me, event, receiver), EventMatchException(errMsg)) => eval { println(s"self: $me") println(s"event: $event") println(s"receiver: $receiver") println(s"errMsg: $errMsg") } }).ref(ProcessRef("client")).build
The code above will print:
self: client
event: Request(PING)
receiver: server
errMsg: process [name=undefined, ref=server] handler is not defined for event: Request(PING)
// for some effect F[_] val unknownService = Process.builder[F](_ => { case Start => unit case Stop => unit }).ref(ProcessRef("server")).build val client = Process.builder[F](_ => { case Start => Request("PING") ~> unknownService case Failure(Envelope(me, event, receiver), UnknownProcessException(errMsg)) => eval { println(s"self: $me") println(s"event: $event") println(s"receiver: $receiver") println(s"errMsg: $errMsg") } }).ref(ProcessRef("client")).build
The code above will print:
self: client
event: Request(PING)
receiver: server
errMsg: there is no such process with id=server registered in the system
Final notes regarding error handling:

- Failure events are sent by the parapet-system process (if you are curious, you can check this yourself using withSender; see the sketch below).
- If a sender process doesn't handle Failure events, the Failure event will be sent to DeadLetterProcess. You will find more about DeadLetterProcess below.
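A small sketch of the check mentioned in the first note - printing the sender of a Failure event (it should be the parapet-system reference); faultyServer is the process from the earlier error-handling example:

val curiousClient = Process[F](_ => {
  case Start      => Request("PING") ~> faultyServer
  case _: Failure => withSender(sender => eval(println(s"Failure sent by: $sender"))) // expect parapet-system
})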
By default the library provides an implementation of DeadLetterProcess which just logs failures. This might not be very practical - for instance, you may prefer to store failures in a database for further analysis. The library allows you to provide a custom implementation of DeadLetterProcess.
Example using CatsApp
:
import cats.effect.IO import io.parapet.CatsApp import io.parapet.core.Event.{DeadLetter, Start} import io.parapet.core.processes.DeadLetterProcess import io.parapet.core.{Event, Process, ProcessRef} object CustomDeadLetterProcessDemo extends CatsApp { import dsl._ override def deadLetter: IO[DeadLetterProcess[IO]] = IO.pure { new DeadLetterProcess[IO] { override def handle: Receive = { // can be stored in database case DeadLetter(envelope, error) => eval { println(s"sender: ${envelope.sender}") println(s"receiver: ${envelope.receiver}") println(s"event: ${envelope.event}") println(s"errorType: ${error.getClass.getSimpleName}") println(s"errorMsg: ${error.getMessage}") } } } } val faultyServer = Process.builder[IO](_ => { case Request(_) => eval(throw new RuntimeException("server is down")) }).ref(ProcessRef("server")).build val client = Process.builder[IO](_ => { case Start => Request("PING") ~> faultyServer // no error handling }).ref(ProcessRef("client")).build override def processes: IO[Seq[Process[IO]]] = IO { Seq(client, faultyServer) } case class Request(data: Any) extends Event }
The code above will print:
sender: client
receiver: server
event: Request(PING)
errorType: EventHandlingException
errorMsg: process [name=undefined, ref=server] has failed to handle event: Request(PING)
EventLog can be used to store events on disk. Later, those events can be retrieved and resubmitted. In case the event queue is full, all unsubmitted events will be redirected to EventLog. The default implementation just logs such events; a more practical implementation will be provided in future releases.
Parapet system can be configured by providing an instance of ParConfig
.
Example:
import cats.effect.IO
import io.parapet.core.Parapet.ParConfig
import io.parapet.{CatsApp, core}

object ConfigExample extends CatsApp {
  override def processes: IO[Seq[core.Process[IO]]] = ??? // your processes go here
  override val config: ParConfig = ParConfig(...)         // your configuration
}
ParConfig has the following properties:

processBufferSize - sets a buffer size limit for all processes. If a process is created with a different value, then that value will be used instead.
Examples:
Example 1:
override val config: ParConfig = ParConfig(-1, SchedulerConfig.default)

val processA = Process[IO](_ => {
  case _ => unit
})

val processB = Process.builder[IO](_ => {
  case _ => unit
}).bufferSize(1000).build
processA will be created with an unbounded buffer; processB will be created with a buffer size limit of 1000.
Example 2:
override val config: ParConfig = ParConfig(5000, SchedulerConfig.default)

val processA = Process[IO](_ => {
  case _ => unit
})

val processB = Process.builder[IO](_ => {
  case _ => unit
}).bufferSize(1000).build
processA will be created with a buffer size limit of 5000; processB will be created with a buffer size limit of 1000.
Example 3:
override val config: ParConfig = ParConfig(5000, SchedulerConfig.default)

val processA = Process[IO](_ => {
  case _ => unit
})

val processB = Process.builder[IO](_ => {
  case _ => unit
}).bufferSize(-1).build
processA will be created with a buffer size limit of 5000; processB will be created with an unbounded buffer.
You should set bufferSize to a value that matches the expected workload. For example, if you are going to send 1M events within the same flow, it's recommended to set bufferSize to 1M. However, it depends on how fast your consumers process events and on the amount of available memory: if it's possible to keep that many events in memory - go for it; if not - you will probably need to reconsider your design decisions.
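For example, a consumer expected to receive a burst of about one million events from a single flow could be declared like this (assuming there is enough memory to buffer them):

val consumer = Process.builder[IO](_ => {
  case _ => dsl.unit // handle events here
}).name("consumer").bufferSize(1000000).build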
In case a process's event queue is full, all events will be redirected to EventLog (see the corresponding section).
schedulerConfig:

numberOfWorkers - number of workers; default = availableProcessors

Safety properties:

Liveness properties:
To start using algorithms implemented in Parapet you need to add algorithms
library to your project's dependencies.
libraryDependencies += "io.parapet" %% "algorithms" % version
A distributed system consists of a set of processors that are connected by a communication network. The communication network provides the facility of information exchange among processors.
Parapet provides a messaging module that consists of an API - set of events that can be used to implement basic communication protocols and messaging abstractions, e.g.: client/server. Implementations based on concrete libraries reside in corresponding subprojects.
In order to start using messaging module you need to add two dependencies: messaging-api
and messaging-{specific_library}
.
By default, Parapet provides implementations of basic messaging components based on ZMQ library.
libraryDependencies += "io.parapet" %% "messaging-api" % version libraryDependencies += "io.parapet" %% "messaging-zmq" % version
ZmqSyncClient - based on the REQ socket type
ZmqSyncServer - based on the REP socket type
Example:
import java.net.ServerSocket import cats.effect.IO import io.parapet.CatsApp import io.parapet.core.Event.Start import io.parapet.core.{Encoder, Event, Process} import io.parapet.messaging.api.MessagingApi.{Failure, Request, Success} import io.parapet.messaging.{ZmqSyncClient, ZmqSyncServer} object SyncClientSyncServer extends CatsApp { import dsl._ // Application API case class TestRequest(data: Any) extends Event case class TestResponse(data: Any) extends Event // Required for encoding/decoding events private val encoder = Encoder.json( List( // Messaging Api classOf[Request], classOf[Success], classOf[Failure], // Application Api classOf[TestRequest], classOf[TestResponse] ) ) override def processes: IO[Seq[Process[IO]]] = IO { val port = new ServerSocket(0).getLocalPort val echoService = Process[IO](_ => { case TestRequest(id) => withSender(sender => TestResponse(s"echo: $id") ~> sender) }) val zmqServer = ZmqSyncServer[IO](s"tcp://*:$port", echoService.ref, encoder) val zmqClient = ZmqSyncClient[IO](s"tcp://localhost:$port", encoder) val client = Process[IO](_ => { case Start => Request(TestRequest("hello")) ~> zmqClient case Success(TestResponse(data)) => eval(println(data)) }) Seq(zmqClient, zmqServer, echoService, client) } }Output:
echo: hello
ZmqAsyncClient - based on the DEALER socket type
ZmqAsyncServer - based on the ROUTER socket type
Example:
import java.net.ServerSocket import cats.effect.IO import io.parapet.CatsApp import io.parapet.core.Event.Start import io.parapet.core.{Encoder, Event, Process, ProcessRef} import io.parapet.messaging.api.MessagingApi.{Failure, Request, Success} import io.parapet.messaging.api.ServerAPI.Envelope import io.parapet.messaging.{ZmqAsyncClient, ZmqAsyncServer} object AsyncClientAsyncServer extends CatsApp { import dsl._ // Application API case class TestRequest(data: Any) extends Event case class TestResponse(data: Any) extends Event private val encoder = Encoder.json( List( // Messaging Api classOf[Request], classOf[Success], classOf[Failure], // Application Api classOf[TestRequest], classOf[TestResponse]) ) override def processes: IO[Seq[Process[IO]]] = IO.suspend { val port = new ServerSocket(0).getLocalPort val nClients = 5 val nClientWorkers = 5 val nServerWorkers = 5 for { echoService <- IO(Process[IO](_ => { case Envelope(requestId, TestRequest(id)) => fork(withSender(sender => Envelope(requestId, TestResponse(id)) ~> sender)) })) zmqClient <- ZmqAsyncClient[IO](s"tcp://localhost:$port", encoder, nClientWorkers) zmqServer <- IO(ZmqAsyncServer[IO](s"tcp://*:$port", echoService.ref, encoder, nServerWorkers)) clients <- IO((0 until nClients).map(createClient(_, zmqClient.ref))) } yield Seq(echoService, zmqClient, zmqServer) ++ clients } def createClient(id: Int, zmqClient: ProcessRef): Process[IO] = Process[IO](_ => { case Start => Request(TestRequest(id)) ~> zmqClient case Success(TestResponse(data)) => eval(println(s"client-$id received $data")) }) }Output:
client-2 received 2 client-3 received 3 client-0 received 0 client-4 received 4 client-1 received 1
The Freelance Protocol (FLP) defines brokerless reliable request-reply dialogs across an N-to-N network of clients and servers. Protocol specification
Example:
import java.net.ServerSocket import cats.effect.{Concurrent, IO} import io.parapet.CatsApp import io.parapet.core.Dsl.DslF import io.parapet.core.Event.{Start, Stop} import io.parapet.core.{Channel, Encoder, Event, Process, ProcessRef} import io.parapet.messaging.api.{FLProtocolApi, HeartbeatAPI, MessagingApi, ServerAPI} import io.parapet.messaging.{FLProtocol, ZmqAsyncServer} import scala.concurrent.duration._ object FLProtocolExample extends CatsApp { import dsl._ // Application API case class TestRequest(data: Any) extends Event case class TestResponse(data: Any) extends Event val encoder = Encoder.json(List( // Messaging Api classOf[MessagingApi.Request], classOf[MessagingApi.Success], classOf[MessagingApi.Failure], // Application Api classOf[TestRequest], classOf[TestResponse], // Heartbeat Api HeartbeatAPI.Ping.getClass, HeartbeatAPI.Pong.getClass, // FL Api classOf[FLProtocolApi.Connect], // Server Api classOf[ServerAPI.Envelope] )) class FLTestClient[F[_] : Concurrent](flprotocol: ProcessRef, nRequests: Int, servers: List[String]) extends Process[F] { import dsl._ val ch = new Channel[F]() override def handle: Receive = { case Start => register(ref, ch) ++ servers.map(endpoint => FLProtocolApi.Connect(endpoint)) ~> flprotocol ++ delay(1.second, generateRequests(nRequests, ch)) case Stop => unit } def generateRequests(n: Int, ch: Channel[F]): DslF[F, Unit] = { (0 until n).map { i => ch.send(MessagingApi.Request(TestRequest(i.toString)), flprotocol, { case scala.util.Success(res) => eval(println(res)) case scala.util.Failure(err) => eval(println(err)) }) }.fold(unit)(_ ++ _) } } override def processes: IO[Seq[Process[IO]]] = IO.suspend { val availableServerPort = new ServerSocket(0).getLocalPort val nRequests = 5 val flprotocol = new FLProtocol[IO](encoder) val service = Process[IO](_ => { case ServerAPI.Envelope(id, TestRequest(body)) => withSender(sender => { ServerAPI.Envelope(id, TestResponse("server-" + body)) ~> sender }) }) val client = new FLTestClient[IO](flprotocol.ref, nRequests, List("tcp://localhost:4444", // unavailable server "tcp://localhost:5555", // unavailable server s"tcp://localhost:$availableServerPort")) val server = ZmqAsyncServer[IO](s"tcp://*:$availableServerPort", service.ref, encoder, numOfWorkers = 5, s"tcp://localhost:$availableServerPort") IO(Seq(service, flprotocol, client, server)) } }Output:
Failure(request expired,104) Failure(request expired,104) Failure(request expired,104) Success(TestResponse(server-3)) Success(TestResponse(server-4))
Example:
object MapReduceExample extends CatsApp { import dsl._ val mapper: Record[Unit, String] => Seq[Record[String, Int]] = record => { record.value.split(" ").map(word => Record(word.trim, 1)) } val reducer: (String, Seq[Int]) => Int = (_, values) => values.sum val lines = Seq( "Hello World Bye World", "Hello Map Reduce Goodbye Map Reduce" ) override def processes: IO[Seq[Process[IO]]] = IO { val input = Input(lines.map(line => Chunk(Seq(Record[Unit, String]((), line))))) val mapreduce = new MapReduce[IO, Unit, String, String, Int](mapper, 2, reducer, 1) val client = Process[IO](ref => { case Start => input ~> mapreduce case out: Output[String, Int] => eval { out.records.sorted.foreach(println) } }) Seq(mapreduce, client) } }
Output:
Record(Bye,1) Record(Goodbye,1) Record(Hello,2) Record(Map,2) Record(Reduce,2) Record(World,2)
Before we start comparing any frameworks and libraries, I'd like to highlight some unique features of Parapet that might change your perception of how distributed systems can be developed using functional programming in Scala.

In general, regardless of the programming paradigm, we have to deal with side effects. In purely functional languages like Haskell, we would use the I/O system, which allows describing computational effects as pure values. Luckily, there are some decent implementations of effect systems for Scala: Cats Effect, Monix, FS2, and others. One of the key features of an effect system is that it lets you cleanly separate side-effecting actions from purely functional code. Having a special type that describes computational effects makes it easier to reason about your code. If you see a function that returns IO[A], it should instantly become clear that this function is not pure and performs some side effects, synchronously or asynchronously.
If you aren't familiar with effect systems, you are probably using Scala's Future. However, Future is not referentially transparent because it evaluates eagerly and memoizes its result. This is why Future is not a suitable type for encapsulating effects: constructing a Future that will evaluate a side effect is itself a side effect.
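A small illustration of the difference (standard Scala and Cats Effect, not Parapet-specific):

import cats.effect.IO
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future: the side effect runs when the Future is constructed, so these two programs differ
val f = Future(println("hi")); (f, f)          // prints "hi" once
(Future(println("hi")), Future(println("hi"))) // prints "hi" twice

// IO: just a description of an effect, so substitution changes nothing
val io = IO(println("hi")); (io, io)           // prints nothing until the IO is run
(IO(println("hi")), IO(println("hi")))         // still prints nothing - the two programs are identical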
Parapet can be viewed as an effect system for event-driven and concurrent systems. It provides a set of operations that allow describing program flows as data; those flows are then interpreted into low-level effect types such as IO or Task. Since program flows in Parapet are represented as data, we gain the ability to verify distributed algorithms without needing to run them. For instance, this feature made it possible to implement a preemptive model in the Scheduler.
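Concretely, inside a process (where the DSL is in scope) a flow is an ordinary value of type DslF[F, Unit]: it can be named, composed and passed around, and nothing runs until the flow is interpreted into a concrete effect. A tiny sketch reusing the Printer API from the earlier chapters:

val greet: DslF[F, Unit] =
  eval(println("about to greet")) ++ Print("hello") ~> printer // just a description of a flow; nothing runs here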
Parapet gives you the freedom to choose an implementation of the effect system. If you are already using Cats Effect in your project, you just need to add interop-cats. If tomorrow you decide that ZIO is more suitable for your needs, you only need to replace interop-cats with interop-scalaz-zio and change a few lines of code. As a result, you get more flexibility and fine-grained control.
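To make the idea concrete, a heavily hedged sketch: only the app base class and the effect type should need to change. CatsApp and IO are real names used in this document; the ZIO-side names below (ZioApp and Task) are assumptions for illustration only - check the interop-scalaz-zio module for the actual names.

// Cats Effect version (as shown earlier in this document)
object PrinterApp extends CatsApp {
  override def processes: IO[Seq[Process[IO]]] = IO(Seq(new Printer[IO]))
}

// Hypothetical ZIO version - ZioApp and Task are assumed names
// object PrinterApp extends ZioApp {
//   override def processes: Task[Seq[Process[Task]]] = Task(Seq(new Printer[Task]))
// }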
If you notice an inaccuracy or something that doesn’t seem quite right, please let us know by opening an issue.
Messages: 1000000
Consumers: 1
Producers: 1

Parapet: 1642 ms
Copyright [2019] The Parapet Project Developers Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0