Building a Docker Container Orchestrator with Akka

Nowadays, Docker containers have become the core of service-oriented architectures (microservices). This approach calls for distributed systems able to run these containers at scale, such as Kubernetes or Docker Swarm. You may have wondered how difficult it would be to build such a system, or even how it can be done; this post tries to shed some light on it.

In this post, we explain how to build a very simple Docker container orchestrator with the Akka framework, with emphasis on the tools that make building distributed applications easier. The complete code is available in the following GitHub repository, so the post focuses on the design level:

https://github.com/roberveral/docker-akka

But what do we want to build?

Like any other project, the first step is to define what we want to do and the requirements of the system. We use Scrum as our work methodology, so let's write it as an epic user story.

AS a user,

I WANT to launch Docker container based services in a cluster without caring where exactly each container is running (as if all the nodes of the cluster were one), with the ability to scale out both the service and the cluster, and with fault tolerance and error handling,

SO THAT my services will remain up and running at scale.

Sounds easy to do, right?

Architecture Overview

If we take a closer look at the user story, we can see that we need an endpoint that allows the user to launch container-based services, a Service Discovery mechanism that abstracts away the location of the containers and, at the core, a distributed system that launches tasks (the containers) and manages the work being done, which is in fact close to a traditional master-worker system.

As we said at the beginning, the core of the system is Akka Cluster. The Akka framework helps to build concurrent, distributed and resilient message-driven applications using actor systems. Part of the framework is Akka Remote, a module that lets actors running on different machines communicate over TCP. This approach makes remoting easy, but it is static: the application cannot scale to new nodes without being restarted or having its code changed. To solve this problem, the Akka Cluster module builds on Akka Remote to form a cluster of nodes and distribute the actors among them, with the ability to scale out the cluster and with health checking and fault tolerance mechanisms. In fact, it gives full location transparency: your code doesn't need to concern itself with actor placement. We take advantage of this module by giving each node in the cluster a role, either master or worker, which determines the kind of tasks that can be done on that node. From a 10,000 ft view, the master nodes host actors that manage the running services and send tasks (containers to run) to the workers as messages to worker actors.

For the other two components, we use Akka HTTP for the REST API endpoint and Consul as the Service Discovery tool.

This is how the application architecture looks, including the design of the actor system (explained in more detail later):

Overview

That’s the overview of the system (no one said it would be easy). Don’t worry if you feel overwhelmed: Akka makes it easier than it sounds. In the following sections we go through the main design aspects and how Akka Cluster helps to accomplish them.

Cluster bootstrapping and membership

The first problem we have to solve is how the nodes are going to form a real cluster. According to the Akka Cluster documentation, special nodes called “seed nodes” are used for this purpose: every node joins the cluster by contacting the seed nodes. The seed nodes to use are defined in the application configuration as follows:

akka.cluster {
  seed-nodes = [
    "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "akka.tcp://ClusterSystem@127.0.0.1:2552"]
}

Wait, we are talking about location transparency and containers in a world of cloud computing and dynamic environments, and we have to set the address of some nodes statically? Wouldn’t it be better if we could delegate the task of finding nodes to join to our Service Discovery tool (Consul)?

Akka allows us to leave the seed-nodes property empty and perform a manual join to the cluster using the following method:

Cluster(system).joinSeedNodes(seedNodes)

Given that, we can use Consul to get the list of healthy masters (we don’t want workers joining each other without a master guiding them) and pass this list as the seed nodes to the previous method. Before that, each node needs to register itself in Consul. To interact with Consul from Scala (JVM), we use the Orbitz Consul client library.

import akka.actor.{ActorSystem, Address}
import com.orbitz.consul.Consul
import com.orbitz.consul.option.{ConsistencyMode, ImmutableQueryOptions}
import com.typesafe.config.Config
import scala.collection.JavaConverters._

/**
  * Gets the healthy seed addresses from Consul. The bootstrapping is performed against the
  * masters, so only masters are considered.
  * @param config configuration to use to extract values
  * @param system ActorSystem that is going to be joined
  * @return list of healthy master addresses
  */
def getSeedAddresses(implicit config: Config, system: ActorSystem): List[Address] = {
  // Gets a Consul agent to interact with the service (getConsulAgent is a helper defined elsewhere in the project)
  val consul: Consul = getConsulAgent
  // Request strongly consistent reads from Consul
  val queryOpts = ImmutableQueryOptions
    .builder()
    .consistencyMode(ConsistencyMode.CONSISTENT)
    .build()
  // Get registered master nodes (only master nodes are used for bootstrapping)
  val serviceNodes = consul.healthClient().getHealthyServiceInstances("docker-akka-master", queryOpts)

  // Create a list of remote addresses with the healthy master nodes (Java list converted to Scala)
  serviceNodes.getResponse.asScala.toList map { node =>
    Address("akka.tcp", system.name, node.getNode.getAddress, node.getService.getPort)
  }
}
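One detail worth handling when the seed list comes from Consul is its order. Akka gives the first entry of the seed list a special role when a cluster starts from scratch: if no other seed node answers, only the node listed first may join itself, which prevents separate islands from forming. Rather than relying on the order in which Consul returns instances, a minimal sketch, assuming each node sorts the healthy masters before calling joinSeedNodes, could look like this. MasterInstance and SeedNodes are hypothetical names, the code is plain Scala without Akka, and the sorting is an assumption of this sketch, not something the project is known to do.

```scala
// Hypothetical stand-in for one healthy "docker-akka-master" entry returned by Consul
final case class MasterInstance(host: String, port: Int)

object SeedNodes {
  // Builds classic remoting (akka.tcp) address strings for the given actor system name.
  // Duplicates are removed and the list is sorted, so every node that queries Consul
  // derives the same order and agrees on which master is the first seed node.
  def seedUris(systemName: String, masters: Seq[MasterInstance]): List[String] =
    masters
      .map(m => s"akka.tcp://$systemName@${m.host}:${m.port}")
      .distinct
      .sorted
      .toList
}
```

Lexicographic order is enough here: the goal is only that all nodes compute the same list, not a numeric ordering of IPs. In Akka code, such strings could be turned into Address values with AddressFromURIString before calling joinSeedNodes.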

Now the only static configuration property required is the Consul agent address:

consul {
  host       = "127.0.0.1"
  port       = 8500
}

For those who want a more generic solution, I recommend taking a look at ConstructR.

REST API Definition

To meet the Definition of Done of the user story, we need an endpoint that allows the user to create, scale and destroy services in the cluster, keeping it as simple as possible. For this purpose we use the following routes:

Method | Endpoint | Description | Parameters
GET | /members | Gets current cluster members | -
GET | /services/:service | Gets detailed info about the running instances of the given service | -
POST | /service/:service | Creates the given service | { "image": "docker-image", "ports": ["list of ports to expose"], "instances": "number of instances to start" }
PUT | /service/:service | Scales the given service | { "instances": "new number of instances of the service" }

Coding these routes with the Akka HTTP DSL is quite straightforward, as this example shows:

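import akka.http.scaladsl.model.StatusCodes.{InternalServerError, OK}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask

// Route to get info of all the running services.
// Assumes `scheduler` (the ServiceScheduler ActorRef), an implicit akka.util.Timeout for `?`
// and a JSON marshaller for the service list are in scope.
def servicesRoute: Route =
  pathPrefix("services") {
    pathEndOrSingleSlash {
      get {
        // Ask the scheduler for the status and map its reply to an HTTP response
        onSuccess(scheduler ? ServiceScheduler.Status) {
          case ServiceList(ls) => complete(OK, ls)
          case _ => complete(InternalServerError)
        }
      }
    }
  }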

Actor System Design

The core of the application is the actor system, which distributes the work between nodes and really “orchestrates” the containers. We need a design that distributes the work using actors; it is shown in the following diagram, and the details of how each actor works are explained below.

Workflow

ServiceMaster actor

This actor manages a service created by the user. It keeps the definition of the service, the number of instances (containers) running and the worker actors on which each instance is running. There is one such actor for each service running in the cluster. The ServiceMaster actor is the brain that decides how work is sent to the workers. When it detects that a worker running an instance of its service has failed, it tries to send that instance to another worker in order to keep the desired number of running instances.

The biggest problem here is deciding how the ServiceMaster actor offers work (containers of the service to run) to the worker nodes. We also need to spawn the ServiceWorker actors that run the service instances with location transparency. The solution lies in the routers provided by Akka. A router is a special actor that distributes messages among other actors (routees) to balance the load. A router can be a pool (it also spawns the routees) or a group (the routees must already be running). Akka Cluster provides cluster-aware versions of these routers, which can place routees on all the cluster nodes with a given role. In this case, we are interested in the AdaptiveLoadBalancingPool (from the Akka Cluster Metrics extension). This router uses metrics such as CPU or memory usage to decide which routee receives each incoming message.
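To make the router's behaviour more concrete: according to the Akka documentation, the adaptive load-balancing routers pick routees at random, with probabilities derived from the remaining capacity of each node. The following sketch reproduces that idea in plain Scala. NodeMetrics, the averaging of heap and CPU usage and the weighting are simplifications assumed for illustration, not the Cluster Metrics extension's actual formulas.

```scala
import scala.util.Random

// Hypothetical per-node metrics snapshot; usage values are ratios in [0.0, 1.0]
final case class NodeMetrics(node: String, heapUsed: Double, cpuUsed: Double)

object AdaptiveSelection {
  // Remaining capacity: mean of the free fraction of heap and CPU (a simplified "mix" of metrics)
  def capacity(m: NodeMetrics): Double =
    ((1.0 - m.heapUsed) + (1.0 - m.cpuUsed)) / 2.0

  // Random choice with probability proportional to remaining capacity.
  // Nodes without spare capacity are never chosen; None means no node can take work.
  def pick(nodes: Seq[NodeMetrics], rnd: Random): Option[String] = {
    val weighted = nodes.map(n => n.node -> capacity(n)).filter(_._2 > 0.0)
    if (weighted.isEmpty) None
    else {
      val target = rnd.nextDouble() * weighted.map(_._2).sum
      // Running totals of the weights: the first node whose running total exceeds target wins
      val cumulative = weighted.scanLeft(("", 0.0)) { case ((_, acc), (n, w)) => (n, acc + w) }.tail
      cumulative.find(_._2 > target).map(_._1).orElse(Some(weighted.last._1))
    }
  }
}
```

Random weighted selection, rather than always choosing the least loaded node, avoids sending a burst of messages to the same node before its metrics have had a chance to update.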

A ServiceMaster actor creates an AdaptiveLoadBalancingPool router that spawns a ServiceWorker actor for its service on each worker node. When the ServiceMaster has instances to run, it sends a message to the router for each pending instance. The router forwards the message to a ServiceWorker actor, preferring less loaded nodes, which gives fairer resource utilization. The work model with this approach is the following:

  • The ServiceMaster offers an instance container to the router.
  • A ServiceWorker for the service receives the message and, if it is idle, requests the work from the master.
  • The ServiceMaster accepts the worker if it still has instances to run, and watches it for failure.
  • That ServiceWorker starts running the service.

import akka.cluster.metrics.{AdaptiveLoadBalancingPool, MixMetricsSelector}
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}

// Creates a router that balances the requests using node metrics as placement strategy:
// at most one ServiceWorker per node, only on "worker" nodes (none on the local master node)
context.actorOf(
  ClusterRouterPool(AdaptiveLoadBalancingPool(MixMetricsSelector, supervisorStrategy = strategy),
    ClusterRouterPoolSettings(totalInstances = 1000, maxInstancesPerNode = 1,
      allowLocalRoutees = false, useRole = Some("worker"))).props(ServiceWorker.props(service)),
  name = s"router-${service.name}")
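The offer/request/accept handshake listed above can be modelled without Akka to see why it never over-allocates instances: the master only accepts a worker while instances are still pending. In this sketch, ServiceMasterModel, ServiceWorkerModel and WorkerId are hypothetical names, and plain method calls stand in for the actor messages (and, in workerFailed, for the Terminated notification that DeathWatch would deliver).

```scala
final case class WorkerId(value: String)

final class ServiceMasterModel(desiredInstances: Int) {
  // Workers that were accepted (and, in the real actor, watched for failure)
  private var running = Set.empty[WorkerId]

  // Instances still waiting for a worker; the master sends one offer per pending instance
  def pending: Int = desiredInstances - running.size

  // Accept a requesting worker only while instances are still pending
  def request(worker: WorkerId): Boolean =
    if (pending > 0 && !running(worker)) { running += worker; true }
    else false

  // Stands in for the Terminated message: the instance becomes pending again
  def workerFailed(worker: WorkerId): Unit = running -= worker
}

final class ServiceWorkerModel(val id: WorkerId) {
  private var busy = false

  // An idle worker that receives an offer requests the work,
  // and starts running the service only if the master accepts it
  def onOffer(master: ServiceMasterModel): Boolean =
    if (!busy && master.request(id)) { busy = true; true }
    else false
}
```

Because the decision is taken on the master's side, several workers can answer the same burst of offers and only as many as needed are accepted; the rest stay idle until the next offer.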

As the ServiceMaster actor holds the definition of a service, its state must be persisted so that, after a failure, it can restore the number of running instances. Persistence is handled by the Akka Persistence extension. A PersistentActor can save some messages as events in an event journal (stored in a Cassandra database in this project). When the actor is recreated after a failure or restart, the stored events are replayed in order to recover its state. The journal can grow very quickly, so it is possible to save snapshots at certain points in time; a snapshot is a copy of the state from which the next recovery starts. In this particular case, the events that affect the number of instances are stored for recovery.
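The recovery described above amounts to a left fold over the journal, starting from the latest snapshot. In an actual PersistentActor this logic lives in receiveRecover, which first receives a SnapshotOffer (if a snapshot exists) and then the replayed events. The event and state types below (ServiceCreated, ServiceScaled, ServiceState) are hypothetical and kept in plain Scala for illustration.

```scala
// Hypothetical events that change how many instances a service should run
sealed trait ServiceEvent
final case class ServiceCreated(image: String, instances: Int) extends ServiceEvent
final case class ServiceScaled(instances: Int) extends ServiceEvent

// State rebuilt from the journal; a None image means the service was never created
final case class ServiceState(image: Option[String], instances: Int) {
  def updated(event: ServiceEvent): ServiceState = event match {
    case ServiceCreated(img, n) => ServiceState(Some(img), n)
    case ServiceScaled(n)       => copy(instances = n)
  }
}

object Recovery {
  val empty: ServiceState = ServiceState(None, 0)

  // Recovery starts from the latest snapshot (if any) and applies, in order,
  // only the events that were journaled after that snapshot
  def recover(snapshot: Option[ServiceState], eventsAfterSnapshot: Seq[ServiceEvent]): ServiceState =
    eventsAfterSnapshot.foldLeft(snapshot.getOrElse(empty))(_ updated _)
}
```

Keeping the state transition in a single pure method (updated) means the same code can be used both when handling new commands and when replaying the journal, so both paths always agree.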

ServiceWorker actor

A ServiceWorker actor does only one thing: run a container. It can be in one of two states, idle (waiting for an offer from the ServiceMaster) or running a container. The container runs inside a Scala Future, so the actor can still receive messages, for example to get information about the running container or even destroy it. There is one ServiceWorker per running ServiceMaster on each worker node in the cluster, even when there are fewer instances to run than worker nodes. This has little performance impact, because Akka actors are very cheap in terms of resource usage. When a ServiceWorker creates a container, it also registers it as a service instance in Consul for Service Discovery.
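A minimal sketch of the two worker states, assuming a hypothetical ContainerRunner that blocks until the container exits. Starting the container inside a Future means the method returns immediately, which is what keeps an actor's message loop free; in a real actor the new state would be installed with context.become, and the Future's result could be sent back to the actor with akka.pattern.pipe. WorkerState, Idle, Running and WorkerLogic are illustrative names, not the project's.

```scala
import scala.concurrent.{blocking, ExecutionContext, Future}

// Hypothetical stand-in for launching a container: a real worker would call Docker here
// and block until the container exits, returning its exit code
trait ContainerRunner {
  def runBlocking(image: String): Int
}

sealed trait WorkerState
case object Idle extends WorkerState
final case class Running(image: String, exitCode: Future[Int]) extends WorkerState

object WorkerLogic {
  // Idle: start the container in a Future (marked as blocking) and switch to Running.
  // Running: the worker is busy, so further offers are ignored.
  def start(state: WorkerState, image: String, runner: ContainerRunner)
           (implicit ec: ExecutionContext): WorkerState = state match {
    case Idle          => Running(image, Future(blocking(runner.runBlocking(image))))
    case busy: Running => busy
  }
}
```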

ServiceScheduler and Sharding

The only thing left is the connection between the REST API requests and the ServiceMaster actor of each service. This is the purpose of the ServiceScheduler actor. Every request made against the API results in a message sent to the ServiceScheduler actor. There is one ServiceScheduler on each master node, but how can it forward the messages to the appropriate ServiceMaster actor for the requested service?

Again, Akka Cluster comes to the rescue, this time with the Akka Cluster Sharding extension. Cluster Sharding distributes actors (entities) among the nodes of the cluster with a given role. Each node with that role runs a ShardRegion actor, which keeps track of the entities. Every message sent to the sharded entities goes through the ShardRegion actor. This actor extracts the identifier of the target entity (and of its shard) from the content of the message and asks the ShardCoordinator where that shard is placed. If the entity doesn’t exist yet, it is created, and then the message is forwarded to it. The ShardCoordinator runs as an Akka Cluster Singleton, which means that only one instance of it runs in the cluster, and all the nodes reach it through a proxy. Sharding also adds fault tolerance: if a node (and its shards) goes down, the missing entities are recreated on the other nodes. When a new node joins the cluster, a rebalance can also happen, in which running entities are moved to another node (and restarted). Given that, it is important that sharded actors are persistent, so they can recover their state if a failure or rebalance occurs.

In this case, the ServiceScheduler forwards the messages to the ShardRegion actor, which uses the service name to route each message to the proper ServiceMaster, which could be on any of the master nodes. With this approach, we get a multi-master design, where all the master nodes handle real work instead of waiting to become the leader.

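import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.util.Timeout

class ServiceScheduler(implicit timeout: Timeout) extends Actor with ActorLogging {
  import context._
  // Initializes Cluster Sharding for ServiceMaster entities on the "master" nodes;
  // remembered entities are restarted automatically after a rebalance or crash
  ClusterSharding(context.system).start(
    ServiceMaster.shardType,
    ServiceMaster.props,
    ClusterShardingSettings(context.system).withRole("master").withRememberEntities(true),
    ServiceMaster.extractEntityId,
    ServiceMaster.extractShardId
  )

  // Defines the shardedActor to talk with
  def shardedMaster: ActorRef = ClusterSharding(context.system).shardRegion(ServiceMaster.shardType)

  override def receive: Receive = {
    // Commands for a service are forwarded to its ServiceMaster through the ShardRegion
    case msg: ServiceMaster.Command => shardedMaster forward msg
...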
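The post does not show ServiceMaster.extractEntityId and ServiceMaster.extractShardId. A plausible shape for them, assuming every command carries the name of the service it targets, is sketched below. Local type aliases with the same shape as ShardRegion.ExtractEntityId and ShardRegion.ExtractShardId keep the example free of Akka; Envelope and the shard count of 30 are assumptions.

```scala
object ServiceMasterSharding {
  // Same shape as Akka's ShardRegion.ExtractEntityId / ExtractShardId (messages are Any, ids are Strings)
  type ExtractEntityId = PartialFunction[Any, (String, Any)]
  type ExtractShardId = Any => String

  // Hypothetical envelope: every command names the service it is about
  final case class Envelope(serviceName: String, payload: Any)

  // Assumed shard count; the Akka docs suggest about ten times the planned maximum number of nodes
  val numberOfShards: Int = 30

  // The service name is the entity id, so all commands for one service reach the same ServiceMaster
  val extractEntityId: ExtractEntityId = {
    case Envelope(name, payload) => (name, payload)
  }

  // Shard id derived from the (JVM-independent) String hash of the entity id
  val extractShardId: ExtractShardId = {
    case Envelope(name, _) => math.abs(name.hashCode % numberOfShards).toString
  }
}
```

extractShardId must return the same shard for a given service on every node, which is why it depends only on the service name and on a shard count that is the same across the cluster.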

Node downing

By default, when a node crashes, the cluster marks it as Unreachable and waits for it to come back. During that time the cluster cannot reach convergence, so the leader does not move newly joining nodes to Up, and the actors that were running on the crashed node are not reported as Terminated. This leaves the cluster in a kind of “locked” state, and the only way out is to mark the crashed node down manually. Not the most operations-friendly scenario, as you can see.

Akka provides an auto-downing feature that lets you configure a timeout after which an Unreachable node is automatically marked Down and removed from the cluster. This option sounds more practical, but the Akka team strongly advises against using it in production. Yes, another problem to solve.

Why is that? Akka uses a gossip protocol for the convergence of cluster state between nodes and, in case of a network partition, the nodes can split into two subsets that cannot reach each other but remain well connected internally. If auto-downing is enabled, each subset marks the other as Down and we end up with two different clusters (a “split brain”), with duplicate Shards and Singletons.

Luckily, this can be solved by implementing an auto-downing strategy smarter than the default one, extending the DowningProvider class (the default implementation is a very good example of how to do it). Then, you can configure Akka to use this strategy by setting the following parameters in the configuration:

akka.cluster {
  downing-provider-class = "com.github.roberveral.dockerakka.cluster.CustomDowning"
  auto-down-unreachable-after = 5s
}

In our case, we use a modification of the default strategy in which a node only marks unreachable nodes down if it sees more reachable nodes than unreachable ones. With this approach, if a network partition occurs, only the subset with more nodes marks the other subset down, so only one cluster remains up and running.
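The rule boils down to a strict-majority check. The sketch below isolates that decision in plain Scala and simulates a few partitions; MajorityDowning and its methods are hypothetical names, and a real DowningProvider would feed the decision from the cluster membership state. It also makes the trade-off visible: with an even split, neither side downs the other, so the cluster stays blocked until someone intervenes.

```scala
object MajorityDowning {
  // Decision taken on one side of a partition: down the unreachable members
  // only if this side (which includes the deciding node itself) is a strict majority
  def shouldDownUnreachable(reachable: Int, unreachable: Int): Boolean =
    unreachable > 0 && reachable > unreachable

  // Splits `total` nodes into sides of `sideA` and `total - sideA` nodes and
  // returns whether side A and side B, respectively, would down the other side
  def partition(total: Int, sideA: Int): (Boolean, Boolean) = {
    val sideB = total - sideA
    (shouldDownUnreachable(sideA, sideB), shouldDownUnreachable(sideB, sideA))
  }
}
```

Running an odd number of nodes avoids the even-split case, which is one reason majority-based strategies are usually deployed with odd cluster sizes.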

Conclusion

Throughout the post we have gone through the tools that the Akka framework provides to develop a distributed system, and we have seen that they make it easier, although some complexity remains, such as fault tolerance and network partitions (distributed computing is hard).

In the end, we managed to build a Docker orchestrator with these tools. It is far from production-ready, but it is a good example of how Akka Cluster works.



Roberto Veral

I got started with Big Data using Spark and the Hadoop ecosystem, working with cloud providers. Now, as a Big Data Developer, I work with IaaS, Terraform, Ansible, Scala, Spark... I am a Computer Scientist by vocation and I enjoy learning new technologies and discovering new things.
