Friday, October 27, 2017

Implement Reinforcement Learning in Scala

Target audience: Advanced
Estimated reading time: 8'

Ever pondered how robots, self-operating systems, or software-based game players acquire knowledge? 
The secret is nestled within an AI branch called reinforcement learning. In this piece, we delve into a widely recognized reinforcement learning technique - Q-learning, and illustrate its implementation in Scala.

What you will learn: Implementation of Q-Learning in Scala

Notes:
  • EnvironmentsScala: 2.12.4, Java JDK 11
  • This article assumes that the reader is familiar with machine learning and computer programming.

Overview

Numerous techniques exist within the field of reinforcement learning. A widely adopted approach involves exploring the value function space through the temporal difference method. Despite their diversity, all reinforcement learning methods converge on a common goal: to solve the challenge of identifying the best sequence of decision-making tasks [ref 1]. In these tasks, an agent engages with a dynamic system, choosing actions that influence state transitions, all with the aim of optimizing a specified reward function.

Basic components of reinforcement learning

At any given step i, the agent selects an action a(i) on the current state s(i). The dynamic system responds by rewarding the agent for its optimal selection of the next state:\[s_{i+1}=V(s_{i})\]
The learning agent infers the policy that maps the set of states {s} to the set of available actions {a}, using a value function  \[V(s_{i})\] The policy is defined at \[\pi :\,\{s_{i}\} \mapsto \{a_{i}\} \left \{ s_{i}|s_{i+1}=V(s_{i}) \right \}\] For example, a robot navigating through a maze makes its next move based on its present location and past actions. It's impractical to instruct the robot on every possible move for each location within the maze, rendering supervised learning techniques insufficient for this task.

Temporal difference

The most common approach of learning a value function V is to use the Temporal Difference method (TD). The method uses observations of prediction differences from consecutive states, s(i) & s(i+1). If we note r the reward for selection an action from state s(i) to s(i+1) and n the learning rate, then the value V is updated as \[V(s_{i})\leftarrow V(s_{i})+\eta .(V(s_{i+1}) -V(s_{i}) + r_{i})\]
Therefore the goal of the temporal difference method is to learn the value function for the optimal policy. The Q 'action-value' function represents the expected value of action a on a state s and defined as \[Q(s_{i},a_{i}) = r(s_{i}) + V(s_{i})\] where r is the reward value for the state.


On-policy vs. Off-policy

The Temporal Difference method calculates an estimated final reward for each state. This method comes in two variations: On-Policy and Off-Policy:
  • The On-Policy approach learns the value of the policy it employs for decision-making. Its value function is based on the outcomes of actions taken under the same policy, but it incorporates historical data.
  • The Off-Policy approach, on the other hand, learns from a variety of potential policies. 
As such, it bases its estimates on actions that have yet to be taken.
A widely used formula in the Temporal Difference approach is the Q-learning formula [ref 2]. This introduces a discount rate to lessen the influence of initial states on policy optimization. It operates without needing a model of the environment. In the action-value method, the selection of the next state involves choosing the action with the highest reward, known as exploitation. In contrast, the exploration approach emphasizes maximizing the total expected reward.
The update equation for the Q-Learning is \[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .max_{a_{i+1}}Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\]
Q(s,a)expected value action a on state s
eta: learning rate
alpha: discount rate 
One of the most commonly used On-Policy method is Sarsa which does not necessarily select the action that offer the most value.The update equation is defined as\[Q(s_{i},a_{i}) \leftarrow Q(s_{i},a_{i}) + \eta .(r_{i+1} +\alpha .Q(s_{i+1},a_{i+1}) - Q(s_{i},a_{i}))\]

Q-Learning

States & actions

Functional languages are particularly suitable for iterative computation. We use Scala for the implementation of the temporal difference algorithm [ref 3]. We allow the user to specify any variant of the learning formula, using local functions or closures.

Firstly, we have to define a state class, QLState (line 1) that contains a list of actions of type QLAction (line 3) that can be executed from this state. The only purpose of this class is to connect a list of action to a source state. The parameterized class argument property (line 4) is used to "attach" some extra characteristics to this state.

