AWS Loft London

Back in October 2015 Metail hosted the 3rd Cambridge AWS User Group Meetup and, in addition to Ian Massingham’s review of AWS re:Invent 2015, I was given the opportunity to talk about our use of AWS for our big data processing pipeline. After this I was pleased to be invited to give an Elastic MapReduce (EMR) specific version of this talk at an AWS EMR master class. Roll on March and the AWS Loft London, with me on the agenda for the EMR Master Class session 🙂

After a busy week and some concentrated talk preparations I almost didn’t make it. I caught the train from Cambridge to Liverpool Street with the intention of walking from there to Old Street. Unfortunately there were problems with the power lines on the Liverpool Street line, which led to everyone getting off at Harlow Town. After a taxi ride to Epping and a nervous ride into Liverpool Street on the Central line, I finally arrived only five minutes after the session started. This meant I missed my opportunity to introduce myself to Abhishek Sinha (the session leader) but after catching his eye during his talk I was back on the agenda 🙂


Elastic MapReduce Master Class

Abhishek gave a very interesting and well-presented guide to EMR and its best practices. As ever when I attend a talk by someone from AWS I learn plenty of new things and start re-evaluating our use of their tools. In this case, these were mainly around the use of spot instance task nodes and taking advantage of EMRFS.

The spot instance task nodes are nodes that only perform MapReduce tasks, have no HDFS storage, and come from the EC2 spot instance market. Using the spot market you can get the nodes at a lower price, but if you’re outbid you lose the node. Any compute tasks running when you lose the node fail, but Hadoop was built with this in mind and simply reschedules the task on another node. With no HDFS storage on the node, no data re-replication is needed. It’s common to set a bid price of 100% of the on-demand cost: you still get the EC2 node at the lower market price and at worst you pay the normal cost. Further, by picking instance types that are less commonly used, you are less likely to be outbid. For example, if you normally request two m3.2xlarge task nodes but on the spot market the m3.xlarge were less commonly used, then requesting four m3.xlarge task nodes would give you equivalent power but with a greater saving. This is an imaginary example; real spot market prices are published by AWS.
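To make the bidding idea concrete, here is a minimal sketch of how a spot task group could be described. The dictionary keys follow the instance group structure that boto3's EMR `run_job_flow` call accepts under `Instances.InstanceGroups`, as I understand it. The helper name `spot_task_group` and the price are my own inventions for illustration and are not from the talk.

```python
def spot_task_group(instance_type, count, on_demand_price_usd):
    """Describe a TASK instance group that bids 100% of the on-demand price."""
    return {
        "Name": f"spot-task-{instance_type}",
        "InstanceRole": "TASK",   # task nodes run tasks only, they hold no HDFS data
        "Market": "SPOT",         # request the nodes from the spot market
        "InstanceType": instance_type,
        "InstanceCount": count,
        # The bid is passed as a string; here it equals the on-demand price.
        "BidPrice": f"{on_demand_price_usd:.3f}",
    }


# Hypothetical on-demand price, for illustration only (not a real AWS price).
group = spot_task_group("m3.xlarge", 4, 0.30)
print(group)
```

The resulting dictionary would sit alongside the master and core groups when the cluster is requested. Only the task group is put on the spot market here, so losing a node never costs you HDFS data.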

The other feature of EMR we are not yet taking advantage of is EMRFS. AWS have decoupled the compute from storage by allowing EMR clusters to make very efficient use of S3. The main/only drawback here is that S3 has eventual consistency for overwrites and deletes of objects in the S3 file system. The EMR nodes are not aware of the delays and thus when one job takes as input the output of a previous one there is a chance of seeing an inconsistent view of the data. EMRFS uses a DynamoDB table to keep a record of the expected state of S3 and the EMRFS file system will retry if a request is made for an object that does not match the expected state. Currently we work around this limitation by having things set up in such a way that it isn’t a problem (more by luck than design ;)). Another common solution is to create two copies: one in the cluster’s HDFS file system and the other in S3. The copy in HDFS is lost when the cluster shuts down. We are currently redesigning our pipeline and it may become a greater problem in the next iteration so we’re keeping EMRFS in mind, noting that you do pay for the DynamoDB usage.
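The retry idea can be illustrated with a toy model. This is not how EMRFS is implemented, just a sketch of the principle: keep a record of the keys you expect to exist and retry the listing until the store agrees. `LaggingStore` and `list_with_consistency_check` are hypothetical names, and the in-memory store stands in for S3.

```python
import time


class LaggingStore:
    """Fake object store whose listing catches up over successive calls."""

    def __init__(self, responses):
        self._responses = list(responses)
        self.calls = 0

    def list_objects(self):
        # Return the next canned response, repeating the last one forever.
        index = min(self.calls, len(self._responses) - 1)
        self.calls += 1
        return self._responses[index]


def list_with_consistency_check(list_objects, expected_keys, retries=5, wait_seconds=0.0):
    """Retry list_objects() until it returns every key in expected_keys."""
    expected = set(expected_keys)
    missing = expected
    for attempt in range(retries + 1):
        seen = set(list_objects())
        missing = expected - seen
        if not missing:
            return sorted(seen)
        if attempt < retries:
            time.sleep(wait_seconds)  # give the store time to become consistent
    raise RuntimeError(f"still missing after {retries} retries: {sorted(missing)}")


# The first listing is stale; the second one shows both output files.
store = LaggingStore([["part-0"], ["part-0", "part-1"]])
listing = list_with_consistency_check(store.list_objects, {"part-0", "part-1"})
print(listing, "after", store.calls, "calls")
```

In the real service the expected state lives in the DynamoDB table rather than in a Python set, which is where the extra cost comes from.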

My First Big Data Application

As for my own talk, I think it was well received. I was asked some interesting questions at the end and I’m taking that as a good sign. After my talk and some lunch I stayed for the next session, “My First Big Data Application”, which was introduced as a modern big data pipeline. This was a great session where a pipeline was set up to collect, process and analyse web logs. This was strikingly similar to the pipeline I’d described in my talk, although theirs is indeed more modern 🙂 I think it’s interesting to compare the two pipelines and to contrast their different strengths and weaknesses.

Starting with my talk and the beginning of our pipeline, events are recorded by making GET requests for a CloudFront-hosted pixel, and CloudFront logs all the requests to an S3 bucket. Here AWS do the hard work of distributing our pixel around the globe to ensure fast access for the user. They also batch up the request logs, writing them to the configured bucket after some time/size. We’ve never done any measurements but I believe the latency is typically less than an hour, and we get logs of the order of 10MB in size, although they can be only KBs. For the demonstration Toby Knight (the speaker) set up an Apache web server on an EC2 node which saved its logs locally. He then used an AWS Kinesis collector to stream the logs in real time into Kinesis Firehose, which records the data in an S3 bucket. Here you can see the more modern event collector, which is a real-time streaming system compared to our batch one. For the purposes that follow it’s not really clear why Kinesis Firehose is better than our CloudFront solution. I’m not sure how you scale out the Apache web server (fairly easily I imagine, it’s just not my area of expertise) but that’s work you’ll have to do yourself, and when the second step is a batch system I’m not sure the latency matters. However, talking of latency, this is where Kinesis has potential that the CloudFront solution clearly doesn’t. At Metail we don’t have any real-time monitoring of our event stream (it’s never been a critical requirement) but with Kinesis you can connect to a stream and trigger some processing on each new event. This increased flexibility is clearly a win.
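As a rough illustration of the pixel approach, the sketch below reads a CloudFront-style access log and pulls an event out of each request's query string. It relies on the log's own `#Fields:` header and on tab-separated values, with `-` marking an empty field. The sample lines use a shortened field list, and the idea that the event is encoded in the pixel's query string (plus the parameter names `event` and `order_id`) is my assumption for the example, not a description of Metail's actual format.

```python
from urllib.parse import parse_qs


def parse_cloudfront_log(lines):
    """Yield one dict per request, keyed by the names in the #Fields header."""
    fields = []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#Fields:"):
            fields = line[len("#Fields:"):].split()
        elif line and not line.startswith("#"):
            yield dict(zip(fields, line.split("\t")))


def pixel_event(record):
    """Decode the pixel request's query string into a flat event dict."""
    query = record.get("cs-uri-query", "-")
    if query == "-":  # no query string on this request
        return {}
    return {key: values[0] for key, values in parse_qs(query).items()}


# Hypothetical sample; real logs carry many more columns.
SAMPLE = [
    "#Version: 1.0",
    "#Fields: date time cs-method cs-uri-stem sc-status cs-uri-query",
    "2016-03-01\t12:00:00\tGET\t/pixel.gif\t200\tevent=order&order_id=42",
    "2016-03-01\t12:00:05\tGET\t/pixel.gif\t200\t-",
]

events = [pixel_event(record) for record in parse_cloudfront_log(SAMPLE)]
print(events)
```

Nothing here checks that an event makes sense. That is deliberate, since the collector accepts whatever it is sent and validation happens later in the batch step.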

For the next step both we and Toby turned to EMR for a ‘model on read’ batch Extract, Transform and Load (ETL). We are using MapReduce in Clojure (Cascalog at the moment but switching over to Parkour) to read in our CloudFront logs, validate the events and format them in a schema that can be loaded into Redshift. Here ‘model on read’ means that CloudFront doesn’t enforce a schema on the data; it will quite happily write some quite corrupt events to file. It’s only if we try to format that event as, say, an order that we start requiring it to have certain properties. Toby’s talk used Spark to process the events, perhaps just as an opportunity to show EMR supports the latest cool MapReduce technology 😉 It does have some advantages over MapReduce and should be a lot faster than our ETL as Spark uses in-memory data structures. It is written in Scala, though there are Clojure bindings in Flambo and Sparkling.

For the next step Metail is keeping up with the Joneses and we do the modern thing and copy the output of the EMR batch stage into Redshift. Redshift is a petabyte-scale data warehouse where you use a PostgreSQL-like language to query your data. After some initial teething troubles we think our new schema will allow us to make much better use of Redshift’s strengths. We use a product called Looker to model the data in Redshift, produce dashboards for both internal and external use, gain insights into our data through dynamic queries and quite a few other things. For the talk they demonstrated the use of AWS QuickSight, which is in a limited preview. Although it will compete with Looker (and similar tools like Tableau) it’s aiming to be less full-featured and much cheaper, allowing companies to give everyone access to the data with only a few people using more expensive tools like Tableau. I suspect for us it would never replace Looker: it seemed like it wouldn’t have the client-facing support we require, and our more powerful data analysis tools come largely from the open source Python and R community 🙂 Still, I’m very excited about SPICE (Super-fast, Parallel, In-memory Calculation Engine), which gives each QuickSight user a local in-memory DB for very fast data modelling and exploration. This should be available to partners like Looker and Tableau next year.
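Going back to the ‘model on read’ ETL step, a small sketch may help show what that means in practice. Raw events are accepted as they come, and a schema is only imposed when we try to read an event as an order. The required fields and the `as_order` helper are hypothetical, and the output is newline-delimited JSON, which is one of the formats Redshift’s COPY command can load. Our real ETL is written in Clojure, so this Python version is purely illustrative.

```python
import json

# Hypothetical order schema, for illustration only.
REQUIRED_ORDER_FIELDS = ("order_id", "total", "currency")


def as_order(event):
    """Return a cleaned order row, or None if the event can't be read as an order."""
    if event.get("event") != "order":
        return None
    if any(not event.get(field) for field in REQUIRED_ORDER_FIELDS):
        return None  # a required property is missing or empty
    try:
        total = float(event["total"])
    except ValueError:
        return None  # corrupt value, drop the event
    return {
        "order_id": event["order_id"],
        "total": total,
        "currency": event["currency"].upper(),
    }


raw_events = [
    {"event": "order", "order_id": "42", "total": "19.99", "currency": "gbp"},
    {"event": "order", "order_id": "43"},                                    # missing fields
    {"event": "order", "order_id": "44", "total": "n/a", "currency": "gbp"},  # corrupt total
    {"event": "page_view", "path": "/"},                                     # not an order
]

orders = [order for order in map(as_order, raw_events) if order is not None]
rows = [json.dumps(order, sort_keys=True) for order in orders]
print("\n".join(rows))
```

Only the first event survives as an order row. The others are not errors in the collector; they simply fail the model we chose to read them with.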

And that’s it: after mentioning only a ‘few’ technologies I’ve raced through Metail’s big data pipeline and compared it to a more modern equivalent. For anyone looking to build their first pipeline I think it is worth looking at the streaming solution, as that technology is advancing fast and windowing over the streams gives much more powerful batches. It’s something we’re planning to look into with Onyx for the next iteration.