In-Memory Logical Data Warehouse for accelerating Machine Learning Pipelines on top of Spark and Alluxio

Abstract:

Legacy enterprise architectures still rely on relational data warehouses and require moving and syncing data with the so-called “Data Lake”, where raw data is stored and periodically ingested into a distributed file system such as HDFS.

Moreover, there are a number of use cases where you might want to avoid storing data on the development cluster disks, for example to comply with regulations or to reduce latency. In those cases Alluxio (previously known as Tachyon) can make the data available in-memory and share it among multiple applications.

We propose an Agile workflow by combining Spark, Scala, DataFrame (and the recent DataSet API), JDBC, Parquet, Kryo and Alluxio to create a scalable, in-memory, reactive stack to explore data directly from source and develop high quality machine learning pipelines that can then be deployed straight into production.

In this talk we will:

* Present how to load raw data from an RDBMS and use Spark to make it available as a DataSet

* Explain the iterative exploratory process and advantages of adopting functional programming

* Give a critical analysis of the issues faced with the existing methodology

* Show how to deploy Alluxio and how it greatly improved the existing workflow by providing the desired in-memory solution and by decreasing the loading time from hours to seconds

* Discuss some future improvements to the overall architecture

Original meetup event: http://www.meetup.com/Alluxio/events/233453125/

The Barclays Data Science Hackathon: Building Retail Recommender Systems based on Customer Shopping Behaviour

From Data Science Milan meetup event:

In the depths of the last cold, wet British winter, the Advanced Data Analytics team from Barclays escaped to a villa on Lanzarote, Canary Islands, for a one-week hackathon where they collaboratively developed a recommendation system on top of Apache Spark. The contest consisted of using Bristol customer shopping behaviour data to make personalised recommendations in a sort of Kaggle-like competition, where each team’s goal was to build an MVP and then repeatedly iterate on it using common interfaces defined by a specifically built framework.
The talk will cover:

• How to rapidly prototype in Spark (via the native Scala API) on your laptop and magically scale to a production cluster without huge re-engineering effort.

• The benefits of doing type-safe ETLs representing data in hybrid, and possibly nested, structures like case classes.

• Enhanced collaboration and fair performance comparison by sharing ad-hoc APIs plugged into a common evaluation framework.

• The co-existence of machine learning models available in MLlib and domain-specific bespoke algorithms implemented from scratch.

• A showcase of different families of recommender models (business-to-business similarity, customer-to-customer similarity, matrix factorisation, random forest and ensembling techniques).

• How Scala (and functional programming) helped our cause.

Surfing and Coding in Lanzarote, the Barclays Data Science hackathon

This post was published on the Cloudera blog and summarises the results and takeaways of a week-long hackathon that took place in Lanzarote in December 2015. The goal was to prototype a recommender system for retail customers of shops in Bristol, UK. The article shows how a stack composed of Scala and Spark was great for quickly writing prototype code that runs locally on a single laptop and, at the same time, scales to larger datasets processed on a cluster.


Please continue reading at http://blog.cloudera.com/blog/2016/05/the-barclays-data-science-hackathon-using-apache-spark-and-scala-for-rapid-prototyping/.

Robust and declarative machine learning pipelines for predictive buying

Proof of concept of how to use Scala, Spark and the recent library Sparkz for building production quality machine learning pipelines for predicting buyers of financial products.

The pipelines are implemented through custom declarative APIs that give us greater control, transparency and testability of the whole process.

The example followed the validation and evaluation principles as defined in The Data Science Manifesto available in beta at http://www.datasciencemanifesto.org

Lessons learnt from building data-driven production systems at Barclays


Over the last few years at Barclays we learnt and tried many things that made the Advanced Analytics team very successful inside a large organization, an environment where being a productive data scientist is a tough challenge.

The data science team works on a mix of descriptive, predictive and prescriptive projects that make use of machine learning and big data technologies, mainly on top of Apache Spark. Even though we deliver per-request insights coming from manual analysis, we primarily build automated and scalable systems to be used periodically, either internally for better decision-making or customer-facing in the form of analytics services (e.g. via the web portal).
In this post series I want to share some of the best practices, tools, methodologies and workflows that we experimented with and the lessons learnt from them. I will skip a few aspects of machine learning systems, since I found those to be already well covered in other talks and articles; you can find the reference links at the end of this post.
Moreover, not all data-driven projects require a machine learning component, at least not at every stage. I would like to quote Peter Norvig from a recent article published at KDnuggets:

“Machine Learning development is like the raisins in a raisin bread: 1. You need the bread first 2. It’s just a few tiny raisins but without it you would just have plain bread.”

Please keep in mind that each scenario is different, so there are no strict rules to advocate. Every data science team should come up with the workflow and stack that best suit its needs. Besides, it should be able to adapt quickly to the business and technical changes of its organization.

To conclude, I have summarised the main take-home lessons of my experience at Barclays so far. I hope they will serve as a useful guideline or source of inspiration for data science teams focusing on building production systems. Many of those best practices also apply to research-oriented teams that focus more on prototyping solutions. Our team has a mix of engineering and modelling backgrounds, so defining a little bit of structure and common workflows helped us to be collaborative and productive.

The goal was not to advocate a single methodology but to show other possible approaches that could fit well within your organization. We expect those practices to conflict across different teams. For example, in Xavier’s articles (see links below) he suggests doing all of the experiments in notebooks and using the same tools in production, while in our experience we found this to be chaotic and not scalable for our use cases. There is no universal law: try different approaches and stick with the most successful ones for your use cases.

***

A related blog post of “How to do Data Science that is both Exploratory and Production Quality” can be found here: https://www.linkedin.com/pulse/how-do-data-science-both-exploratory-production-quality-harry-powell.

Similar articles:

Seven Steps to Success Machine Learning in Practice https://daoudclarke.github.io/guide.pdf

http://technocalifornia.blogspot.co.uk/2014/12/ten-lessons-learned-from-building-real.html

And more recent additional 10 lessons:

https://medium.com/@xamat/10-more-lessons-learned-from-building-real-life-ml-systems-part-i-b309cafc7b5e#.58g9wrnt4

 

Thoughts about data operations

This is part 4 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Infrastructure

Get the IT team very close to the data scientists. Ideally one member of the team should be a DevOps engineer, or DataOps as the newer title goes. Read this article from InfoWorld: DevOps can take data science to the next level.
Finding the right balance between IT workarounds and clean solutions is difficult, especially when it involves long, tedious processes. It is good practice to “sign” contracts with the IT team stating what you are about to deliver and what you require in order to do so.
As general advice, you want to operate in a familiar environment where all of the tools you like and proper cluster resources are available. Unfortunately data is always fragmented across multiple systems. Try to get the data periodically ingested into your Data Lake (typically a Hadoop cluster). When this is not possible, make sure you have the permissions to sqoop it yourself. Data virtualization technologies also come in particularly handy for creating a view of a dataset in your Big Data environment.
Don’t implement solutions that are tied to the underlying infrastructure. The Spark DataFrame API, for example, does an excellent job of abstracting away the I/O operations. See this blog post on how to logically map tables from a relational database into a Spark cluster: https://dzone.com/articles/Accelerate-In-Memory-Processing-with-Spark-from-Hours-to-Seconds-With-Tachyon.
Having admin rights on a dev cluster massively improves productivity and lets the team master their Unix skills, so request them. Trust and transparency are essential. Security should be enforced by hiring competent and smart people during the interview process and by training personnel, instead of killing productivity with nonsensical restrictions.

Release process

A data product is, at the end of the day, a piece of software that takes data as input and produces data as output. That output contains insights that are consumable either via a visual dashboard or by integrating them into an existing IT system.

A few options we recommend for releasing are:

  • Continuous delivery. Ideally one pull request per project per day.
  • Continuous integration. Ideally, a Jenkins box runs your tests and automated scripts every time a ticket is merged into develop, and especially every time a new release is made in master. If the box can access a data cluster, it can even run the end-to-end evaluation and store the results for you.
  • Every end of sprint should be matched with a new release consisting of:
    • Taking the develop branch and merging it back into master (either manually or through an automated script such as the gitflow command line).
    • Publishing your package containing source code and scripts to a common repository like Nexus.
    • Reporting latest results in Confluence (see documentation section).
    • Releasing all of the merged tickets from Jira so that they don’t show up in the board but are still accessible for reference.
    • Demo-ing inside the team and/or to your stakeholders if changes are relevant.
    • Celebrating in a pub.

It would make sense to plan the release for the afternoon of the last day of the sprint (typically Friday), but sometimes it might be advantageous to release on Thursday so that you have Friday for hot-fixes if something goes down.

Deployment


It is very hard to give guidelines here, since each project has its own deployment process that depends on many factors, such as the business context and the practical issues associated with it.

If your application is deployed end-to-end by external teams whose workflow and data sources you don’t control, you will find it extremely helpful to have some data sanity checks performed at every single run. Those checks make sure that the people running your application don’t accidentally input data that does not conform to the schema and/or model assumptions. Throwing an exception with some context information is fundamental to making your system production-ready.
A typical example is validating the values of categorical fields. We packed into our jars the reference files containing all of the possible values and their descriptions. If the specified dataset contains values that don’t match any of them, the data sanity check throws an exception.
This handling of incorrect data may be done during the ETL process and is generally not needed if the training is done by the data science team itself. In this latter case the deployment only concerns the trained model.
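
As an illustration of the categorical check described above, here is a minimal sketch in plain Scala. The names DataSanityCheck, DataSanityException and checkCategorical are hypothetical and are not taken from our code base; the reference set is assumed to have been loaded already from the file packed in the jar.

```scala
object DataSanityCheck {
  // Hypothetical exception type carrying context about the offending values.
  final case class DataSanityException(field: String, unknownValues: Set[String])
    extends RuntimeException(
      s"Field '$field' contains values not in the reference list: " +
        unknownValues.toList.sorted.mkString(", "))

  // Throws if any observed value is missing from the reference set,
  // otherwise returns the observed values unchanged.
  def checkCategorical(field: String, observed: Seq[String], reference: Set[String]): Seq[String] = {
    val unknown = observed.toSet -- reference
    if (unknown.nonEmpty) throw DataSanityException(field, unknown)
    observed
  }
}
```

The exception message names both the field and the unexpected values, so whoever runs the job can see what went wrong without reading the code.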

Deployment is the stage with the highest number of blockers and technical issues. Since the final measure of success is only determined upon deployment in production, deployment issues should be top priority.

The ScrumBan Jira board

This is part 1 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Agile board

Let’s start with one of the core tools of the agile workflow. We use a Jira board for tracking and organizing all of our projects. We developed a custom board which uses the sprint concept of Scrum but in a more flexible way, as in Kanban.


 

The Scrumban board is configured as follows:

  • Horizontally divided in swimlanes (top-down in order of priority):
    • Critical / Blockers
    • Current work
    • Stories backlog
    • Sub-tickets backlog
    • Completed
  • The columns are:
    • To do
    • In progress
    • In review
    • Done / resolved
    • You can optionally have “Ready to release”
  • Quick filters should include at least one filter for each member of the team, selecting the tickets assigned to them.

The idea is that during planning you select from the backlog the high-level stories you want to deliver by the end of the sprint (typically 2 weeks long) and then you create subtasks as you need them.
The reason is that in data science you don’t know beforehand what you are about to implement. You need to investigate, implement and test all the time, and as you do so you discover what to do next. What matters is that whatever subtask is created gets done by the end of the sprint, so that the story is completed.

Define stories with a clear goal and a small scope. They should not span multiple sprints, and since they come with uncertainty about which tasks will be required, you really need to break a big problem into smaller, well-defined problems that can be accomplished no matter what.

Avoid having tasks for exploratory analysis or for adding unit tests. Each task should bring some value, potentially a new feature. Each task will then require an exploratory analysis as well as some development and testing. Those steps are already part of the definition of “Done”. See the sections below for more explanations about tests and exploratory analysis.

Always plan less than your capacity. Delivering your stories a few days early is a very good sign. Delaying them is bad. If you manage to get your work done by Thursday, spend the whole of Friday in a pub celebrating your amazing delivery.

In Jira, you must assign each story to one individual, but remember that in an agile team the whole team either succeeds or fails. If that person does not manage to finish their tasks on time, it is a team failure. That’s what you have the morning standup for: to make sure everything is under control and team resources are allocated in a way that makes the sprint successful.

Never change the scope of your sprints or add tasks that were not planned, unless they are required hotfixes. If you are asked to do something else, invite the product owners to join your next sprint planning; only then can you allocate resources for them.
Remember that the goal of a sprint is to have a working deliverable, even if simplistic, not to solve scattered tasks.

At the end of the sprint have a retrospective meeting to discuss what went well and what did not. Make sure to take actions so that the same blockers do not appear again in the future.

Documentation

Documentation should be as simple as possible.

  • Release notes: a page where you note the major changes since the previous version, the list of new tickets that have been merged (linking to Jira) and a link to a more detailed report.
  • The detailed report contains snapshots of the most recent logs, results, observations, limitations, assumptions and performance of the model/ETL/application. Often it contains some charts that quickly explain how good the product is. We can use those detailed but concise reports to track how the product is evolving. The detailed release report also contains the help messages explaining how to run the application and all of the command line interface (CLI) options.
    If all of your tests and procedures are fully automated then this page is simply a copy and paste of the results.
  • The usage of a particular job class or script, with the list of CLI arguments and default values, is also accessible via the --help argument; many libraries help you do that (bash getopts, Scala Scallop…).
  • Other pages are used to explain the complex parts of the logic. Limit those pages to cases where the logic is very complicated and hard to understand by just reading the code.

Documentation is hard to keep in sync; that’s why we want to document what’s new since the last release rather than going through the whole wiki and updating every single page.

Ideally the documentation comes from the source code, unit tests and Jira tickets. Individual analyses, findings and insights can be documented separately, but they should be treated as static reports rather than project documentation.

In the hierarchical structure of the pages, we limit the maximum depth to 2, which means we have the root-level pages with at most one level of child pages. Nested structures make it very hard to find content when you need it.

Branching and versioning

Code should always and only exist in a git repository. Sparse snippets or random script files should be avoided.

We follow the gitflow branching model, where each ticket is mapped to a feature branch. If you integrate Jira with Stash, then from the ticket web page you can automatically create the corresponding branch in the repository using develop as the base branch.

You do not need to use the complete gitflow branching model, but you need at least the master, develop and feature branches. It’s up to the deployment strategy to define how to handle hotfix, bugfix and release branches. Make sure this strategy is clearly defined and consistently enforced. See deployment.

Story tickets generally don’t have an associated branch; their sub-tasks do.

Install a git hook so that every commit message includes the ticket code as a prefix (you can parse it out of the branch name). Tracking each commit with the corresponding ticket is a life-saver when, in the future, you try to reverse engineer what a method is doing and why it was created in the first place. You can then go through the whole git history and reach the corresponding tickets that touched that piece of code.
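
The parsing step of such a hook can be sketched as follows. This is only an illustration in Scala of the logic a hook script would run. The branch naming convention (for example feature/DS-123-add-validation) and the names CommitPrefix, ticketCode and prefixed are assumptions made for this example, not our actual hook.

```scala
object CommitPrefix {
  // Assumed ticket format: upper-case project key, a dash, then digits (e.g. "DS-123").
  private val TicketPattern = """[A-Z][A-Z0-9]*-\d+""".r

  // Extracts the first ticket code found in the branch name, if any.
  def ticketCode(branchName: String): Option[String] =
    TicketPattern.findFirstIn(branchName)

  // Prefixes the commit message with the ticket code, unless no code is found
  // in the branch name or the message already starts with it.
  def prefixed(branchName: String, message: String): String =
    ticketCode(branchName) match {
      case Some(code) if !message.startsWith(code) => s"$code $message"
      case _                                       => message
    }
}
```

Branches without a ticket code, such as develop, leave the message untouched, so the same hook can be installed everywhere.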

Discussions

Discussions of specific tasks should go into the corresponding Jira ticket web page. This makes the conversation public and tracked, and anyone can jump into the discussion with the full context available. Reference files or supporting documents should also be attached to the Jira ticket itself, or to the wiki if they serve a general purpose. Remember that each Jira ticket can be linked from the releases wiki page, which means we never lose track of them. Moreover, the query engine is quite good.

We found emails to be the worst place for discussions to happen, especially for sharing files that will soon become out-of-date.

When someone sends you an Excel file, reply saying that your laptop does not have an Office installation on it. If you are sharing small data files, TSV or JSON is the way to go.
Avoid comma-separated files with quotes wrapping text fields. You want your files to be editable with simple bash commands rather than having to load them into a CSV parsing library.

We also tried mounted shared drives, but Confluence is a much better collaborative way to share and organize files, with integrated version control and metadata.

Avoid meetings as much as you can: invent some excuse, or ask for a clear agenda beforehand. Educate your colleagues to communicate with you by raising issues. Reserve meetings for important discussions, and spend your meeting time on presenting to and checkpointing with your stakeholders more frequently.

 

 

Functional Data Validation using monads and applicative functors

http://envirostructure.ignite.lexblog.com/wp-content/uploads/sites/386/2014/10/Oil-Pipeline-at-Sunset.jpg

ETL is probably the most time-consuming part of every Data Science project. The quality of the extracted and crunched data is one of the major factors affecting the final results. In fact, real-world data is always messy and inconsistent. Data validation is a must for enforcing the correctness of the proposed solution and for making sure the underlying data represents the true business scenario.

When performing a data validation, the following issues often arise:

  • We want to keep track of how much information we lose, for debugging and reporting purposes.
  • Sometimes we want to cleanse invalid data instead of filtering it out.
  • Part of the validation logic depends on the project requirements and/or model assumptions. They change often and the refactoring may introduce bugs.

In this tutorial we show how to use monads, applicative functors and other functional programming concepts to safely and elegantly define the validation logic using a modular pattern. Each rule is defined individually and the final logic is built by using two types of composition:

  • Monad-composition. One rule after the other, if one fails the next rule is not applied.
  • Applicative-composition. All of the rules are applied independently and the validation results are collected and merged together.
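
To make the difference between the two compositions concrete, here is a small sketch that uses only the Scala standard library (Either, which is right-biased from Scala 2.12 onwards) rather than Scalaz. The rule names and error strings are made up for the example.

```scala
object CompositionStyles {
  // A rule either rejects the value with an error message or lets it through.
  type Rule[A] = A => Either[String, A]

  val positive: Rule[Int] = n => if (n > 0) Right(n) else Left("not positive")
  val even: Rule[Int]     = n => if (n % 2 == 0) Right(n) else Left("not even")

  // Monad-composition: flatMap stops at the first failure,
  // so later rules are never applied.
  def sequential[A](rules: List[Rule[A]])(a: A): Either[String, A] =
    rules.foldLeft[Either[String, A]](Right(a))((acc, rule) => acc.flatMap(rule))

  // Applicative-style composition: every rule is applied independently
  // and all of the errors are collected.
  def accumulating[A](rules: List[Rule[A]])(a: A): Either[List[String], A] = {
    val errors = rules.flatMap(rule => rule(a).left.toOption)
    if (errors.isEmpty) Right(a) else Left(errors)
  }
}
```

With the input -3, sequential reports only the first failing rule while accumulating reports both.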

Moreover, the data that does not pass the validation tests is not discarded but moved into a separate pipeline, with all of the needed metadata attached to it explaining why each particular record was rejected. This allows us to:

  • Log all of the specific causes of data loss.
  • Easily recover previously invalidated data if the validation rules change.
  • Re-use part of the discarded data further down in the data pipeline. The whole ETL workflow is aware of what has been discarded before.
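
The idea of routing rejected records into their own pipeline can be sketched on plain Scala collections as below. SplitPipeline, Rejected, split and lossReport are hypothetical names for this illustration; in the tutorial the same role is played by InvalidEvent and Spark RDDs.

```scala
object SplitPipeline {
  // A rejected record travels together with the causes of its rejection.
  final case class Rejected[A](record: A, causes: List[String])

  // Splits records into (valid, rejected). Nothing is thrown away:
  // rejected records are kept with their causes for later recovery.
  def split[A](records: Seq[A])(validate: A => List[String]): (Seq[A], Seq[Rejected[A]]) = {
    val checked  = records.map(r => (r, validate(r)))
    val valid    = checked.collect { case (r, Nil) => r }
    val rejected = checked.collect { case (r, causes) if causes.nonEmpty => Rejected(r, causes) }
    (valid, rejected)
  }

  // Number of rejected records per cause, handy for logging data loss.
  def lossReport[A](rejected: Seq[Rejected[A]]): Map[String, Int] =
    rejected.flatMap(_.causes).groupBy(identity).map { case (cause, xs) => cause -> xs.size }
}
```

If a validation rule changes later, the rejected side can be re-validated on its own instead of re-running the whole extraction.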

Spark, Scala, Scalaz and Sparkz

The tutorial is part of the open source project Sparkz, which aims to extend the Apache Spark framework by providing more functional APIs. The implementation is in Scala and leverages Scalaz, the framework that inspired Sparkz.

Scalaz provides a Validation data structure that is similar to Either (where an object can be either Left/Failure or Right/Success). Unlike Either, it is not a monad but an applicative functor: instead of chaining the result of one check into the next, Validation evaluates all of the checks and accumulates the failures. Since a failure must carry at least one error message, we enforce the failure type to be a non-empty list of error messages. For this purpose Scalaz already provides ValidationNel, which accumulates all of the error messages into a Nel (non-empty list).

See this page for documentation: http://eed3si9n.com/learning-scalaz/Validation.html

The concepts and methodology can be applied to any data computation framework and programming language. You will just have to re-implement part of the boilerplate code shown in this tutorial, which does all of the magic for you.

The user/events data validation use case

The use case we are using for this example is a simple data type consisting of a triple of userId, eventCode and timestamp:


case class UserEvent(userId: Long, eventCode: Int, timestamp: Long)

Each UserEvent can either be marked as correct or as invalid. For the latter case we will wrap it into another case class InvalidEvent containing the invalid event as well as some meta information regarding the error cause:

sealed trait InvalidEventCause

case class InvalidEvent(event: UserEvent, cause: InvalidEventCause)

The goal is to build a function that takes a UserEvent and returns a ValidationNel of either the correct event or the non-empty list of all of the causes:


UserEvent => ValidationNel[InvalidEvent, UserEvent]

The reason why we want to return ValidationNel[InvalidEvent, UserEvent] instead of ValidationNel[InvalidEventCause, UserEvent] is that we want to keep the original datum for possible later recovery instead of only storing the error causes. This implies that the same object is duplicated once per cause, which is not efficient, but we are not addressing optimisation issues in this tutorial; we will leave them for future posts.

Validation Rules

The easiest way to define each rule was via partial functions that map a UserEvent into an InvalidEventCause. A partial function is a function that is only defined for a subdomain of the input arguments. In our case it is a function that tries to invalidate a datum and is not defined for correct records. The full validation logic will be expressed as a List of partial functions such as:


val validationRules: List[PartialFunction[UserEvent, InvalidEventCause]]

In order to reduce the boilerplate code the PartialFunction returns an InvalidEventCause and then our implicit logic will wrap it together with the original UserEvent object into an InvalidEvent container.
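
For readers less familiar with partial functions, the following standalone sketch shows how a rule behaves on valid and invalid records. The two causes (NegativeTimestamp, UnknownUser) and the helper causesOf are invented for this illustration and are not part of the tutorial's rule set.

```scala
object PartialRules {
  final case class UserEvent(userId: Long, eventCode: Int, timestamp: Long)
  sealed trait InvalidEventCause
  // Hypothetical causes, for illustration only.
  case object NegativeTimestamp extends InvalidEventCause
  case object UnknownUser extends InvalidEventCause

  // Each rule is defined only for the events it invalidates.
  val negativeTimestamp: PartialFunction[UserEvent, InvalidEventCause] = {
    case event if event.timestamp < 0 => NegativeTimestamp
  }
  val unknownUser: PartialFunction[UserEvent, InvalidEventCause] = {
    case event if event.userId <= 0 => UnknownUser
  }

  val rules: List[PartialFunction[UserEvent, InvalidEventCause]] =
    List(negativeTimestamp, unknownUser)

  // lift turns a rule into UserEvent => Option[InvalidEventCause];
  // rules that are not defined for the event contribute nothing.
  def causesOf(event: UserEvent): List[InvalidEventCause] =
    rules.flatMap(_.lift(event))
}
```

A correct record yields an empty list because no rule is defined for it, which is exactly the property the validation pattern relies on.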

Some rules are simply pre-defined, such as checking that a timestamp is in a min-max range or that the userId is in a white list and so on. Others are more complicated and are derived from the underlying raw data (before validation).
The method generating the final validation function takes as argument the RDD with the raw data plus a bunch of parameters and objects used for defining the single rules:

def validationFunction(events: RDD[UserEvent],
 eligibleUsers: Set[Long],
 validEventCodes: Set[Int],
 blackListEventCodes: Set[Int],
 minDate: String, maxDate: String): UserEvent => ValidationNel[InvalidEvent, UserEvent] 

In order to compile the snippets of code you will need the following imports:

 
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.joda.time.{DateTime, Interval, LocalDate}
import sparkz.utils.Pimps._
import scalaz.Scalaz._
import scalaz.ValidationNel

First, we grab the SparkContext from the events RDD:


val sc = events.context

Valid event code

We want to filter out all of the events whose code does not belong to the validEventCodes set.

 

case object NonRecognizedEventType extends InvalidEventCause

val validEventCodesBV: Broadcast[Set[Int]] = sc.broadcast(validEventCodes)
val notRecognizedEventCode: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !validEventCodesBV.value.contains(event.eventCode) => NonRecognizedEventType
}

We could have enclosed the set directly in the partial function, but we preferred to broadcast it and retrieve it using the value API.

Why you should use broadcast variables in Spark is explained here: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Eligible users

Just like event codes we want to make sure that we only select data from a pool of predefined eligible users:


case object NonEligibleUser extends InvalidEventCause

val eligibleUsersBV: Broadcast[Set[Long]] = sc.broadcast(eligibleUsers)
val customerNotEligible: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !eligibleUsersBV.value.contains(event.userId) => NonEligibleUser
}

N.B. if the eligibleUsers set is large you cannot broadcast it as a shared variable; you would rather turn it into a paired RDD and use the userId as the key for a join. If you want to preserve the information about why you discarded a particular user, you will have to perform an outer join instead of an inner join.

Blacklist users

This logic is slightly more complicated. We want to filter out all of the events of those users for which we observed at least one event in the black list. We will have to first scan the raw dataset in order to create the blacklist user ids.


case object BlackListUser extends InvalidEventCause

val blackListEventCodesBV: Broadcast[Set[Int]] = sc.broadcast(blackListEventCodes)
// Users for which we observed a black list event
val blackListUsersBV: Broadcast[Set[Long]] = sc.broadcast(
  events.filter(event => blackListEventCodesBV.value.contains(event.eventCode))
  .map(_.userId).distinct().collect().toSet
)
val customerIsInBlackList: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if blackListUsersBV.value.contains(event.userId) => BlackListUser
}

We first run distinct() to reduce the size of the RDD before we collect() it and turn it into a set.

Timestamp out of global interval

We specified the minDate and maxDate as strings in ISO format and we want to filter out all of the timestamps outside this range. This logic does not need any pre-computation, so we can implement it directly as:


case object OutOfGlobalIntervalEvent extends InvalidEventCause

// Parse the global interval once instead of once per event
val globalInterval = new Interval(DateTime.parse(minDate), DateTime.parse(maxDate))
val eventIsOutOfGlobalInterval: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !globalInterval.contains(event.timestamp) =>
    OutOfGlobalIntervalEvent
}

 

We use joda-time to parse strings and timestamp epoch numbers into more manageable classes.

First day to consider of the user

This time we want a stricter rule regarding the event timestamps. We want to avoid border effects by removing all of the events that fall on the date of the first observed event of a particular user. The reason is that the first date may be incomplete and may not contain all of the events, which can invalidate our data assumptions. Since we also introduced the concept of a global min/max interval, the first date to consider for a particular user is the max between the first date observed in the data and the start of the global interval.


case object FirstDayToConsiderEvent extends InvalidEventCause

// max between first date we have ever seen a customer event and the global min date
val customersFirstDayToConsiderBV: Broadcast[Map[Long, LocalDate]] =
  sc.broadcast(
    events.keyBy(_.userId)
    .mapValues(personalEvent => new DateTime(personalEvent.timestamp).toLocalDate)
    .reduceByKey((date1, date2) => List(date1, date2).minBy(_.toDateTimeAtStartOfDay.getMillis))
    .mapValues(firstDate => List(firstDate, LocalDate.parse(minDate)).maxBy(_.toDateTimeAtStartOfDay.getMillis))
    .collect().toMap
  )
val eventIsFirstDayToConsider: PartialFunction[UserEvent, InvalidEventCause] = {
  // Convert the epoch timestamp the same way as in the pre-computation above
  case event if customersFirstDayToConsiderBV.value(event.userId).isEqual(new DateTime(event.timestamp).toLocalDate) =>
    FirstDayToConsiderEvent
}

We reduced the events RDD into the minimum date for each userId using the reduceByKey monoidal aggregation. Then we took the max between that per-user minimum date and the global minimum date.
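
The same min-then-max aggregation can be tried out on a local collection without a cluster. The sketch below is an analogue, not the Spark code: it assumes java.time instead of joda-time, interprets timestamps as epoch milliseconds in UTC, and uses groupBy in place of reduceByKey. The names FirstDayToConsider and firstDays are hypothetical.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

object FirstDayToConsider {
  final case class UserEvent(userId: Long, eventCode: Int, timestamp: Long)

  // Assumption: timestamps are epoch milliseconds, interpreted in UTC.
  private def toDate(epochMillis: Long): LocalDate =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate

  // For each user: max(earliest observed date, global minimum date).
  def firstDays(events: Seq[UserEvent], minDate: LocalDate): Map[Long, LocalDate] =
    events
      .groupBy(_.userId)
      .map { case (userId, userEvents) =>
        val earliest = userEvents.map(e => toDate(e.timestamp)).minBy(_.toEpochDay)
        userId -> (if (earliest.isAfter(minDate)) earliest else minDate)
      }
}
```

A user whose first event predates the global interval gets the start of the interval as their first day, while everyone else keeps their own earliest date.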

Validation rules composition

All we have to do is take the individually defined partial functions (that in Scala are objects like everything else) and put them into a List.

We realised that eventIsFirstDayToConsider depends on eventIsOutOfGlobalInterval. If an event is filtered out because it is outside the global interval, there is no need to put it through the first-day-to-consider rule. Thus we can chain the two rules, in the spirit of the monad-composition described earlier, by calling the orElse method on the first partial function. It takes the second partial function as its argument, and the second one is applied if and only if the first one is not defined (in our case, when the event is not already outside the global min-max range). The orElse method returns a new partial function that internally combines the two and is treated as a single validation rule.

val validationRules: List[PartialFunction[UserEvent, InvalidEventCause]] =
  List(customerNotEligible, notRecognizedEventCode, customerIsInBlackList,
    eventIsOutOfGlobalInterval.orElse(eventIsFirstDayToConsider)
  )
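The behaviour of orElse can be checked in isolation with a small sketch. The rules below work on a plain day number rather than on UserEvent, and the names, the interval [10, 20] and the first day 12 are all hypothetical values chosen for the example.

```scala
object OrElseSketch {
  sealed trait Cause
  case object OutOfInterval extends Cause
  case object FirstDay extends Cause

  // Hypothetical rule 1: the global interval is [10, 20].
  val outOfInterval: PartialFunction[Int, Cause] = {
    case day if day < 10 || day > 20 => OutOfInterval
  }

  // Hypothetical rule 2: everything up to day 12 counts as "first day or earlier".
  val firstDay: PartialFunction[Int, Cause] = {
    case day if day <= 12 => FirstDay
  }

  // firstDay is consulted only where outOfInterval is not defined.
  val combined: PartialFunction[Int, Cause] = outOfInterval.orElse(firstDay)
}
```

Day 5 matches both rules but only OutOfInterval is reported, day 12 falls through to FirstDay, and day 15 is not matched by the combined rule at all.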

The order of the validation rules does not matter since all of them will be applied independently even if computationally they will be applied sequentially. If you want to computationally apply them in parallel then you can use the par method of a list that turns it into a parallel collection.

The final validation function will be generated by a couple of syntactic sugar implicits that we implemented in Sparkz.


(event: UserEvent) => validationRules.map(_.toFailureNel(event, InvalidEvent(event, _))).reduce(_ |+++| _)

The magic operators

In the list of imports we specified:

import sparkz.utils.Pimps._

Pimps are a nice pattern used in Scala to implicitly add methods to existing classes (the action of pimping). It is particularly useful when you want to use the postfix notation to apply a method to a class that does not expose that method.

In order to implement our validation pattern we had to pimp two classes: the PartialFunction and the ValidationNel. The pimped methods hide the boilerplate logic computed behind the scenes.

implicit class PimpedPartialFunction[X, E](pf: PartialFunction[X, E]) {
  def toFailureNel[W](x: X, toW: E => W = identity _): ValidationNel[W, X] =
    pf.andThen(e => toW(e).failureNel[X]).applyOrElse(x, (_: X).successNel[W])

  def toFailureNel(x: X): ValidationNel[E, X] = toFailureNel(x, identity)
}

The toFailureNel method attempts to apply the partial function (X => E) to the element x. If the function is defined at x, it returns a failureNel (a non-empty list with a single failure of type E) where the failure object is the one returned by the original function. If the function is not defined, the pimped method creates a successNel of type X.

The more general method also takes an extra argument toW that converts the error object e returned by the original function (if defined) and wraps it into another type W. It acts as a functor for the failure case. This method allows us to encapsulate the information of the original event that generated the error together with its cause.

In our use case the generic types X, E and W are:

X: UserEvent
E: InvalidEventCause
W: InvalidEvent
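For readers who do not have Scalaz at hand, the idea behind toFailureNel can be sketched with the standard library only. This is a stand-in under stated assumptions, not the Sparkz implementation: Either[List[W], X] replaces ValidationNel[W, X], and the names toFailureList, Cause, NegativeValue and Invalid are invented for the example.

```scala
object ToFailureSketch {
  sealed trait Cause
  case object NegativeValue extends Cause
  final case class Invalid(value: Int, cause: Cause)

  // Left(List(w)) plays the role of failureNel, Right(x) the role of successNel.
  def toFailureList[X, E, W](pf: PartialFunction[X, E])(x: X, toW: E => W): Either[List[W], X] =
    pf.lift(x) match {
      case Some(e) => Left(List(toW(e))) // rule is defined: wrap the cause into W
      case None    => Right(x)           // rule is not defined: x is valid for this rule
    }

  // Hypothetical rule: negative numbers are invalid.
  val negative: PartialFunction[Int, Cause] = { case v if v < 0 => NegativeValue }

  // toW keeps the original value next to its cause, like InvalidEvent(event, _).
  def validate(v: Int): Either[List[Invalid], Int] =
    toFailureList(negative)(v, (cause: Cause) => Invalid(v, cause))
}
```

Here X is Int, E is Cause and W is Invalid, mirroring UserEvent, InvalidEventCause and InvalidEvent above.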

Once we have converted the partial functions into ValidationNel instances, we need to reduce them into a single one. Scalaz provides a binary operator +++ that takes two ValidationNel instances and merges them together. In case of failures, it appends all of the failures into a single list of elements of type E. In case of successes, it applies an externally provided semigroup operator on type X. In other words, the operator knows how to reduce failures by concatenating the errors, but it requires us to specify how to combine the correct results when all of them are Success.

Since in our use case we are not transforming the correct data but either discard it or keep it as it is, the combining operator is straightforward. We assume that the success results always contain the same original object, i.e. they never transform it. Thus the operator simply returns the object itself, where the two objects to reduce are just copies of each other.

Our PimpedValidationNel provides a simplified operator that does not require a semigroup to be defined for the type X:


implicit class PimpedValidationNel[E, X](x1: ValidationNel[E, X]) {
  // Extension of scalaz.Validation.+++ operator, does not require the semigroup defined for X
  def |+++|(x2: ValidationNel[E, X]) = x1 match {
    case Failure(a1) => x2 match {
      case Failure(a2) => Failure(a1 append a2)
      case Success(b2) => x1
    }
    case Success(b1) => x2 match {
      case b2@Failure(_) => b2
      case Success(b2) if b1 == b2 => Success(b1)
      case Success(b2) => throw new IllegalArgumentException(s"$b1 not equals to $b2")
    }
  }
}
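The four cases of the operator are easier to see on a small example. The sketch below reproduces the same case analysis on Either[List[E], X] from the standard library, as a stand-in for ValidationNel[E, X]; the object and method names are hypothetical and the code is not the Sparkz implementation.

```scala
object CombineSketch {
  type Result[E, X] = Either[List[E], X]

  // Same case analysis as |+++|: failures are concatenated, equal successes are kept.
  def combine[E, X](r1: Result[E, X], r2: Result[E, X]): Result[E, X] = (r1, r2) match {
    case (Left(e1), Left(e2))               => Left(e1 ++ e2)
    case (Left(_), Right(_))                => r1
    case (Right(_), Left(_))                => r2
    case (Right(x1), Right(x2)) if x1 == x2 => r1
    case (Right(x1), Right(x2)) =>
      throw new IllegalArgumentException(s"$x1 not equals to $x2")
  }

  // Reduce the results of several rules applied to the same element.
  def combineAll[E, X](results: List[Result[E, X]]): Result[E, X] =
    results.reduce((a, b) => combine(a, b))
}
```

Reducing List(Right(1), Left(List("a")), Right(1), Left(List("b"))) gives Left(List("a", "b")): every failure is kept, in rule order, and the successes disappear as soon as one rule fails.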

This operator also allows us to merge together results coming from different validation pipelines. If we have two RDDs with the same ValidationNel generic types, we could theoretically join them and merge the results of the same objects. This operation is very expensive, though, and it is much preferable to combine the lazy functions that generate the validation objects and apply the final combined function to each record in a single pass over the dataset.

What you can do with the ValidationNel API

Simplest thing would be getting only the valid records:

def onlyValidEvents(events: RDD[UserEvent],
                    validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[UserEvent] =
  events.map(validationFunc).flatMap(_.toOption)

Or the opposite thing, getting only the invalid events and flat-mapping them into their corresponding error wrapping class:

def invalidEvents(events: RDD[UserEvent],
                  validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[InvalidEvent] =
  events.map(validationFunc).flatMap(_.swap.toOption).flatMap(_.toList)

Suppose we want to extract all of the original events that failed because of a particular cause, for instance when their timestamp was out of range:

def outOfRangeEvents(events: RDD[UserEvent],
                     validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[UserEvent] =
  events.map(validationFunc).flatMap(_.swap.toOption).flatMap(_.toSet).flatMap {
    case InvalidEvent(event, OutOfGlobalIntervalEvent) => event.some
    case _ => None
  }

N.B. We are exploiting the implicit conversion from an Option to an Iterable to apply a combination of filter and map in a single flatMap operation.
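The same trick works on ordinary Scala collections, which makes it easy to try out without a cluster. In this minimal sketch the Invalid class and the two causes are hypothetical stand-ins for InvalidEvent and its causes.

```scala
object FlatMapOptionSketch {
  sealed trait Cause
  case object OutOfRange extends Cause
  case object Unknown extends Cause
  final case class Invalid(event: String, cause: Cause)

  // Filter (keep only OutOfRange) and map (extract the event) in one flatMap.
  def outOfRange(invalid: List[Invalid]): List[String] =
    invalid.flatMap {
      case Invalid(event, OutOfRange) => Some(event) // kept and unwrapped
      case _                          => None        // dropped
    }
}
```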

Now, suppose we would like to print a debug message with the count of invalid events by the set of their error causes.

def causeSetToInvalidEventsCount(events: RDD[UserEvent],
                                 validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): Map[Set[InvalidEventCause], Int] =
  events.map(validationFunc)
  .map(_.swap).flatMap(_.toOption).map(_.map(_.cause).toSet -> 1)
  .reduceByKey(_ + _)
  .collect().toMap

The above method will return a map that looks like:

Map(Set(NonEligibleCustomer, NonRecognizedEventType) -> 36018450,
Set(NonEligibleUser) -> 9037691,
Set(NonEligibleUser, BlackListUser, NonRecognizedEventType) -> 137816,
Set(NonEligibleUser) -> 464694973,
Set(BeforeFirstDayToConsiderEvent, NonRecognizedEventType) -> 5147475,
Set(OutOfGlobalIntervalEvent, NonRecognizedEventType) -> 983478).

Please note that we are not counting by individual cause: we are grouping by the combination of causes that co-occurred. This gives us much more debugging power with no loss of information, as opposed to the traditional monadic (fail-fast) sequential validation where only the first cause would be recorded.

Moreover, what we might really be interested in is how many users we lost as an effect of the events validation. In other words, how many users had no events left after validation.
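The difference between the two ways of counting can be shown on a handful of in-memory failures. In this sketch each inner list holds all the causes accumulated for one invalid event; the cause labels are plain strings and the function names are hypothetical.

```scala
object CauseSetCountSketch {
  // Count invalid events by the set of causes that co-occurred on them.
  def countByCauseSet(failures: List[List[String]]): Map[Set[String], Int] =
    failures.groupBy(_.toSet).map { case (causes, group) => causes -> group.size }

  // What a fail-fast validation would report: only the first cause of each event.
  // Assumes every inner list is non-empty, as a non-empty list of failures would be.
  def countByFirstCause(failures: List[List[String]]): Map[String, Int] =
    failures.groupBy(_.head).map { case (cause, group) => cause -> group.size }
}
```

With the failures List(List("A", "B"), List("B", "A"), List("A"), List("B")), the first function returns three entries (the pair A and B twice, A alone once, B alone once), while the second only sees A twice and B twice and loses the co-occurrence.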

def causeSetToUsersLostCount(events: RDD[UserEvent],
                             validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): Map[Set[InvalidEventCause], Int] = {
  val survivedUsersBV: Broadcast[Set[Long]] =
    events.context.broadcast(events.map(validationFunc).flatMap(_.toOption).map(_.userId).distinct().collect().toSet)

  events.map(validationFunc).flatMap(_.swap.toOption)
  .keyBy(_.head.event.userId)
  .filter(_._1 |> (!survivedUsersBV.value(_)))
  .mapValues(_.map(_.cause).toSet)
  .mapValues(Set(_))
  .reduceByKey(_ ++ _)
  .flatMap(_._2)
  .map(_ -> 1)
  .reduceByKey(_ + _)
  .collect().toMap
}

What the above code does is the following:

  1. Compute the set of “survived” users from the correct events after validation.
  2. Keep only the invalid events of the users who did not survive.
  3. Turn each list of failures into a Set of causes (so that the order does not matter).
  4. Group all of the cause sets by userId and deduplicate them, so that each combination of causes appears only once for each userId.
  5. Count for how many non-survived users each cause set appears.
  6. Return a map from cause set to an integer representing the count of lost users.

It should return something like:

Map(Set(NonEligibleCustomer, NonRecognizedEventType) -> 1545,
Set(NonEligibleUser) -> 122,
Set(NonEligibleUser, BlackListUser, NonRecognizedEventType) -> 3224,
Set(NonEligibleUser) -> 4,
Set(BeforeFirstDayToConsiderEvent, NonRecognizedEventType) -> 335,
Set(OutOfGlobalIntervalEvent, NonRecognizedEventType) -> 33)
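The steps of the users-lost count can also be followed on a small in-memory example. The sketch below uses plain Scala collections and a hypothetical Checked record (a user id plus the set of causes found on one of their events, empty when the event is valid); it only illustrates the logic and is not the Spark code above.

```scala
object UsersLostSketch {
  // Hypothetical simplified record: one validated event of a user.
  final case class Checked(userId: Long, causes: Set[String])

  def usersLostByCauseSet(checked: List[Checked]): Map[Set[String], Int] = {
    // users with at least one valid event
    val survived: Set[Long] = checked.filter(_.causes.isEmpty).map(_.userId).toSet

    checked
      .filter(c => c.causes.nonEmpty && !survived(c.userId)) // invalid events of lost users
      .map(c => c.userId -> c.causes)                        // causes are already a Set
      .distinct                                              // one (user, cause set) pair each
      .groupBy(_._2)                                         // group the pairs by cause set
      .map { case (causes, pairs) => causes -> pairs.size }  // number of lost users per cause set
  }
}
```

If user 1 has one valid event, user 2 has two events failing with A and one failing with A and B, and user 3 has one event failing with A, the result is Map(Set("A") -> 2, Set("A", "B") -> 1): user 1 survived, and user 2 is counted once per distinct cause set.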

Conclusions

In this tutorial we showed how we can extend Scalaz to provide a functional and elegant way of applying validation rules to clean a raw dataset. We showed how the whole logic is parallelizable and scalable using the Apache Spark framework. The main advantage of this approach is that we never lose information: we are able to split the data into 2 pipelines (valid/invalid) and mark each invalid record with some metadata. We presented a simple tutorial for a common use case showing how to define validation rules and how to use the API of the generalized ValidationNel objects in order to perform debugging and cleansing tasks.

The source code is available at: https://github.com/gm-spacagna/sparkz/blob/master/src/examples/scala/sparkz/DataValidation.scala.

The whole procedure is not fully optimized for efficiency: the creation of immutable objects may add unnecessary overhead for the garbage collector, and the way we wrap the original datum in case of failure creates many duplicate copies of the same object. We leave this optimization to future blog posts.

We hope that this tutorial will inspire Data Scientists and Engineers, regardless of the language and/or technology stack, to approach their coding in a more functional way. Functional programming offers the elegance and conciseness of implementing arbitrarily complicated logic as a simple combination of reusable higher-order functions, as opposed to the classic imperative programming paradigm. We found this way of writing code to be much more suitable for implementing math and data transformation algorithms.

***

Similar articles about Data Validation using Scalaz:

https://github.com/FranklinChen/data-validation-demo
https://www.innoq.com/en/blog/validate-your-domain-in-scala/

Mapping DataFrame to a typed RDD

I have recently published a blog post on DZone “Making the Impossible Possible with Tachyon: Accelerate Spark Jobs from Hours to Seconds” which describes the workflow and methodology that we use at Barclays to load data from the raw source (relational database) into the Data Science cluster (Spark). One of the described components is the mapping from a DataFrame to a typed RDD of a custom case class.

There are a bunch of reasons why you would like to make your DataFrame typed. The following is a summary:

[Image: dataframe-vs-rdd, a summary comparison of DataFrame and RDD]

Examples of when it is more convenient to use a DataFrame vs. an RDD can be found in this workshop: WordPress Blog Posts Recommender

In this tutorial I have pulled out from the Tachyon blog post the part related to the conversion from DataFrame to RDD. The inverse conversion from RDD to DataFrame is straightforward and can be found in the recommender workshop mentioned above.

Typed Case Class Mapping

After we have constructed the DataFrame collection from the raw source we can now map it into an RDD of our ad-hoc case classes. Since a DataFrame is also an RDD of type org.apache.spark.sql.Row, it already provides the map/flatMap methods.

If there are no null values in any row, we could use pattern matching to extract each column from the Row object:

import org.apache.spark.sql.Row

case class MyClass(a: Long, b: String, c: Int, d: String, e: String)
dataframe.map {
  // d and e are bound to the two date columns (assuming d is the first of them);
  // columns matched with _ are type-checked but otherwise ignored
  case Row(a: java.math.BigDecimal, b: String, c: Int, _: String, d: java.sql.Date,
           e: java.sql.Date, _: java.sql.Timestamp, _: java.sql.Timestamp, _: java.math.BigDecimal,
           _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}

This approach will fail for null values: a typed pattern such as b: String never matches null, so a row with a null in any of the matched columns falls through the only case and throws a MatchError. You can discard all the rows containing null values by doing:

dataframe.na.drop()

But that will drop records even if the null fields are not the ones we use in our case class.

If you want to handle it using Scala options you could turn the Row object into a List and then use the following pattern:

case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)
dataframe.map(_.toSeq.toList match {
  // c is bound without a type because a typed pattern (c: Int) would not match null
  case List(a: java.math.BigDecimal, b: String, c, _: String, d: java.sql.Date,
            e: java.sql.Date, _: java.sql.Timestamp, _: java.sql.Timestamp, _: java.math.BigDecimal,
            _: String) => MyClass(a = a.longValue(), b = b, c = Option(c).map(_.asInstanceOf[Int]),
                                  d = d.toString, e = e.toString)
})
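The interaction between typed patterns and null can be verified without Spark. In the following sketch a row is modelled as a List[Any], which is what Row.toSeq.toList produces; the Record class, the two-column layout and the parse function are hypothetical and only serve the example.

```scala
object NullableColumnSketch {
  final case class Record(a: Long, c: Option[Int])

  // Returns None when the row does not have the expected shape.
  def parse(row: List[Any]): Option[Record] = row match {
    // `a` is mandatory: the typed pattern rejects null.
    // `c` is nullable: bind it untyped, then wrap it in Option and keep it only if it is an Int.
    case List(a: java.math.BigDecimal, c) =>
      Some(Record(a.longValue(), Option(c).collect { case i: Int => i }))
    case _ => None
  }
}
```

A null in the second column yields Record(7, None), while a null in the first column makes the whole row unparseable. In the Spark version, which has no fallback case, the same situation surfaces as a MatchError.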

If you are only interested in a few columns, you could fetch them individually either by index or by column name:

row.getAs[SQLPrimitiveType](columnIndex: Int)
row.getAs[SQLPrimitiveType](columnName: String)

For the list of mapping of SQL primitive types and their corresponding Java/Scala classes, see: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html.

N.B. The described procedure does not take advantage of the recently released DataSet API (http://spark.apache.org/docs/1.6.0/sql-programming-guide.html#datasets) which should automate the whole process of converting between DataFrames and RDDs. At the time we wrote this note we had not yet tested DataSet. Also there are open-source projects like Frameless (https://github.com/adelbertc/frameless) and an ongoing discussion on its gitter channel of how to leverage the awesome Shapeless (https://github.com/milessabin/shapeless) library to make Spark more functional and compile-time type-safe.

Similar articles:

Type safety on Spark Dataframes: http://www.51zero.com/blog/2016/2/24/type-safety-on-spark-dataframes-part-1

Logical Data Warehouse for Data Science: map raw data directly from source to Spark in-memory with Tachyon

Common problems for large organizations dealing with Big Data and Data Science applications are:

  1. Data stored in non-scalable infrastructure for analysis and processing
  2. Data governance and security policies

1. Data often resides in a central data warehouse and RDBMS on which many legacy applications and analysts depend.
Data Scientists, instead, cannot build their models or perform exploratory analysis by using SQL queries. They need the data to be available in a scalable, programmatic and reactive stack such as Hadoop and Apache Spark, and develop their logic using languages such as Python, R, Scala… (for a comparison of Python and Scala for Spark, see this post: 6 points to compare Python and Scala for Data Science using Apache Spark).
2. Nevertheless, data cannot just be transferred (in technical terms sqoop-ed) to a Hadoop cluster without incurring tedious bureaucracy, ingestion inconsistencies and strict policies. In big corporations that translates to at least a month to decide what tables are interesting and a few more months to write the ETL logic, move the data and test the consistency.

At Barclays we developed a stack to logically map the raw data from the central data warehouse into Spark and use Tachyon to keep the data in memory for long-term availability. With such a stack, we are able to iterate fast with immediate data availability from a scalable Big Data cluster, skipping the data ingestion process while still complying with all of the data policies.

Tachyon was the key enabling technology for us.

Our workflow iteration time decreased from hours to seconds. Tachyon enabled something that was impossible before.

You can find the original article published on DZone in collaboration with Gene Pang, Software Engineer at Tachyon Nexus, and Haoyuan Li, CEO of Tachyon Nexus:
Making the Impossible Possible with Tachyon: Accelerate Spark Jobs from Hours to Seconds