1
2
3
4
5
6
7
8
class QLState[T](
   val id: Int, 
   val actions: List[QLAction[T]] = List.empty, 
    property: T) {
    
  @inline
  def isGoal: Boolean = !actions.isEmpty
}

As described in the introduction, an action of class QLAction has a source state from and a destination state to(state which is reached following the action). A state except the goal state, has multiple actions but an action has only one destination or resulting state.

 case class QLAction[T](from: Int, to: Int) 


The state and action can be processed, created, and overseen using a directed graph or a search domain known as QLSpace. This search domain encompasses all potential states accessible to the agent.

Multiple states from this pool can be designated as objectives, and the algorithm allows the agent to pursue more than one goal state, not just a singular one. The procedure concludes when any of the selected goal states is attained, following an 'OR' logic approach. However, the algorithm does not facilitate the achievement of combined goals, as in an 'AND' logic framework.

Illustration of action-space in reinforcement learning

Let's implement the basic components of the search space QLSpace. The class list all available states (line 2) and one or more final or goal states goalIds (line 3). Although you would expect that the search space contains a single final state (goal) with the highest possible reward, it is not uncommon to have online training using more than one goal states.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class QLSpace[T](
   states: Array[QLState[T]], 
   goalIds: Array[Int]) {

    // Indexed map of states 
  val statesMap: immutable.Map[Int, QLState[T]] = 
        states.map(st => (st.id, st)).toMap
    // List set of one or more goals  
  val goalStates = new immutable.HashSet[Int]() ++ goalIds
 
    // Compute the maximum Q value for a given state and policy
  def maxQ(st: QLState[T], policy: QLPolicy[T]): Double = { 
     val best = states.filter( _ != st).maxBy(s => policy.EQ(s.id, s.id))
     policy.EQ(st.id, best.id)
  }
 
    // Retrieves the list of states destination of state, st
  def nextStates(st: QLState[T]): List[QLState[T]] =
     st.actions.map(ac => statesMap.get(ac.to).get)
 
  def init(r: Random): QLState[T] =  states(r.nextInt(states.size-1))
}

A hash map statesMap maintains a dictionary of all the possible states with the state id as unique key (lines 6, 7). The class QLSpace has three important methods: 
  • init initializes the search with a random state for each training epoch (lines 22, 23)
  • nextStates returns the list of destination states associated to the state st (lines 19, 20)
  • maxQ return the maximum Q-value for this state st given the current policy (lines 12-15). The method filters out itself from the search from the next best action. It then computes the maximum reward or Q(state, action) value according to the given policy.
The next step is to define a policy.

Learning policy

A policy is defined by three components:
  • A reward collected after transitioning from one state to another state (line 2). The reward is provided by the user.
  • A Q(State, Action) value associated to a transition state and an action (line 4).
  • A probability (with default values of 1.0) that defines the obstacles or penalties to migrate from one state to another (line 3).
The estimate combines the Q-value (incentive to move to the best next step) and probability (hindrance to move to any particular state) (line 7).

1
2
3
4
5
6
7
8
class QLData {
   var reward: Double = 1.0
   var probability: Double = 1.0
   var value: Double = 0.0) {
  
   @inline
   final def estimate: Double = value*probability
}

The policy of type QLPolicy is a container for the state transition attributes such as a reward, a probability and a value.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class QLPolicy[T](numStates: Int, input: Array[QLInput]) {
   val qlData = {
      val data = Array.tabulate(numStates)(
         _ => Array.fill(numStates)(new QLData)
      )

      input.foreach(
        in => {  
           data(in.from)(in.to).reward = in.reward
           data(in.from)(in.to).probability = in.prob
        }
      )
      data
   }
  
   def setQ(from: Int, to: Int, value: Double): Unit =
       qlData(from)(to).value = value
 
   def Q(from: Int, to: Int): Double = qlData(from)(to).value
}

The constructor for QLPolicy takes two arguments:
  • Total number of states numStates used in the search (line 1)
  • Sequence of input of type QLInput to the policy
