Building an ETL framework with Akka and Scaldi

Francois Dang Ngoc

Sr. Software Engineer

At Simulmedia, we ingest a large amount of data from various sources every day, process it in batch, and load it into different data stores. We wanted to build a new framework for this processing and, based on prior experience, knew we wanted to stay away from Hadoop.

While Hadoop can solve most of the problems we were trying to solve, the Map/Reduce paradigm has its limitations in terms of performance (HDFS is used as the basis for fault tolerance, memory is not used in an optimized way, etc.). Other architectures based on in-memory data grids (e.g., Oracle Coherence, Hazelcast, GridGain, Pivotal GemFire, Infinispan, …) combine the memory of multiple computers and distribute the processing across the nodes. While this approach is much faster, it is not suitable for bigger datasets that don't fit in the in-memory data grid. Also, like Hadoop, the data first needs to be imported into the system (in memory or HDFS). Streaming architectures (Apache Storm, Apache Samza, Apache S4, …) have recently emerged in which data is streamed from data sources, processed in a pipeline fashion, and stored in a database. However, despite being popular for real-time platforms, they are not well suited to batch processing. More recently, actor-based systems, a model that originated in 1973, have regained interest thanks to the popularity of the Akka framework. This has led to new architectures such as Apache Spark, which relies on an immutable, fault-tolerant abstraction where data can be recomputed from the start in case of failure, avoiding the need to persist intermediate results back and forth as in Hadoop.

Even though Spark has exploded in popularity recently (and is used in various data science projects at Simulmedia), we chose to use the Akka framework directly to have more flexibility and control. We also leveraged Scaldi, a dependency injection framework, to offer a pluggable architecture with actor composition.

Architecture

[Figure: overall ETL architecture (Scaldi_1.png)]

The data coming from our different providers is first centralized in S3 and stored in a compressed format in the raw data bucket. After the data is processed, we store the compressed output in the staging bucket. The data is then loaded into our Redshift cluster and other data stores, or consumed directly from S3 with Spark/MLlib by our data science team to test new models.

We separate the Extract-Transform process from the Load process so that we can load the data into multiple database systems. We keep the processed data in the staging bucket for some time (so one can easily import the data into their database without having to rerun the computation). We also keep the raw data in case it needs to be reprocessed later on.
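
Throughout the rest of this post, access to S3 is hidden behind a small FileStore abstraction. Its exact interface is not shown here; a minimal sketch, assuming a fetch method for reading raw files (which the worker actor below relies on) and a hypothetical store method for writing results to the staging bucket, could look like this:

import java.io.InputStream

// Minimal sketch of the file-store abstraction used in the snippets below.
// fetch() is what the worker actor calls on raw-data URLs; store() is a
// hypothetical counterpart for writing processed output to the staging bucket.
trait FileStore {
  def fetch(url: String): InputStream
  def store(url: String, in: InputStream): Unit
}

// S3-backed implementation (constructor arguments mirror the module shown later;
// the actual AWS SDK calls are elided).
class S3FileStore(accessKeyId: String, secretAccessKey: String, region: String) extends FileStore {
  def fetch(url: String): InputStream = ???        // e.g., download the object from the raw bucket
  def store(url: String, in: InputStream): Unit = ???  // e.g., upload to the staging bucket
}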

Akka

Akka is an actor-based framework. It is built on a simple paradigm: actors share nothing with other actors and communicate only by message passing. Each actor has a queue (mailbox) of messages and, by default, a thread pool takes care of running the actors to process them; thread allocation is therefore efficient, since threads are only used when actors have messages to process. In its simplest form, an Extract-Transform system consists of two types of actors (the messages they exchange are sketched after the list):

• Master actor

• Worker actor
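
These two actor types communicate through a small set of messages. Their exact definitions are not part of this post; a minimal sketch, consistent with how they are used in the code below, could be:

// Sketch of the message protocol assumed by the snippets below.
case class StartMasterMessage()                               // sent by the App to kick off the master
// a unit of work; retries defaults to 0 so the master can omit it when first dispatching
case class WorkerMessage(url: String, target: String, retries: Int = 0)
case class WorkerAckMessage(url: String, result: Any)         // worker -> master on success
case class WorkerErrorMessage(url: String, error: Throwable)  // worker -> master after giving up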

[Figure: Master actor dispatching work to Worker actors (Scaldi_2.png)]

We separate the actor logic from the data processing. In this case, the Master Actor is in charge of distributing the work (URLs of new files to be processed from the S3 bucket) using a Round Robin router and of instantiating the Worker actors. In order to determine the work to be distributed, we pass it an implementation of MasterProcessor (which differs from ETL to ETL) whose sole purpose is to return the list of URLs to be processed. When all the work has been processed, the master is also in charge of shutting down the Akka system.

The WorkerActor is in charge of running a WorkProcessor while taking care of failures (e.g., retry logic, and notifying the master after a certain number of failures). Upon receiving a message, it executes the WorkProcessor implementation that was passed to it.
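
The processors themselves are plain traits, one per concern, whose implementations vary from ETL to ETL. Their signatures are not spelled out in this post; a sketch consistent with how the actors call them below might be:

import java.io.InputStream

// Sketch of the processor interfaces the actors delegate to (signatures inferred
// from how they are used in the actor code below).
trait MasterProcessor {
  // returns the URLs of the new files to process (e.g., by listing the raw data bucket)
  def getNewBatches(): Seq[String]
}

trait WorkProcessor {
  // processes a single file and returns a result that is reported back to the master
  def process(in: InputStream, target: String): Any
}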

In order to link the actors and the different processors together in a flexible way, with different implementations depending on the type of processing to be done, we use Scaldi, a dependency injection library for Scala and Akka.

Scaldi

On the surface, Scaldi is quite similar to Google Guice, which, along with the Spring Framework, is one of the most popular dependency injection libraries. It offers a simple DSL to inject dependencies, but unlike Spring or Guice it doesn't rely on annotations or XML configuration files; instead it uses traits and implicit injectors, which allow dynamic and conditional binding. For instance:

// toProvider: every injection of Worker calls the block {new Worker(fileStore, processor)},
// so each injection creates a new instance of Worker
bind[Worker].toProvider(new Worker(fileStore, processor))
// to: the Processor is instantiated lazily, once, and all injections share that single instance
bind[Processor] to new DailyLogProcessor()

will create a new instance of Worker whenever we inject the class Worker, but only one instance of Processor:

class SimpleApp(implicit inj: Injector) extends Injectable {
  val processor = inject[Processor]  // the shared DailyLogProcessor instance
  val worker1 = inject[Worker]       // a new Worker instance
  val worker2 = inject[Worker]       // another new Worker instance
}

In Scaldi, injection is done by extending the trait Injectable, taking an implicit Injector in the constructor, and calling inject.
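
For actors, Scaldi also provides the scaldi-akka module with the AkkaInjectable trait, which adds injectActorProps and injectActorRef on top of inject. A minimal, self-contained sketch of the pattern (all names here are illustrative, not from our codebase):

import akka.actor.{Actor, ActorSystem}
import scaldi.Module
import scaldi.akka.AkkaInjectable

class PingActor extends Actor {
  def receive = { case msg => println(s"got $msg") }
}

class PingModule extends Module {
  bind[ActorSystem] to ActorSystem("ping") destroyWith (_.shutdown())
  // actors must be bound with toProvider so that every injection creates a fresh instance
  bind[PingActor] toProvider new PingActor
}

object PingApp extends App with AkkaInjectable {
  implicit val module = new PingModule
  implicit val system = inject[ActorSystem]
  // injectActorRef wraps the binding in Props and asks the ActorSystem to create the actor
  val ping = injectActorRef[PingActor]
  ping ! "hello"
}

We use the same pattern below: worker actors are bound with toProvider in a module, and the master creates them through injectActorProps.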

Master Actor

In our simple case, the SimpleMasterActor can be implemented as follows:

import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import scaldi.Injector
import scaldi.akka.AkkaInjectable

class SimpleMasterActor(masterProcessor: MasterProcessor)(implicit inj: Injector) extends MasterActor with AkkaInjectable {
  val config = context.system.settings.config
  val numWorkers = config.getInt("num_workers")
  val target = config.getString("target")
  var numPendingWorks: Int = _
  val workerActorProps = injectActorProps[WorkerActor]
  // Round Robin router over numWorkers worker actors
  val workerRouter = {
    val routees = Vector.fill(numWorkers) {
      // inject the WorkerActor implementation through Scaldi
      val r = context.actorOf(workerActorProps)
      context watch r
      ActorRefRoutee(r)
    }
    Router(RoundRobinRoutingLogic(), routees)
  }

  override def receive: Receive = {
    case _: StartMasterMessage =>
      // ask the master processor for the new files and dispatch them to the workers
      val urls = masterProcessor.getNewBatches()
      numPendingWorks = urls.size
      for (url <- urls) {
        val msg = WorkerMessage(url, target)
        workerRouter.route(msg, self)
      }
    case WorkerAckMessage(url, result) =>
      completeWork()
    case WorkerErrorMessage(url, error) =>
      // log error
      completeWork()
  }

  private def completeWork(): Unit = {
    numPendingWorks -= 1
    // if we don't have any pending work left, shut down the system
    if (numPendingWorks == 0) {
      context.system.shutdown()
    }
  }
}

In this code, the master simply dispatches the URLs returned by the master processor implementation to the workers. If the number of URLs to process is huge (e.g., over a million), we might want to implement logic where the master sends the URLs to the workers in a more controlled way (e.g., send n URLs at a time and wait until they are acknowledged). In order to dispatch the messages to the workers, we use a Round Robin router. It is constructed from the actor Props injected with injectActorProps, which are then used to create the worker actors. When a worker actor is done processing a URL, it sends back a WorkerAckMessage (or a WorkerErrorMessage in case of error) to the master actor. Using these messages we can track how many work items have been completed, and once they all have, we shut down the Akka system.
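
For reference, here is a sketch of what such throttled dispatching could look like, reusing the messages and traits sketched earlier. Our actual master does not do this, and batchSize is a hypothetical setting:

import scala.collection.immutable.Queue
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import scaldi.Injector
import scaldi.akka.AkkaInjectable

// Sketch only: keep at most `batchSize` URLs in flight; every ack or error frees a slot.
class ThrottledMasterActor(masterProcessor: MasterProcessor)(implicit inj: Injector)
    extends MasterActor with AkkaInjectable {
  val config = context.system.settings.config
  val target = config.getString("target")
  val batchSize = 1000  // hypothetical setting; could also come from the config
  val workerActorProps = injectActorProps[WorkerActor]
  val workerRouter = Router(RoundRobinRoutingLogic(),
    Vector.fill(config.getInt("num_workers"))(ActorRefRoutee(context.actorOf(workerActorProps))))
  var pending = Queue.empty[String]
  var inFlight = 0

  override def receive: Receive = {
    case _: StartMasterMessage =>
      pending = Queue(masterProcessor.getNewBatches(): _*)
      dispatchNext()
    case WorkerAckMessage(_, _) =>
      inFlight -= 1; dispatchNext(); checkDone()
    case WorkerErrorMessage(_, _) =>
      // log the error, then free the slot
      inFlight -= 1; dispatchNext(); checkDone()
  }

  // fill the window with as many URLs as the in-flight budget allows
  private def dispatchNext(): Unit =
    while (inFlight < batchSize && pending.nonEmpty) {
      val (url, rest) = pending.dequeue
      pending = rest
      workerRouter.route(WorkerMessage(url, target), self)
      inFlight += 1
    }

  private def checkDone(): Unit =
    if (inFlight == 0 && pending.isEmpty) context.system.shutdown()
}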

Worker Actor

A simple worker implementation, SimpleWorkerActor, consists of an actor that reads a URL from the S3 bucket (through the FileStore) and processes it using a work processor implementation. We also have a simple retry mechanism so that, upon failure, we process the data again. After a certain number of retries, we simply send the exception to the master actor.

class SimpleWorkerActor(fileStore: FileStore, workProcessor: WorkProcessor) extends WorkerActor with AkkaInjectable with LazyLogging {
  val maxRetries = context.system.settings.config.getInt("worker.retries")
  override def receive: Receive = {
    case WorkerMessage(url: String, target: String, retries: Int) =>
      process(url, target, retries)
  }
  private def process(url: String, target: String, retries: Int): Unit = {
    try {
      val is = fileStore.fetch(url)
      val result = workProcessor.process(is, target)
      context.parent ! WorkerAckMessage(url, result)
    } catch {
      case e: Exception =>
        if (retries < maxRetries) {
          // retry by sending the message back to ourselves with an incremented retry count
          logger.info(s"Retrying $url $retries / $maxRetries...", e)
          self ! WorkerMessage(url, target, retries + 1)
        } else {
          // give up and report the failure to the master
          context.parent ! WorkerErrorMessage(url, e)
        }
    }
  }
}
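
As a concrete example, a WorkProcessor for a daily-log feed could decompress the raw file, keep the columns we care about, and write the result to the staging bucket. None of the names below appear in our actual code; this is just a sketch of where the per-file logic lives:

import java.io.InputStream
import java.util.zip.GZIPInputStream
import scala.io.Source

// Hypothetical WorkProcessor for a daily-log feed. It decompresses the raw file,
// keeps the columns we care about, and would write the re-compressed output to
// the staging bucket (elided here).
class DailyLogWorkProcessor extends WorkProcessor {
  override def process(in: InputStream, target: String): Any = {
    val lines = Source.fromInputStream(new GZIPInputStream(in)).getLines()
    val transformed = lines.map(transform)
    // in a real processor: re-compress `transformed` and store it in the staging
    // bucket through the FileStore, keyed by `target`
    transformed.size  // result reported back to the master in the WorkerAckMessage
  }

  // illustrative transformation: project a tab-separated line onto the first three columns
  private def transform(line: String): String = {
    val cols = line.split('\t')
    s"${cols(0)}\t${cols(1)}\t${cols(2)}"
  }
}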

Putting everything together

In order to bind all the actors together, just like with Google Guice, in Scaldi we create a module that contains the bindings:

class ExampleAkkaModule(config: Config) extends Module {
  val system = ActorSystem("example", config)
  val awsConfig = config.getConfig("aws")
  val accessKeyId = awsConfig.getString("aws_access_key_id")
  val secretAccessKey = awsConfig.getString("aws_secret_access_key")
  val region = awsConfig.getString("region")
  val fileStore = new S3FileStore(accessKeyId, secretAccessKey, region)
  val databaseService = new DatabaseService(config, fileStore)
  val exampleProcessor = new ExampleProcessor(config, fileStore)
  // MasterProcessor implementation for this ETL (the name is illustrative)
  val exampleMasterProcessor = new ExampleMasterProcessor(config, fileStore)
  bind[ActorSystem] toProvider system destroyWith (_.shutdown())
  bind[WorkerActor].toProvider(new SimpleWorkerActor(fileStore, exampleProcessor))
  // needed so that the App below can inject MasterActor
  bind[MasterActor].toProvider(new SimpleMasterActor(exampleMasterProcessor))
}

We bind the ActorSystem (shut down when the module is destroyed), the SimpleWorkerActor used by the master's router, and the SimpleMasterActor used by the App below. The ExampleMasterProcessor name is illustrative; any MasterProcessor implementation for the ETL at hand can be passed to the master.

Finally, the App that runs the Extract-Transform process can be written as follows (wrapped here in a simple App object):

object ExtractApp extends App with AkkaInjectable {
  val config = ConfigFactory.load()  // Typesafe Config
  implicit val appModule = new ExampleAkkaModule(config)
  implicit val actorSystem = inject[ActorSystem]
  val master = injectActorRef[MasterActor]
  master ! StartMasterMessage()
}
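
For completeness, the settings read by these snippets come from a Typesafe Config file. A sketch of what the corresponding application.conf might look like (all values are placeholders):

# Sketch of the application.conf assumed above (values are placeholders)
num_workers = 8              # size of the Round Robin router in the master
target = "redshift"          # passed through to the work processor
worker {
  retries = 3                # max retries before a WorkerErrorMessage is sent
}
aws {
  aws_access_key_id = "..."
  aws_secret_access_key = "..."
  region = "us-east-1"
}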

Conclusion

Every ETL is different; no one approach fits all problems. The approach presented here, based on Akka, is flexible (thanks to Scaldi) and scalable (using Akka Remote or Akka Cluster). Depending on the complexity and time constraints of the ETLs, one can decide to go with simpler abstractions (e.g., Hive, Spark SQL, …), sacrificing performance for simplicity. Other approaches based on DSLs, like Spark (built on a resilient, immutable abstraction that makes better use of memory and handles fault tolerance without having to store intermediate results) or Summingbird (a Scala DSL on top of Hadoop or Storm), also offer flexibility with good performance.
