I delve into a diverse range of topics, spanning programming languages, machine learning, data engineering tools, and DevOps. My articles are enriched with practical code examples, ensuring their applicability in real-world scenarios.
Showing posts with label Monads. Show all posts
Sunday, November 3, 2024
Posts History
Labels:
Airflow,
BERT,
Big Data,
ChatGPT,
Data pipelines,
Deep learning,
Docker,
Genetic Algorithm,
Java,
Kafka,
Machine learning,
Monads,
NoSQL,
Python,
PyTorch,
Reinforcement Learning,
Scala,
Spark,
Streaming
Tuesday, July 22, 2014
Akka Actors Blocking on Futures
Target audience: Intermediate
Estimated reading time: 6'
This is a brief introduction to distributed computing using blocking Scala/Akka futures.
Overview
Akka is an actor-based, message-driven framework for building concurrent and reliable applications. As such, Akka supports futures, for which the requester does not have to block while waiting for a response. A message or request is sent for execution and the expected response (success or failure/exception) is delivered asynchronously at some point in the future. The code snippets in our two examples omit condition tests and supporting methods for the sake of clarity. The code compiles and executes with Akka 2.2.4 and 2.3.6.
Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exceptions, validation of class and method arguments, scoping qualifiers and imports is omitted.
Concurrent processing
One simple and common usage of Akka/Scala futures is to have some computation performed concurrently without going through the pain of creating actors.
Not every computation requires a sequential execution in which the input of one task depends on the output of another task. Let's consider the case of the evaluation of a model or function against a predefined time series or dataset.
The first step is to create a controller to manage the concurrent tasks, FuturesController (line 3). The controller takes the input time series xt (line 4) and a list of model candidates xs, defined as functions of time, Function1[Double, Double] (line 5). The time series uses a single variable (dimension 1), so the models are simply defined as one-variable functions (x: Double) => f(x).
case class Start()

final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
  (implicit timeout: Timeout) extends Actor {

  override def receive = {
    case s: Start => {
      val futures = createFutures
      val results = futures.map(
        Await.result(_, timeout.duration)
      )

      sender ! results.zipWithIndex.minBy( _._1 )._2
    }
    case _ => logger.error("Message not recognized")
  }

  def createFutures: List[Future[Double]]   // body shown in the next snippet
}
The event handler receive (line 8) for the message Start creates as many futures as needed, one per model to evaluate (line 10). The controller/actor blocks by invoking Await until each of the future tasks completes (line 12). Finally, the controller returns the result of the computation (line 15), in this case the index of the fittest of the models xs. The handler logs a simple error message if a message other than Start is received (line 17).
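The blocking step in the handler can be tried outside of an actor. The following is a minimal sketch that uses only the Scala standard library: the object BlockingSelection, the stand-in createFutures (which wraps fixed error values instead of evaluating models) and the value defaultTimeout are assumptions made for illustration and are not part of the original controller. It also shows, as a possible alternative, how Future.sequence lets the caller block once on the combined result rather than once per future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSelection {
  // Hypothetical timeout used for illustration only
  val defaultTimeout: FiniteDuration = 2.seconds

  // Hypothetical stand-in for createFutures: each future yields a fixed error value
  def createFutures(errors: List[Double]): List[Future[Double]] =
    errors.map(e => Future { e })

  // Same pattern as the controller: block on each future in turn,
  // then return the index of the smallest value
  def bestIndexBlocking(futures: List[Future[Double]], timeout: FiniteDuration): Int = {
    val results = futures.map(Await.result(_, timeout))
    results.zipWithIndex.minBy(_._1)._2
  }

  // Alternative: combine the futures into a single future and block only once
  def bestIndexSequenced(futures: List[Future[Double]], timeout: FiniteDuration): Int = {
    val all: Future[List[Double]] = Future.sequence(futures)
    Await.result(all, timeout).zipWithIndex.minBy(_._1)._2
  }
}
```

Both variants return the same index. The second one applies the timeout to the whole batch instead of to each future individually, which may or may not be the desired behavior.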
The futures are created through the method createFutures. Its implementation computes the least squares error of each model relative to the input dataset, using a map transformation and a fold aggregator (lines 4 - 7).
def createFutures: List[Future[Double]] =

  xs.map(f => Future[Double] {
    val yt = Range(0, xt.size).map( f(_) )
    val r = (xt, yt).zipped./:(0.0)((sum, z) => {
      val diff = z._1 - z._2
      sum + diff*diff
    })

    Math.sqrt(r)
  })
}
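The error computed inside each future can also be written as a plain function, which makes it easy to check in isolation. The sketch below is an illustration under assumptions: the object ModelError and the helpers error and bestModel are hypothetical names, and zip with foldLeft is used in place of zipped with /: so that the code compiles with recent Scala versions.

```scala
object ModelError {
  // Square root of the sum of squared differences between the series xt
  // and the model f evaluated at the time indices 0, 1, ..., xt.length - 1
  def error(xt: Array[Double], f: Double => Double): Double = {
    val yt = xt.indices.map(t => f(t.toDouble))
    val sumSq = xt.zip(yt).foldLeft(0.0) { case (sum, (x, y)) =>
      val diff = x - y
      sum + diff * diff
    }
    math.sqrt(sumSq)
  }

  // Index of the candidate model with the smallest error
  def bestModel(xt: Array[Double], models: List[Double => Double]): Int =
    models.map(error(xt, _)).zipWithIndex.minBy(_._1)._2
}
```

Wrapping each call to error in a Future, as createFutures does, distributes the evaluation of the candidates without changing the result.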
Evaluation
Let's consider an input data set generated with the following model
y = m(m - 1) + r    // r in [0, 1]
where r is a random value between 0 and 1, representing noise.
val TIME_SERIES_LEN = 200
val xt = Array.tabulate(TIME_SERIES_LEN)(
m => m*(m - 1.0) + Random.nextDouble
)
The following code snippet lists the key packages to be imported for most common applications using Akka actors and futures (lines 1 to 6).
The driver program instantiates the actor system (line 12) and creates the controller actor master using the Akka actor factory actorOf (lines 13, 14). It sends an ask request (line 18) and returns the best model (line 22) if the controller completes its tasks within a predefined timeout (line 10). The code prints the stack trace in case of failure (line 23).
import akka.actor.{Actor, Props, ActorSystem}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import akka.pattern.ask
import scala.util.{Try, Success, Failure}

private val duration = Duration(2000, "millis")
implicit val timeout = new Timeout(duration)

val actorSystem = ActorSystem("system")
val master = actorSystem.actorOf(
  Props(new FuturesController(xt, approx)), "Function_eval"   // approx: list of candidate models
)

Try {
  val future = master ? Start()
  Await.result(future, timeout.duration)
}
match {
  case Success(n) => logger.info(s"Best fit is $n")
  case Failure(e) => e.printStackTrace
}

actorSystem.shutdown
It is critical that the application shuts down the Akka system before it exits (line 26).
References
- Akka Essentials - G Manish - Packt Publishing - Oct 2012
- Scala for Machine Learning - Chap 12 Scalable Frameworks / Akka - P. Nicolas - Packt Publishing - 2014
Thursday, September 6, 2012
Don't Fear the Monad: Theory
Target audience: Advanced
Estimated reading time: 6'
Are you familiar with Category Theory, Monads, or Functors? Monads, in particular, are gaining popularity as functional languages like Clojure, Haskell, and Scala become more widely accessible.
This article marks the initial segment of our series, delving into the theory behind Monad implementation. The subsequent post, Don't fear the Monad: Scala will cover the practical implementation in Scala.
According to the dictionary:
"Monad: an elementary, fundamental substance that mirrors the structure of the world and serves as the foundation for deriving material properties."
In the realm of computer science, a Monad is a construct used to represent operations that can be either nested or chained for a specific data type. Consequently, software engineers can create pipelines to process data, with each step executing predefined rules defined by the monad. Monads are employed to handle tasks such as I/O operations, exceptions, and concurrency (or Futures).
Monads are grounded in category theory, a field related to the more familiar group theory. The following section assumes some level of familiarity with mathematics, topology, and transformations.
Monoids
A monoid is a structure (S, *, 1) consisting of an associative binary operation * over some set S, which contains an identity element 1, such that \[\forall s_{i}, s_{j}, s_{k}\in S\,\,\, s_{i}*s_{j} \in S\\(s_{i}*s_{j})*s_{k} = s_{i}*(s_{j}*s_{k})\\1*s_{i} = s_{i}*1=s_{i}\]
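The three monoid properties translate directly into a small Scala abstraction. This is a minimal sketch, not a reference implementation: the trait Monoid, the member names op and one, and the helpers in MonoidSketch are hypothetical names chosen to mirror the notation (S, *, 1).

```scala
trait Monoid[S] {
  def op(a: S, b: S): S   // the associative binary operation *
  def one: S              // the identity element 1
}

object MonoidSketch {
  // Integers under addition, with 0 as identity
  val intAddition: Monoid[Int] = new Monoid[Int] {
    def op(a: Int, b: Int): Int = a + b
    val one: Int = 0
  }

  // Strings under concatenation, with the empty string as identity
  val stringConcat: Monoid[String] = new Monoid[String] {
    def op(a: String, b: String): String = a + b
    val one: String = ""
  }

  // Checks associativity and the identity law for three sample elements
  def satisfiesLaws[S](m: Monoid[S], a: S, b: S, c: S): Boolean =
    m.op(m.op(a, b), c) == m.op(a, m.op(b, c)) &&
      m.op(m.one, a) == a &&
      m.op(a, m.one) == a

  // Folds a list with the monoid operation, starting from the identity
  def combineAll[S](xs: List[S])(m: Monoid[S]): S =
    xs.foldLeft(m.one)(m.op)
}
```

Checking the laws on a few samples does not prove them, but it is a convenient sanity test when defining a new instance.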
Categories
Categories are a generalization of monoids. A category consists of objects and morphisms. A morphism is a transformation that operates on objects. Let's consider a morphism f, related to objects x and y: \[f: x \mapsto y\] Morphisms have the following properties
- Transitivity (composition), using the usual convention that g∘f applies f first: \[f: x \mapsto y\,\,,\,g: y \mapsto z\,\Rightarrow g\circ f: x \mapsto z\]
- Identity: \[id:x \mapsto x\]
- Associativity: \[f\circ (g\circ h) = (f\circ g) \circ h\]
Sets, arrays, vectors, matrices, lists or hash tables constitute categories which support morphisms. Let's consider two operations, sqr and log, on arrays of positive floating point values x. Applying sqr first and log second gives \[sqr : \{x_{i}\} \mapsto \{x_{i}^2 \} \,\,\, log : \{y_{i}\} \mapsto \{log(y_{i})\}\,\,\,\Rightarrow \,\, log \circ sqr : \{x_{i}\} \mapsto \{2.log(x_{i})\}\]
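The composition of sqr and log, together with the identity and associativity properties, can be illustrated with plain Scala functions. This is a sketch under assumptions: the object CategorySketch and the names logOfSqr and id are hypothetical, and Scala functions on Double stand in for morphisms.

```scala
object CategorySketch {
  val sqr: Double => Double = x => x * x
  val log: Double => Double = x => math.log(x)

  // sqr is applied first, then log: x => log(x * x), which equals 2 * log(x) for x > 0
  val logOfSqr: Double => Double = sqr andThen log   // same as: log compose sqr

  // Identity morphism
  def id[A]: A => A = a => a

  // Applies a morphism element-wise to an array of values
  def onArray(f: Double => Double)(xs: Array[Double]): Array[Double] = xs.map(f)

  // Associativity on a sample value: both groupings apply f, g, h in the same order
  def associative(f: Double => Double, g: Double => Double, h: Double => Double)(x: Double): Boolean =
    ((f andThen g) andThen h)(x) == (f andThen (g andThen h))(x)
}
```

In Scala, `f andThen g` applies f first, whereas `g compose f` is the same function written in the mathematical order g∘f.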
Functors
Functors are a special class of morphisms called homomorphisms. A functor F defines a structure-preserving mapping between two categories X and Y, F: X->Y. If f and g are morphisms between objects of X, and 1 is the identity morphism, then \[f: x \mapsto y\,\,(x, y\in X),\,\,\,F:X \mapsto Y \Rightarrow F(f): F(x) \mapsto F(y)\\ F(1_{x}) = 1_{F(x)}\\ F(f \circ g) = F(f) \circ F(g)\] A functor that maps a category to itself is called an endofunctor. The identity functor, denoted 1_X, is the endofunctor that maps every object and morphism of X to itself.
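The two functor laws, preservation of identity and of composition, can be checked on concrete containers. The following sketch is illustrative only: the trait Functor and the object FunctorSketch are hypothetical definitions written for this post, not a library API, and they restrict the discussion to endofunctors on Scala types.

```scala
trait Functor[F[_]] {
  // Lifts f: A => B to a mapping F[A] => F[B]
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object FunctorSketch {
  val listFunctor: Functor[List] = new Functor[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }

  val optionFunctor: Functor[Option] = new Functor[Option] {
    def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
  }

  // F(1_x) = 1_F(x): mapping the identity leaves the container unchanged
  def preservesIdentity[F[_], A](fn: Functor[F], fa: F[A]): Boolean =
    fn.map(fa)(a => a) == fa

  // Mapping a composition equals composing the mapped functions
  def preservesComposition[F[_], A, B, C](fn: Functor[F], fa: F[A], f: A => B, g: B => C): Boolean =
    fn.map(fa)(f andThen g) == fn.map(fn.map(fa)(f))(g)
}
```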
Natural Transformation
Natural transformations are mappings from one functor to another. Let's denote two functors F, G between two categories X, Y. A natural transformation phi between F and G assigns a morphism to each object of X, such that for every morphism f of X \[\phi[x\in X]: F(x) \mapsto G(x)\\ \forall f : x \mapsto y\,\,,\phi[y] \circ F(f) = G(f) \circ \phi[x]\]
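A familiar instance of the naturality condition is the conversion of a list to its optional first element, a mapping from the List functor to the Option functor. The sketch below uses hypothetical names (NaturalTransformationSketch, headOption, isNatural) and checks the condition on sample values only.

```scala
object NaturalTransformationSketch {
  // phi: List => Option, with one component per element type A
  def headOption[A](xs: List[A]): Option[A] = xs.headOption

  // Naturality: mapping f and then applying phi gives the same result
  // as applying phi and then mapping f
  def isNatural[A, B](xs: List[A], f: A => B): Boolean =
    headOption(xs.map(f)) == headOption(xs).map(f)
}
```

The left-hand side corresponds to phi[y] ∘ F(f) and the right-hand side to G(f) ∘ phi[x], with F = List and G = Option.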
Monads
A monad T = <T, η, μ> on a category X consists of
- An endofunctor T on category X, T: X->X
- A unit natural transformation η: 1_X -> T
- A join (or multiplication) natural transformation μ: T∘T -> T
A monad obeys the associativity axiom: \[\text{If}\,\, T^{2}(X) = T \circ T (X)\,\,\text{and}\,\, T^{3}(X) = T \circ T \circ T (X)\\T(\mu_{X}) : T^{3}(X) \mapsto T^{2}(X)\\\mu_{T(X)} : T^{3}(X) \mapsto T^{2}(X)\\\mu_{X} : T^{2}(X) \mapsto T(X)\\\mu_{X} \circ T(\mu_{X}) = \mu_{X} \circ \mu_{T(X)}\] as well as the unit axiom: \[T(\eta_{X}) : T(X) \mapsto T^{2}(X)\\\eta_{T(X)} : T(X) \mapsto T^2(X)\\ I_{T(X)} : T(X) \mapsto T(X)\\\mu_{X} \circ T(\eta_{X}) = \mu_{X} \circ \eta_{T(X)} = I_{T(X)}\]
Example:
Let's consider lists of integers, denoted List<Integer>. The monad on lists is defined by the List endofunctor, which lifts a function such as the square to lists, by the unit η, which wraps a value into a single-element list, and by the join μ, which concatenates a list of lists: \[Square : List \left \langle Int \right \rangle \mapsto List \left \langle Int\right \rangle\,\,\,\,\,\,\,\left \{ .. x .. \right \} \mapsto \left \{ .. x^{2} .. \right \}\\\eta : x \mapsto List\left \{ x \right \}\\\mu : List(List\left \langle Int \right \rangle ) \mapsto List \left \langle Int \right \rangle \,\,\left \{ \left \{ x_{0} .. x_{n}\right \}, \left \{y_{0} ..y_{m} \right \} \right \} \mapsto \left \{ x_{0} .. x_{n}, y_{0} .. y_{m} \right \}\]
Monads are used to manage sequences of computations:
- Exception: may throw exceptions or generate known errors
- Option: may fail or return a result
- State: may use mutable state
- Reader: may maintain a context during computation
- Writer: may produce side effects during computation
- I/O: may succeed or fail depending on global state or variables
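The list example and the Option case above can be sketched in a few lines of Scala. This is an illustration under assumptions: the object ListMonadSketch and the helpers unit, join, map, parse, reciprocal and pipeline are hypothetical names, and the laws are checked on sample values rather than proven.

```scala
object ListMonadSketch {
  def unit[A](a: A): List[A] = List(a)                         // eta: wraps a value
  def join[A](xss: List[List[A]]): List[A] = xss.flatten       // mu: concatenates the inner lists
  def map[A, B](xs: List[A])(f: A => B): List[B] = xs.map(f)   // the functor T applied to f

  // Associativity axiom: flattening the inner layers first or the outer layers first
  // yields the same list
  def associativity[A](xsss: List[List[List[A]]]): Boolean =
    join(map(xsss)(xss => join(xss))) == join(join(xsss))

  // Unit axiom: wrapping each element, or wrapping the whole list, and then joining
  // returns the original list
  def unitLaw[A](xs: List[A]): Boolean =
    join(map(xs)(a => unit(a))) == xs && join(unit(xs)) == xs

  // Option as a monad: each step may fail, and flatMap chains the steps
  def parse(s: String): Option[Int] = scala.util.Try(s.trim.toInt).toOption
  def reciprocal(n: Int): Option[Double] = if (n == 0) None else Some(1.0 / n)
  def pipeline(s: String): Option[Double] = parse(s).flatMap(reciprocal)
}
```

In the pipeline, a failure at any step short-circuits the remaining steps, which is the sense in which a monad manages a sequence of computations.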