The constructor creates a numStates x numStates matrix of transition of type QLData (lines 3 - 12), from the input. 
The type QLInput wraps the input data (index of the input state from, index of the output state to, reward and probability associated to the state transition) into a single convenient class.

case class QLInput(
   from: Int,  
   to: Int, 
   reward: Double = 1.0, 
   prob: Double = 1.0)


Model training

The initial phase involves establishing a model for the reinforcement learning. During training, a model, known as QLModel, is constructed and consists of two key components:
  • 'bestPolicy', which delineates the transitions from any starting state to a target state.
  • 'coverage', which indicates the proportion of training cycles that successfully reach the goal state.
class QLModel[T](val bestPolicy: QLPolicy[T], val coverage: Double) 

The QLearning class takes 3 arguments:
  • A set of configuration parameters config.
  • The search/states space qlSpace.
  • The initial policy associated with the states (reward and probabilities) qlPolicy.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class QLearning[T](
   config: QLConfig, 
   qlSpace: QLSpace[T], 
   qlPolicy: QLPolicy[T]) 

    //model in Q-learning algorithm
   val model: Option[QLModel[T]] = train.toOption
    
    // Generate a model through multi-epoch training
   def train: Try[Option[QLModel[T]]] {}
   private def train(r: Random): Boolean {}

     // Predict a state as a destination of this current 
     // state, given a model
   def predict : PartialFunction[QLState[T], QLState[T]] {}

     // Select next state and action index
   def nextState(st: (QLState[T], Int)): (QLState[T], Int) {} 
}

The model of type QLModel (line 7) is created as optional by the method train (line 10). Its value is None if training failed.

The training method train consists of executing config.numEpisodes cycle or episode of a sequence of state transition (line 5). The random generator r is used in the initialization of the search space.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def train: Option[QLModel[T]] = {
   val r = new Random(System.currentTimeMillis)

   Try {
      val completions = Range(0, config.numEpisodes).filter(train(r) )

      val coverage = completions.toSize.toDouble/config.numEpisodes
      if(coverage > config.minCoverage) 
          new QLModel[T](qlPolicy, coverage)
      else 
          QLModel.empty[T]
    }.toOption
}

The training process exits with the model if the minimum minCoverage (number of episodes for which the goal state is reached) is met (line 8).

The method train(r: scala.util.Random) uses a tail recursion to transition from the initial random state to one of the goal state. The tail recursion is implemented by the search method (line 4). The method implements the recursive temporal difference formula (lines 14-18). 
The state for which the action generates the highest reward R given a policy qlPolicy (line 10) is computed for each new state transition. The Q-value of the current policy is then updated qlPolicy.setQ before repeating the process for the next state, through recursion (line 21).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def train(r: Random): Boolean = {
   
   @scala.annotation.tailrec
   def search(st: (QLState[T], Int)): (QLState[T], Int) = {
      val states = qlSpace.nextStates(st._1)
      if( states.isEmpty || st._2 >= config.episodeLength ) 
         (st._1, -1)
    
      else {
         val state = states.maxBy(s => qlPolicy.R(st._1.id, s.id))
         if( qlSpace.isGoal(state) )
             (state, st._2)
         else {
             val r = qlPolicy.R(st._1.id, state.id)   
             val q = qlPolicy.Q(st._1.id, state.id)
                    // Q-Learning formula
             val deltaQ = r + config.gamma*qlSpace.maxQ(state, qlPolicy) -q
             val nq = q + config.alpha*deltaQ
        
             qlPolicy.setQ(st._1.id, state.id,  nq)
             search((state, st._2+1))
         }
      }
   } 
   
   r.setSeed(Random.nextLong(System.currentTimeMillis))

   val finalState = search((qlSpace.init(r), 0))
   if( finalState._2 != -1) 
       qlSpace.isGoal(finalState._1) 
   else 
       false
}

Note
There is no guarantee that one of the goal states is reached from any initial state chosen randomly. It is expected that some of the training epoch fails. This is the reason why monitoring coverage is critical. Obviously, you may choose a deterministic approach to the initialization of each training epoch by picking up any state beside the goal state(s), as a starting state.


State prediction

Once trained, the model is used to predict the next state with the highest value (or probability) given an existing state. The prediction is implemented as a partial function.

def predict : PartialFunction[QLState[T], QLState[T]] = {
    case state: QLState[T] if(model != None) => 
        if( state.isGoal) state else nextState(state, 0)._1
}

The method nextState does the heavy lifting. It retrieves the list of states associated with the current state st through its actions set (line 2). The next most rewarding state qState is computed using the reward matrix R of the best policy of the QLearning model (lines 6 - 8).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def nextState(st: (QLState[T], Int)): (QLState[T], Int) =  {
    val states = qlSpace.nextStates(st._1)
    if( states.isEmpty || st._2 >= config.episodeLength) 
        st
    else {
        val qState = states.maxBy(
           s => model.map(_.bestPolicy.R(st._1.id, s.id)).getOrElse(-1.0)
        )

        nextState( (qState, st._2+1))
    }
}

Q-Learning limitations

So far, this article has focused on Q-Learning algorithm that has some significant drawbacks:
  • Non-stationary environments: Q-Learning assumes a stationary environment, where the rules and dynamics don't change over time. 
  • Continuous state and action spaces: Q-Learning struggles in environments with very large or continuous state and action spaces. 
  • Costly experiments: The algorithm typically requires a lot of experiences (state-action-reward sequences) to converge to an optimal policy, which can be impractical in real-world scenarios where collecting data is expensive or time-consuming.
  • Biased Q-values: Q-Learning tends to overestimate Q-values because it uses the maximum value for the next state. 
  • Lack of generalization: Traditional Q-Learning does not generalize across states. It treats each state-action pair as unique, which is inefficient in complex environments.
  • Hyper-parameters tuning: The final policy can be highly dependent on initial conditions,  learning rate and discount factor.
Deep learning addresses some of these limitations.

Deep reinforcement learning

While this article won't delve into the complexities of deep reinforcement learning, we will explore some methods that address the constraints of Q-Learning.

As described previously, traditional reinforcement learning involves an agent learning to make decisions by interacting with an environment. The agent performs actions and receives feedback in the form of rewards or penalties. Its goal is to maximize cumulative rewards over time. This process involves learning a policy that dictates the best action to take in a given state.

In Deep Reinforcement Learning (DRL), neural networks are used to approximate the functions crucial in Q-Learning, which action to take or the value function [ref 4]. DRL can handle environments with high-dimensional input spaces, like visual data from cameras or complex sensor readings, which traditional RL struggles with.

DRL has been successfully applied in various domains like gaming, robotics , financial trading, recommendation systems, and simulation.

The most commonly used DRL algorithms are: 
  • Deep Q-Networks (DQN)
  • Policy gradient methods like REINFORCE
  • Proximal Policy Optimization (PPO)
  • Actor-Critic 
  • Trust Region Policy Optimization
DRL has its own challenges such as high computational costs, data inefficiency, and difficulty to adapt to various environments [ref 5].



An article or blog post cannot feasibly cover every aspect and strategy of reinforcement learning, ranging from K-armed bandits to deep learning in its entirety. Nonetheless, this chapter aims to offer a guide for implementing a basic reinforcement learning algorithm, Q-Learning in Scala.


References

[4] Deep Reinforcement Learning Hands-on. - M. Kapan - Packt Publishing - 2018




---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. 
He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning" Packt Publishing ISBN 978-1-78712-238-3


Tuesday, August 22, 2017

Scala Futures with Callbacks

Target audience: Intermediate
Estimated reading time: 6'

Futures are a critical concept in Scala and parallel computing. This post describes and illustrates the futures in Scala and Akka as a non-blocking mechanism in distributed computing.


Table of contents
Follow me on LinkedIn
Notes
  • Implementation relies on Scala 2.11.8
  • For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted 

Overview

Akka supports futures for which the requester never block waiting for a response. A message or request is sent for a execution and the expect response (success or failure/exception) is delivered asynchronously back to the client controller.
In a previous post, we looked into Akka futures for which the client is blocked until all the concurrent tasks (futures) have completed. In this post, we look into the non-blocking approach that relies on onSuccess and onFailure callback handlers. We reuse the same example as with the blocking design


Futures callback design

The first step is to create a controller to manage the concurrent tasks, FuturesController. The controller takes the input time series xt and a list of model candidates. As the time series uses a single variable, the model are simply defined as a one variable function  (x: Double) => f(x).

 1
 2
 3
 4 
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
case class Launch()
 
final class FuturesController(
    xt: Array[Double],
    xs: List[Double => Double])
    (implicit timeout: Timeout) extends Actor {
   
    // Event/message handling loop/thread
    // implemented as a partial function
    override def receive = {
        case s: Launch => 
            sender ! processFutures(createFutures)
    }
 
    def createFutures: List[Future[Double]]
    def processFutures(futures: List[Future[Double]]): Double
}

The event handler receive (line 10) for the message Launch (line 1) creates as many futures as needed (equal to the number of models to evaluate) createFutures (line 12). The futures are then passed to the method processFutures for handling the callbacks notification (line 16).
The futures are created with their computation (or execution) unit. In this particular case, the task consists of computing the least squared error for each model relative to the input dataset (lines 5 to 9)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def createFutures: List[Future[Double]] = 
   xs.map(f => Future[Double] { 
    
       val yt = Range(0, xt.size).map( f(_) )
       val r = (xt, yt).zipped.foldLeft(0.0)((sum, z) => {
           val diff = z._1 - z._2
           sum + diff*diff
       })
       Math.sqrt(r)
  })

The method processFutures iterates through the notification (success/failure) from all the futures [1]. It filters out the future task(s) which fails [2] then select the model with the lowest least square error [3].
This version allows to catch an exception before applying the filter. The filter catches the incorrect values marked at -1 (2).

def processFutures(futures: List[Future[Double]]): Double =

   futures.map(f => { //1 
       f onSuccess { case y => y }
       f onFailure { case e: Exception => { 
           println(e.toString)
           -1.0 
       }}
   }).filter( _ != -1.0) // 2
     .zipWithIndex.minBy( _._1)._2 //3


Evaluation

Let's consider a very simple application which consists of distributing the following computation
   s(x) = f(x) + g(x) + h(x)  // ff(0), ff(1), ff(2)
Each computation relies on a different ff polynomial functions with an increasing order of complexity. Each of the three computations are assigned to one actor running on a dedicated CPU core.

val ff = List[Double => Double](
   (x: Double) => 3.0*x,
   (x: Double) => x*x + x,
   (x: Double) => x*x*x + x*x + x
)

The compute method create a list of futures from the list ff of computational functions. Then the method process the call back onSuccess and onFailure
The first version is a detailed implementation of the creation and processing of futures

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def compute(x: Double): Double = {
    // Create a future for each of the component of ff
   val futures = ff.map(f => Future[Double] { f(x) } )
 
     // processes the results from each future
   var sum = 0.0
   futures.foreach( f => {
       f onSuccess { case x: Double => sum += x }
       f onFailure { case e: Exception => println(e.toString) }
    })
    sum
}
 
val x = 2.0
println(s"f($x) = ${compute(x)}")

The appropriate implicit conversions needs to be imported (line 2). A future is created from each of the three polynomial function ff (line 6). Then the output of each of the future execution is aggregated (line 11). This version is relies on a two step process
  • Creation of futures (line 6)
  • Computation and aggregation (lines 10 - 14)
A more compact version relies on the foldLeft higher order method to combine the creation and execution of futures.

1
2
3
4
5
6
7
8
9
def compute(x: Double): Double = {

   ff.map(f => Future[Double] { f(x) } )./:(0.0)((s, f) => {
       var y = 0.0
       f onSuccess { case x: Double => y = s + x  }
       f onFailure { case e: Exception =>  y = s }
       y
   })
}


References