In-Memory Logical Data Warehouse for accelerating Machine Learning Pipelines on top of Spark and Alluxio

Abstract:

Legacy enterprise architectures still rely on relational data warehouses and require moving and syncing data with the so-called "Data Lake", where raw data is stored and periodically ingested into a distributed file system such as HDFS.

Moreover, there are a number of use cases where you might want to avoid storing data on the development cluster disks, for example because of regulatory constraints or to reduce latency; in these cases Alluxio (previously known as Tachyon) can make the data available in-memory and shared among multiple applications.

We propose an Agile workflow combining Spark, Scala, DataFrame (and the recent DataSet API), JDBC, Parquet, Kryo and Alluxio to create a scalable, in-memory, reactive stack to explore data directly from the source and develop high-quality machine learning pipelines that can then be deployed straight into production.

In this talk we will:

* Present how to load raw data from an RDBMS and use Spark to make it available as a DataSet (a minimal sketch of this step follows the list)

* Explain the iterative exploratory process and advantages of adopting functional programming

* Critically analyse the issues faced with the existing methodology

* Show how to deploy Alluxio and how it greatly improved the existing workflow by providing the desired in-memory solution and by decreasing the loading time from hours to seconds

* Discuss some future improvements to the overall architecture
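
As an illustration of the first bullet, here is a minimal sketch, not the talk's actual code, of loading a table from an RDBMS over JDBC into a typed DataSet and caching a columnar copy on Alluxio; the connection details, table name, case class and paths are illustrative placeholders.

import org.apache.spark.sql.SparkSession

case class Transaction(id: Long, customerId: Long, amount: Double)

object JdbcToAlluxio {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-to-alluxio").getOrCreate()
    import spark.implicits._

    // load the raw table straight from the RDBMS over JDBC
    val transactions = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/warehouse")
      .option("dbtable", "transactions")
      .option("user", "reader")
      .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
      .load()
      .as[Transaction] // typed Dataset for the downstream ML pipeline

    // persist a columnar copy in Alluxio so subsequent explorations read from memory, not the RDBMS
    transactions.write.mode("overwrite").parquet("alluxio://alluxio-master:19998/warehouse/transactions")
  }
}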

Original meetup event: http://www.meetup.com/Alluxio/events/233453125/


The Barclays Data Science Hackathon: Building Retail Recommender Systems based on Customer Shopping Behaviour

From Data Science Milan meetup event:

In the depths of the last cold, wet British winter, the Advanced Data Analytics team from Barclays escaped to a villa on Lanzarote, Canary Islands, for a one-week hackathon where they collaboratively developed a recommendation system on top of Apache Spark. The contest consisted of using Bristol customer shopping behaviour data to make personalised recommendations in a sort of Kaggle-like competition, where each team's goal was to build an MVP and then repeatedly iterate on it using common interfaces defined by a purpose-built framework.
The talk will cover:

• How to rapidly prototype in Spark (via the native Scala API) on your laptop and magically scale to a production cluster without huge re-engineering effort.

• The benefits of doing type-safe ETL, representing data in hybrid, and possibly nested, structures like case classes (a minimal sketch follows the list).

• Enhanced collaboration and fair performance comparison by sharing ad-hoc APIs plugged into a common evaluation framework.

• The co-existence of machine learning models available in MLlib and domain-specific bespoke algorithms implemented from scratch.

• A showcase of different families of recommender models (business-to-business similarity, customer-to-customer similarity, matrix factorisation, random forest and ensembling techniques).

• How Scala (and functional programming) helped our cause.
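
As a hint of what such type-safe ETL looks like, here is a minimal sketch; the field names and nested structure are illustrative, not the hackathon's real schema.

import org.apache.spark.rdd.RDD

case class Merchant(id: Long, category: String, town: String)
case class Purchase(merchant: Merchant, amount: Double, timestamp: Long)
case class CustomerHistory(customerId: Long, purchases: Seq[Purchase])

// a typed ETL step: the compiler, not a runtime schema check, guarantees the shape of the data
def spendByCategory(histories: RDD[CustomerHistory]): RDD[((Long, String), Double)] =
  histories.flatMap { history =>
    history.purchases.map(p => (history.customerId, p.merchant.category) -> p.amount)
  }.reduceByKey(_ + _)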


Surfing and Coding in Lanzarote, the Barclays Data Science hackathon

This post has been published on the Cloudera blog and summarises the results and takeaways of a week-long hackathon that took place in Lanzarote in December 2015. The goal was to prototype a recommender system for retail customers of shops in Bristol, UK. The article shows how the stack composed of Scala and Spark was great for quickly writing prototype code that runs locally on a single laptop and, at the same time, scales to larger datasets processed on the cluster.


Please continue reading at http://blog.cloudera.com/blog/2016/05/the-barclays-data-science-hackathon-using-apache-spark-and-scala-for-rapid-prototyping/.


Robust and declarative machine learning pipelines for predictive buying

Proof of concept of how to use Scala, Spark and the recent library Sparkz for building production quality machine learning pipelines for predicting buyers of financial products.

The pipelines are implemented through custom declarative APIs that give us greater control, transparency and testability of the whole process.

The example followed the validation and evaluation principles as defined in The Data Science Manifesto available in beta at http://www.datasciencemanifesto.org


Lessons learnt from building data-driven production systems at Barclays


Over the last few years at Barclays we learnt and tried a lot of things that made the Advanced Analytics team very successful inside a large organisation, where being a productive data scientist is a tough challenge.

The data science team works on a mix of descriptive, predictive and prescriptive projects that make use of machine learning and big data technologies, mainly on top of Apache Spark. Even though we deliver on-request insights coming from manual analysis, we primarily build automated and scalable systems to be used periodically, either internally for better decision-making or customer-facing in the form of analytics services (e.g. via the web portal).
In this post series I want to share some of the best practices, tools, methodologies and workflows that we experimented with, and the lessons learnt from them. I will skip a few aspects of machine learning systems, since I found those to be already well covered in other talks and articles; you can find the reference links at the end of this post.
Moreover, not all data-driven projects require a machine learning component, at least not at every stage. I would like to quote Peter Norvig from a recent article published on KDnuggets:

“Machine Learning development is like the raisins in a raisin bread: 1. You need the bread first 2. It’s just a few tiny raisins but without it you would just have plain bread.”

Please keep in mind that each scenario is different, thus there are no strict rules to advocate. Every data science team should come up with the workflow and stack that best suit their needs. Besides, they should be able to quickly adapt to the business and technical changes of their organisation.

To conclude, I have summarised the main take-home knowledge from my experience at Barclays so far. I hope it will serve as a useful guideline or source of inspiration for all of those data science teams focusing on building production systems. Many of these best practices still apply to research-oriented teams that focus more on prototyping solutions. Our team is a mix of engineering and modelling backgrounds, thus defining a little bit of structure and common workflows helped us be collaborative and productive.

The goal is not to advocate a single methodology but to show other possible approaches that could fit well within your organisation. We expect some of these practices to conflict between different teams. For example, in Xavier's articles (see links below) he suggests doing all of the experiments using notebooks and using the same tools in production, while in our experience we found this to be chaotic and not scalable for our use cases. There is no golden rule: try different approaches and stick with the most successful ones for your use cases.

***

A related blog post of “How to do Data Science that is both Exploratory and Production Quality” can be found here: https://www.linkedin.com/pulse/how-do-data-science-both-exploratory-production-quality-harry-powell.

Similar articles:

Seven Steps to Success: Machine Learning in Practice https://daoudclarke.github.io/guide.pdf

http://technocalifornia.blogspot.co.uk/2014/12/ten-lessons-learned-from-building-real.html

And more recent additional 10 lessons:

https://medium.com/@xamat/10-more-lessons-learned-from-building-real-life-ml-systems-part-i-b309cafc7b5e#.58g9wrnt4

 


Thoughts about data operations

This is the part 4 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Infrastructure

Get the IT team very close to the data scientists. Ideally one member of the team should be a DevOps engineer, or, to use the newer title, a DataOps. Read this article from InfoWorld: DevOps can take data science to the next level.
Finding the right balance between IT workarounds and clean solutions is difficult, especially when it involves long, tedious processes. It is good practice to "sign" contracts with the IT team stating what you are about to deliver and what requirements you need in order to do so.
As general advice, you want to operate in a familiar environment where you have all of the tools you like and proper cluster resources available. Unfortunately, data is always fragmented across multiple systems. Try to get the data periodically ingested into your Data Lake (typically a Hadoop cluster). When this is not possible, make sure you have the permissions to sqoop it yourself. Data virtualization technologies also come in particularly handy for creating views of a dataset in your Big Data environment.
Don't implement solutions that are tied to the underlying infrastructure. The Spark DataFrame API, for example, does an excellent job of abstracting away the I/O operations. See this blog post on how to logically map tables from a relational database into a Spark cluster: https://dzone.com/articles/Accelerate-In-Memory-Processing-with-Spark-from-Hours-to-Seconds-With-Tachyon.
Having admin rights on a dev cluster massively boosts productivity and lets the team master their Unix skills. Trust and transparency are essential. Security should be enforced during the interview process, by hiring competent and smart people, and through personnel training, instead of killing productivity with nonsensical restrictions.

Release process

A data product, at the end of the day, is software that takes data as input and produces data as output, the output containing insights consumable either via a visual dashboard or by integration into an existing IT system.

A few options we recommend for releasing are:

  • Continuous delivery. Ideally one pull request per project per day.
  • Continuous integration. Ideally, a Jenkins box that runs your tests and automated scripts every single time a new ticket is merged into develop, and especially every time a new release is made on master. If the box can access a data cluster, it can even run the end-to-end evaluation and store the results for you.
  • Every end of sprint should be matched with a new release consisting of:
    • taking the develop branch and merging it back into master (either manually or through an automated script such as the gitflow command line);
    • publishing your package containing source code and scripts to a common repository like Nexus;
    • reporting the latest results in Confluence (see the documentation section);
    • releasing all of the merged tickets from Jira so that they don't show up in the board but are still accessible for reference;
    • demo-ing inside the team and/or to your stakeholders if the changes are relevant;
    • celebrating in a pub.

It would make sense to plan the release for the afternoon of the last day of the sprint (typically Friday), but sometimes it might be advantageous to release on Thursday so that you have Friday for hot-fixes if something goes wrong.

Deployment


It is very hard to give guidelines here, since each project has its own deployment process that depends on many factors, such as the business context and the practical issues associated with it.

If your application is deployed end-to-end by external teams, of whose workflow and data sources you have no control, you will find it extremely helpful to have some data sanity checks performed at every single run. These checks make sure that the people running your application don't accidentally feed it data that does not conform to the schema and/or model assumptions. Throwing an exception with some context information is fundamental to making your system production-ready.
A typical example is validating the values of categorical fields. We packed into our jars the reference files containing all of the possible values and their descriptions. If the specified dataset contains values that don't find any match, the data sanity check will throw an exception.
This handling of incorrect data may instead be done during the ETL process and is generally not needed if the training is done by the data science team itself; in that case the deployment only concerns the trained model.
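
A minimal sketch, with hypothetical field, file and object names, of the kind of data sanity check described above: categorical values are validated against a reference file packaged in the jar, and a failure raises an exception carrying enough context to diagnose the bad input.

import scala.io.Source

object DataSanity {
  // reference values shipped inside the jar, e.g. src/main/resources/valid_product_codes.txt (hypothetical path)
  lazy val validProductCodes: Set[String] =
    Source.fromInputStream(getClass.getResourceAsStream("/valid_product_codes.txt"))
      .getLines().map(_.trim).toSet

  // throws with context if the input contains categorical values outside the reference set
  def checkProductCodes(records: Seq[Map[String, String]]): Unit = {
    val unknown = records.flatMap(_.get("product_code")).filterNot(validProductCodes).distinct
    if (unknown.nonEmpty)
      throw new IllegalArgumentException(
        s"Data sanity check failed: ${unknown.size} unknown product codes (e.g. ${unknown.take(5).mkString(", ")})")
  }
}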

Deployment is the stage with the highest number of blockers and technical issues. The final measure of success is, after all, only determined upon deployment in production; thus deployment issues should be top priority.


The balance of exploratory analysis and development

This is the part 3 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Exploratory Data Analysis / Research


Exploratory analysis should precede and follow any task, from modelling, design and development to benchmarking. The major problem is: how do you share, track and monitor your findings? How do you make your analysis repeatable and scrutinisable from the outside? This is still an open problem.

Notebooks tend to be the best tools for the job, but be careful. EDA is an open research/investigation task, thus you need a criterion for drawing the line of when to stop. My suggestion is to avoid scope-free analysis and always accompany EDA with a well-defined goal/task. In a small and clearly defined task you know what you want to achieve; you just don't know how to get there and what obstacles you may find.

A proposed sequential but iterative workflow is:

  1. The planned story defines the high-level goal you are working towards.
  2. Then start your EDA, and as soon as you find something interesting you can stop investigating and define a development sub-ticket.
  3. You now start developing the minimum amount of code that implements the requirements specified during the analysis step.
    Those requirements should not change after their first definition; you want to complete them and then refactor later in another sub-ticket or in a different iteration.
  4. Before sending it for review and/or resolving the sub-ticket, you should perform another EDA step to verify that the newly created branch meets the intended requirements. You are not solving the greater problem; you only care about the just-defined sub-problem. It is very dangerous to mix development and analysis at the same time, since you may end up in an infinite loop where you keep changing your requirements as you analyse and never reach an end.
  5. After completion of the subtask, you can switch back to the main workflow thread.

The suggestion is to time-box any open-ended task: say you are going to spend no more than X hours/days on this research, and before then you will come out with some development requirements or insight reports that move the project towards the final story goal. Remember you will have to resolve the story by the end of the sprint. Scope the problems small enough that you reduce the risk of not meeting expectations.

Get to an end-to-end solution as quickly as possible and postpone any complications, ideas or new features to the next iterations. EDA/research is generally a good place for filling your backlog for future scoping.

I leave it as an open question what to do with those notebooks after the investigation is completed. They are a bit tricky to maintain: when you change the codebase or a new dataset comes in, the notebooks become obsolete, and we don't want to refactor them every time to make sure they still work. I personally see notebooks more as one-off analyses that are archived after being used.

I tend to translate all of my findings and assumptions into project requirements so that they don't get lost. In my opinion only the automated tasks should be maintained over time. Results from manual tasks that cannot be automated should be documented, stamped and archived in the wiki.

Evaluation

Unit tests make sure that the code does what it is meant to do, but that does not imply it solves the right problem in an acceptable way. The evaluation strategy should typically reflect the real business scenario in which the model will be used. The choice of performance metrics must have a meaningful explanation within the business context. Metrics should be easily interpretable by your stakeholders, who generally are not data scientists and only speak the company's business language.

A good tip is to create a Kaggle-like framework that (a minimal sketch follows the list):

  • defines APIs reflecting your custom data types
  • uses some abstract interface representing the particular implementation (which could be split into multiple components, e.g. transformer, trainer, model)
  • knows how to robustly validate the given implementation (e.g. cross-fold validation, domain-specific splitting avoiding data leakage, a mix of timestamp and customerId partitioning…)
  • produces one or a pool of interpretable performance metrics such as mean average precision @ N, uplift, spam rate, loss rate or retention rate. Avoid abstract concepts like area under the curve or F-score.
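
A minimal sketch of what such a framework's interfaces might look like; the type and object names (LabelledSample, Trainer, PrecisionAtN, ...) are hypothetical, not the actual APIs we used.

case class LabelledSample(features: Map[String, Double], label: Double)
case class Prediction(score: Double, label: Double)

trait Model {
  def predict(sample: LabelledSample): Double
}

trait Trainer {
  def train(trainingSet: Seq[LabelledSample]): Model
}

trait Evaluator {
  // splits the data, trains via the given trainer and returns business-interpretable metrics
  def evaluate(data: Seq[LabelledSample], trainer: Trainer): Map[String, Double]
}

object PrecisionAtN extends Evaluator {
  def evaluate(data: Seq[LabelledSample], trainer: Trainer): Map[String, Double] = {
    // naive split for illustration; replace with domain-specific folds avoiding data leakage
    val (trainSet, testSet) = data.splitAt((data.size * 0.8).toInt)
    val model = trainer.train(trainSet)
    val topN = testSet.map(s => Prediction(model.predict(s), s.label)).sortBy(-_.score).take(100)
    Map("precision@100" -> topN.count(_.label == 1.0).toDouble / math.max(topN.size, 1))
  }
}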

Soon you will find a blog post from our team about an offsite in Lanzarote where, following this Kaggle-like structure, we prototyped 6 different models for a recommender system in less than a week.

When building the evaluation framework, a few questions you want to ask are:

  • What does a positive/negative sample represent in this business scenario?
  • Is recall important? Why do you care about accuracy?
  • What actions can be taken upon prediction?
  • In what form can the model be used? How can the insights be presented/visualised? Can it be integrated into an existing IT system?
  • What are the capabilities/practical issues of following the decisions suggested by the model?
  • What is the uplift of the data-driven solution compared to the traditional business as usual performance?
  • How can you test the trained model in the live environment (is A/B testing possible, or would a bad outcome cause a lot of damage)?
  • Does the effectiveness of your solution depend only on your model or also on other parties? (e.g. predicting which customers to contact for marketing purposes relies on the conversion rate of the marketing team as well)
  • How can you feed the results back to update the model? At what rate? Is the model easy to update, or must it be re-trained on every batch of newly collected data? Can you re-train it within the update interval?
  • Will the triggered actions influence the upcoming data (e.g. a recommender system can change the distribution of the future population)? Are there any amplification effects (if you recommend the most popular items, those will become even more popular, and so on)?

My experience suggests that the more time is spent implementing a robust and exhaustive evaluation framework, the easier and more reliable maintaining and improving the system will be later. Time spent here is a good investment and requires a lot of thinking from all 3 data science aspects: business, statistics and engineering.

Demo

It is good practice to demo advances and new results to the team and/or stakeholders at the end of the sprint. Feeling the continuous pace of delivery and improvement is an excellent psychological element and increases trust and confidence.

Moreover, it is the place where scrutiny comes in and you can have your methodology and interpretations challenged. Any deliverable or document presented during the demo should be stored in the wiki with a date associated with it.


Coding practices for data products development

This is the part 2 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Coding practices

Code should be developed in a proper IDE and make use of advanced tools for refactoring, auto-completion, syntax highlighting and auto-formatting, at the very least.

Notebooks should use routine libraries from the main codebase. As soon as some code developed in a notebook is reusable, it should be moved into the codebase. A rule of thumb might be that each notebook cell should not exceed 10 lines; beyond that it either needs refactoring or should be pulled out. The only exception is long code used only and specifically for a one-off investigation that does not make sense outside that particular context.

Do not introduce unnecessary dependencies in the codebase (e.g. plotting libraries). Keep the code repository lean and add dependencies to your particular use case rather than the project repository.

During development it is recommended to make frequent git commits. When the ticket is ready to go, the developer should first run git diff develop and review their own code before creating the pull request (PR).

The pull request should only contain the minimum amount of code specified in the corresponding ticket requirements. Do not anticipate functions that you know you will need in the future, even if that future is only a couple of hours away. Avoid abstractions or general-purpose methods. First write working code for your specific use case, then refactor it.

Agile manifesto says:

“Simplicity–the art of maximizing the amount
of work not done–is essential.”

Make your code structure flat (a minimal sketch follows the list):

  • data containers
  • static classes containing functions/methods/utils
  • entry point classes defining the end-to-end job and putting all of the pieces together
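
A minimal sketch of this flat structure, with hypothetical names:

// 1. data containers
case class Customer(id: Long, segment: String)
case class Score(customerId: Long, propensity: Double)

// 2. a static object containing pure functions/utils
object Scoring {
  def score(customers: Seq[Customer], weights: Map[String, Double]): Seq[Score] =
    customers.map(c => Score(c.id, weights.getOrElse(c.segment, 0.0)))
}

// 3. an entry point putting all of the pieces together
object ScoringJob {
  def main(args: Array[String]): Unit = {
    val customers = Seq(Customer(1, "retail"), Customer(2, "premier"))
    val weights = Map("retail" -> 0.2, "premier" -> 0.7)
    Scoring.score(customers, weights).foreach(println)
  }
}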

Copy and paste the same code if needed; duplication is not always bad if it makes the design simpler. Only extract methods and abstract classes if you have at least 3 use cases.

Comments in the code are very likely to cause out-of-sync documentation. Clean code, good design and self-explanatory naming will make your code self-documenting. The only exceptions are TODOs, FIXMEs and annotations explaining why a hack was needed and under which conditions the current implementation might fail. Obviously, avoiding hacks in the first place is the best solution, but sometimes we need to live with them. Abuse TODOs, but do not leave non-working code without annotations.

Extreme attention should be paid to code style and conventions. Badly formatted code or inconsistent patterns make the code very hard to read and maintain.

After the PR is sent for review, chase your reviewer to review your code as soon as possible. Resist starting a new task until the review is finished and the PR merged into the develop branch. Do one thing at a time and move to the next only when the previous one is 100% done.

Reviewers should not accept justifications for bad practices. Code review is the only way to guarantee the team converges towards excellence, and it definitely pays off in the long term. The code review process should go back and forth until both parties are satisfied.

Testing


You should always come up with smart ways of testing your code. Laziness or "I know it works" approaches should not be accepted. The only code that may not require tests is one-off analysis, since it is humanly supervised and is not going into production.

Code without tests is risky, cannot be refactored and cannot be maintained, since unit tests also serve as documentation. If someone changes your code, you can still be blamed and held responsible for the failure even though your code used to work. Tests are the only way of protecting the validity of your solutions. Time spent on testing is the greatest long-term investment you can make for your project.

If you spot a bug that was not found by your tests, that is an indicator that this test case should be added. Don't just fix it; make sure you first have a failing test for it. Debug your code by adding unit tests and breaking down end-to-end methods into smaller composable functions. Debugging by adding unit tests gives you a much safer and more repeatable way to make your code robust.

Read-eval-print-loop (REPL) debugging is just another type of exploratory analysis; if you want to go down that route, remember to turn your manual techniques into automated tests.

Obviously all of the above problems would not exist in case of TDD.

When your imagination for manual test cases is about to run out, or you are too tired of adding tests that always succeed, consider also adding a few property-based tests with random generators.
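
A minimal sketch of a property-based test, assuming the ScalaCheck library is available; the invariants tested here are illustrative only.

import org.scalacheck.Prop.forAll
import org.scalacheck.Properties

object DeduplicationSpec extends Properties("deduplication") {
  // properties are checked against randomly generated lists rather than hand-written cases
  property("distinct never increases size") = forAll { (xs: List[Int]) =>
    xs.distinct.size <= xs.size
  }
  property("distinct is idempotent") = forAll { (xs: List[Int]) =>
    xs.distinct.distinct == xs.distinct
  }
}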

Unit tests are necessary, but it is the whole end-to-end flow that matters. Make sure you have at least a few integration tests in place, ideally ones that actually map to real use cases.

Pair working

We found pair working to be much more productive than working as isolated individuals. A data science team is generally cross-functional, with people ranging from a more engineering background to a more theoretical analytical/statistics background. A good rule is to pair opposite individuals together and swap their competencies, so that whoever is good at coding does the modelling and vice versa. The code review process still applies as usual even though the code was written together; it might be worth involving someone else with no prior knowledge of the project to review the code and methodology.

Functional Programming

Functional programming offers a few advantages over other paradigms, and we found it to suit data munging and machine learning algorithms very well. Just to name a few (a minimal sketch follows the list):

  • Implementing any complex logic as a combination of simple first-order functions instead of long, non-reusable methods.
  • No state, no side effects: the same code will return the same output on every single call. No debugging is needed.
  • Close match with maths. You can implement any algorithm the same way you read it in academic papers.
  • No need to think about how to make your code execute efficiently; focus on functionality only.
  • High level of abstraction: keep your brain trained on lateral thinking instead of following mechanical procedures.
  • Conciseness: you will be surprised how many algorithms (single-node or distributed) can be implemented in a single line.
  • Higher readability: you only need to understand what the functions aim to do, not what the values of each variable represent at each step.
  • Concurrency for free at no extra cost. Full parallelism.
  • The same code for local implementations magically scales up in a distributed environment, meaning you can prototype locally without having to re-engineer your solution for the big data system.
  • Type system: you know which functions can be used and what form the intermediate transformations take. No need for read-eval-print loops or hacky print calls. It is easy to implement, reason about and refactor complex algorithms without introducing bugs.
  • No explicit loops: you know how your algorithm converges via recursion.
  • Flat and minimal structure: no need to create tons of classes or verbose notation. You can use anonymous functions, pattern matching and wildcard notation.
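
A minimal sketch, with made-up data, of the style described above: the same one-liner shape works on a local Scala collection and, with the same structure, on a distributed Spark RDD.

case class Transaction(userId: Long, amount: Double)

val transactions = Seq(Transaction(1L, 10.0), Transaction(1L, 5.0), Transaction(2L, 7.5))

// total spend per user as a composition of simple functions: no mutable state, no explicit loops
val spendPerUser: Map[Long, Double] =
  transactions
    .groupBy(_.userId)
    .mapValues(_.map(_.amount).sum)
    .toMap

// the distributed equivalent keeps the same shape:
// transactionsRdd.map(t => t.userId -> t.amount).reduceByKey(_ + _)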

Popular languages in Data Science are not always natively functional, but most of them offer a functional extension, or some external library does. See for example this project introducing the functional APIs of Scala to Python collections: http://pedrorodriguez.io/blog/2015/03/14/functional-programming-collections-python/.

If you work in Data Science or Big Data and have never done functional programming before, you should really look into it. You might find the learning curve a bit steep at the beginning, but after you master it you will be superbly productive.


The ScrumBan Jira board

This is the part 1 of 4 of the “Lessons learnt from building Data Science systems at Barclays” series.

Agile board

Let's start with one of the core tools of the agile workflow. We use a Jira board for tracking and organising all of our projects. We developed a custom board which uses the sprint concept from Scrum but in a more flexible way, as in Kanban.


 

The Scrumban board is configured as follows:

  • Horizontally divided in swimlanes (top-down in order of priority):
    • Critical / Blockers
    • Current work
    • Stories backlog
    • Sub-tickets backlog
    • Completed
  • The columns are:
    • To do
    • In progress
    • In review
    • Done / resolved
    • You can optionally have “Ready to release”
  • Quick filters should at least have one filter for each member of the team, filtering on their own assigned tickets.

The idea is that during planning you select from the backlog which high-level stories you want to deliver by the end of the sprint (typically 2 weeks long) and then create subtasks as you need them.
The reason is that in data science you don't know beforehand what you are about to implement; you need to investigate-implement-test all the time and, as you do it, you discover what to do next. What is important is that whatever subtask is created is done by the end of the sprint, so that the story is completed.

Define stories with a clear goal and a small scope. They should not span multiple sprints, and since they come with uncertainty about which tasks will be required, you really need to break a big problem into smaller, well-defined problems that can be accomplished no matter what.

Avoid having tasks for exploratory analysis or for adding unit tests. Each task should bring some value, potentially a new feature. Each task will then require exploratory analysis as well as some development and testing; those steps are already part of the definition of "Done". See the sections below for more explanation about tests and exploratory analysis.

Always plan less than your capacity. Delivering your stories a few days early is a very good sign; delaying them is bad. If you manage to get your work done by Thursday, spend the whole of Friday in a pub celebrating your amazing delivery.

In Jira you must assign each story to one individual, but remember that in an agile team either the whole team succeeds or fails. If that person does not manage to finish their tasks on time, it is a team failure. That's what you have the morning standup for: to make sure everything is under control and team resources are allocated in a way that makes the sprint successful.

Never change the scope of your sprints or add tasks that were not planned, unless they are required hotfixes. If you are asked to do something else, invite the product owners to join your next sprint planning, and only then can you allocate resources for them.
Remember: the goal of a sprint is to have a working, even if simplistic, deliverable, not to solve sparse tasks.

At the end of the sprint have a retrospective meeting to discuss what went well and what did not. Make sure to take actions to avoid the same blockers reappearing in the future.

Documentation

Documentation should be as simple as possible.

  • Release notes: a page where you note the major changes since the previous version, the list of new tickets that have been merged (linking to Jira) and a link to a more detailed report.
  • The detailed report contains snapshots of the most recent logs, results, observations, limitations, assumptions and performance of the model/ETL/application. Often it contains some charts that quickly explain how good the product is. We use those detailed but concise reports to track how the product is evolving. The release detailed report also contains the help messages explaining how to run the application and all of the command line interface (CLI) options.
    If all of your tests and procedures are fully automated then this page is simply a copy and paste of the results.
  • The usage of a particular job class or script, with the list of CLI arguments and default values, is also accessible via the --help argument; many libraries help you do that (bash getopts, Scala Scallop, …). A minimal sketch follows this list.
  • Other pages are used to explain the complex parts of the logic. Limit those pages to cases where the logic is very complicated and hard to understand by just reading the code.
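
A minimal sketch of how such a CLI definition might look with Scallop (assuming the org.rogach.scallop dependency); the option names and the job object are illustrative only.

import org.rogach.scallop._

class JobConf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val inputPath = opt[String](required = true, descr = "path of the raw events")
  val minDate   = opt[String](default = Some("2015-01-01"), descr = "start date in ISO format")
  val dryRun    = toggle(default = Some(false), descrYes = "run without writing any output")
  verify()
}

object ValidationJob {
  def main(args: Array[String]): Unit = {
    val conf = new JobConf(args) // passing --help prints the auto-generated usage message and exits
    println(s"Reading from ${conf.inputPath()} starting at ${conf.minDate()}")
  }
}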

Documentation is hard to keep in sync; that's why we want to document what's new since the last release rather than going through the whole wiki and updating every single page.

Ideally the documentation comes from the source code, unit tests and Jira tickets. Individual analyses, findings and insights can be documented separately, but they should represent static reports rather than project documentation.

In the hierarchical structure of the pages, we limit the maximum depth to 2, which means we have root-level pages with at most one level of child pages. Nested structures make it very hard to find content when you need it.

Branching and versioning

Code should always and only exist in a git repository. Sparse snippets or random script files should be avoided.

We follow the gitflow branching model, where each ticket is mapped to a feature branch. If you integrate Jira with Stash, then from the ticket web page you can automatically create the corresponding branch in the repository, using develop as the base branch.

You do not need to use the complete gitflow branching model, but at least have the master, develop and feature branches. It's up to the deployment strategy to define how to handle hotfix, bugfix and release branches. Make sure this strategy is clearly defined and consistently enforced. See the deployment section.

Story tickets generally don't have an associated branch; their sub-tasks do.

Install a git hook so that every commit message includes the ticket code as a prefix (which you can parse out of the branch name). Tracking each commit with the corresponding ticket is a life-saver when, in the future, you try to reverse engineer what a method is doing and why it was created in the first place. You can then walk the git history and access the corresponding tickets that touched that piece of code.

Discussions

Discussions of specific tasks should go into the corresponding Jira ticket web page. This makes the conversation public and tracked, and anyone can jump into the discussion with the full context available. Reference files or supporting documents should also be attached to the Jira ticket itself, or to the wiki if they serve a general purpose. Remember each Jira ticket can be linked from the releases wiki page, which means we never lose track of them. Moreover, the query engine is quite good.

We found emails to be the worst place for discussions to happen, especially for sharing files that will become soon out-of-date.

When someone sends you an Excel file, reply saying that your laptop does not have an Office installation on it. If you are sharing small data files, tsv or json is the way to go.
Avoid comma-separated files with quotes wrapping text fields; you want your files to be editable using simple bash commands rather than having to load them into a CSV-parsing library.

We also tried mounted shared drives, but Confluence is a much better collaborative way to share and organise files, with integrated version control and metadata.

Avoid meetings as much as you can: invent an excuse, ask for a clear agenda beforehand. Educate your colleagues to communicate with you by raising issues. Reserve meetings for important discussions, and spend your meeting time presenting to and checkpointing with your stakeholders more frequently.

 

 


Functional Data Validation using monads and applicative functors


ETL is probably the most time-consuming part of every Data Science project. The quality of the extracted and crunched data is one of the major factors affecting the final results. In fact, real-world data is always messy and inconsistent. Data validation is a must for enforcing the correctness of the proposed solution and for making sure the underlying data represents the true business scenario.

When performing a data validation, the following issues often arise:

  • We want to keep track of how much information we lose, for debugging and reporting purposes.
  • Sometimes we want to cleanse invalid data instead of filtering it out.
  • Part of the validation logic depends on the project requirements and/or model assumptions. These change often, and the refactoring may introduce bugs.

In this tutorial we show how to use monads, applicative functors and other functional programming concepts to safely and elegantly define the validation logic using a modular pattern. Each rule is defined individually and the final logic is built by using two types of composition:

  • Monad composition: one rule after the other; if one fails, the next rule is not applied.
  • Applicative composition: all of the rules are applied independently and the validation results are collected and merged together.

Moreover, the data that does not pass the validation tests is not thrown away but moved into a separate pipeline, with all of the needed metadata attached to it explaining why that particular record was invalidated. This allows us to:

  • Log all of the specific causes of data loss.
  • Easily recover previously invalidated data if the validation rules change.
  • Re-use part of the discarded data further down in the data pipeline. The whole ETL workflow is aware of what has been discarded before.

Spark, Scala, Scalaz and Sparkz

The tutorial is part of the open source project Sparkz, which aims to extend the Apache Spark framework by providing more functional APIs. The implementation is in Scala and leverages Scalaz, the framework from which Sparkz drew its inspiration.

Scalaz provides a Validation data structure that is similar to Either (where an object can either be Left/Failure or Right/Success), but it is not a monad: it is an applicative functor, because instead of chaining the result from the first event to the next, Validation validates all events. Since in case of a failure there must be at least one error message, we enforce the failure type to be a non-empty list of error messages. For this purpose scalaz already provides a data structure, ValidationNel, to accumulate all of the error messages into a Nel (non-empty list).

See this page for documentation: http://eed3si9n.com/learning-scalaz/Validation.html
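
As a minimal illustration of this accumulating behaviour (assuming scalaz 7 and the same Scalaz._ import used later in this post; the example values are made up):

import scalaz.Scalaz._
import scalaz.ValidationNel

val ok: ValidationNel[String, Int] = 42.successNel[String]
val tooSmall: ValidationNel[String, Int] = "value is too small".failureNel[Int]
val notEven: ValidationNel[String, Int] = "value is not even".failureNel[Int]

// applicative composition: all failures are accumulated instead of stopping at the first one
val combined: ValidationNel[String, Int] = (ok |@| tooSmall |@| notEven)(_ + _ + _)
// combined == Failure(NonEmptyList("value is too small", "value is not even"))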

The concepts and methodology can be applied to any data computation framework and programming language. You will just have to re-implement yourself part of the boilerplate code you will see in this tutorial that does all of the magic for you.

The user/events data validation use case

The use case we are using for this example is a simple data type consisting of a triple of userId, eventCode and timestamp:


case class UserEvent(userId: Long, eventCode: Int, timestamp: Long)

Each UserEvent can either be marked as correct or as invalid. For the latter case we will wrap it into another case class InvalidEvent containing the invalid event as well as some meta information regarding the error cause:

sealed trait InvalidEventCause

case class InvalidEvent(event: UserEvent, cause: InvalidEventCause)

The goal is to build a function that takes a UserEvent and returns a ValidationNel of either the correct event or the non-empty list of all of the causes:


UserEvent => ValidationNel[InvalidEvent, UserEvent]

The reason why we want to return ValidationNel[InvalidEvent, UserEvent] instead of ValidationNel[InvalidEventCause, UserEvent] is that we want to keep the original datum for possible later recovery instead of only storing the error causes. This implies that the same object is duplicated multiple times, which is not efficient, but we are not addressing optimisation issues in this tutorial; we will leave that for future posts.

Validation Rules

The easiest way to define each rule is via partial functions that map a UserEvent into an InvalidEventCause. A partial function is a function that is only defined for a subdomain of the input arguments; in our case it is a function which tries to invalidate a datum and is not defined for correct records. The full validation logic will be expressed as a List of partial functions such as:


val validationRules: List[PartialFunction[UserEvent, InvalidEventCause]]

In order to reduce the boilerplate code the PartialFunction returns an InvalidEventCause and then our implicit logic will wrap it together with the original UserEvent object into an InvalidEvent container.

Some rules are simply pre-defined, such as checking that a timestamp is in a min-max range or that the userId is in a white list, and so on. Others are more complicated and are derived from the underlying raw data (before validation).
The method generating the final validation function takes as arguments the RDD with the raw data plus a bunch of parameters and objects used for defining the single rules:

def validationFunction(events: RDD[UserEvent],
                       eligibleUsers: Set[Long],
                       validEventCodes: Set[Int],
                       blackListEventCodes: Set[Int],
                       minDate: String, maxDate: String): UserEvent => ValidationNel[InvalidEvent, UserEvent]

In order to compile the snippets of code you will have to add some dependencies in the imports:

 
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.joda.time.{DateTime, Interval, LocalDate}
import sparkz.utils.Pimps._
import scalaz.Scalaz._
import scalaz.ValidationNel

First, we grab the SparkContext from the events RDD:


val sc = events.context

Valid event code

We want to filter out all of the events whose code does not belong to the validEventCodes set.

 

case object NonRecognizedEventType extends InvalidEventCause

val validEventCodesBV: Broadcast[Set[Int]] = sc.broadcast(validEventCodes)
val notRecognizedEventCode: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !validEventCodesBV.value.contains(event.eventCode) => NonRecognizedEventType
}

We could have enclosed the set directly in the partial function, but we preferred to broadcast it and retrieve it using the value API.

Why to use Broadcast variables in Spark is explained here: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Eligible users

Just like event codes we want to make sure that we only select data from a pool of predefined eligible users:


case object NonEligibleUser extends InvalidEventCause

val eligibleUsersBV: Broadcast[Set[Long]] = sc.broadcast(eligibleUsers)
val customerNotEligible: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !eligibleUsersBV.value.contains(event.userId) => NonEligibleUser
}

N.B. if the eligibleUsers set is large you cannot broadcast it as a shared variable; you would rather turn it into a paired RDD and use the userId as the key for a join. If you want to preserve the information of why you discarded a particular user, you will have to perform an outer join instead of an inner join.

Blacklist users

This logic is slightly more complicated. We want to filter out all of the events of those users for which we observed at least one event in the black list. We first have to scan the raw dataset in order to collect the blacklisted user ids.


case object BlackListUser extends InvalidEventCause

val blackListEventCodesBV: Broadcast[Set[Int]] = sc.broadcast(blackListEventCodes)
// Users for which we observed a black list event
val blackListUsersBV: Broadcast[Set[Long]] = sc.broadcast(
  events.filter(event => blackListEventCodesBV.value.contains(event.eventCode))
  .map(_.userId).distinct().collect().toSet
)
val customerIsInBlackList: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if blackListUsersBV.value.contains(event.userId) => BlackListUser
}

We first run distinct() to reduce the size of the RDD before calling collect() and turning it into a set.

Timestamp out of global interval

We specified minDate and maxDate as strings in ISO format, and we want to filter out all of the timestamps outside this range. For this logic we don't need any pre-computation; we can directly implement it as:


case object OutOfGlobalIntervalEvent extends InvalidEventCause

val eventIsOutOfGlobalInterval: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if !new Interval(DateTime.parse(minDate), DateTime.parse(maxDate)).contains(event.timestamp) =>
    OutOfGlobalIntervalEvent
}

 

We use joda-time to parse strings and timestamp epoch numbers into more manageable classes.

First day to consider of the user

This time we want a stricter rule regarding the event timestamps. We want to avoid border effects by removing all of the events that fall on the date on which we observed the first event of a particular user. That is because the first date may be incomplete, may not contain all of the events, and could invalidate our data assumptions. Since we also introduced the concept of a global min/max interval, the first date to consider for a particular user is the max between the first date we observed in the data and the start of the global interval.


case object FirstDayToConsiderEvent extends InvalidEventCause

// max between first date we have ever seen a customer event and the global min date
val customersFirstDayToConsiderBV: Broadcast[Map[Long, LocalDate]] =
  sc.broadcast(
    events.keyBy(_.userId)
    .mapValues(personalEvent => new DateTime(personalEvent.timestamp).toLocalDate)
    .reduceByKey((date1, date2) => List(date1, date2).minBy(_.toDateTimeAtStartOfDay.getMillis))
    .mapValues(firstDate => List(firstDate, LocalDate.parse(minDate)).maxBy(_.toDateTimeAtStartOfDay.getMillis))
    .collect().toMap
  )
val eventIsFirstDayToConsider: PartialFunction[UserEvent, InvalidEventCause] = {
  case event if customersFirstDayToConsiderBV.value(event.userId).isEqual(event.timestamp.toLocalDate) =>
    FirstDayToConsiderEvent
}

We reduced the events RDD to the minimum date for each userId using the reduceByKey monoidal aggregation. Then we applied the max function between the minimum reduced date and the global one.

Validation rules composition

All we have to do is take the individually defined partial functions (that in Scala are objects like everything else) and put them into a List.

We realised that eventIsFirstDayToConsider depends on eventIsOutOfGlobalInterval: if an event is filtered because it is outside the global interval, there is no need to put it through the first-day-to-consider rule. Thus we can compose the two rules monadically by using the orElse method on the first partial function, which takes the second partial function as an argument and applies it if and only if the first one is not defined (in our case, when the event is not already outside the global min-max range). The orElse method returns a new partial function which will be treated as a single validation rule and internally combines the two of them.

val validationRules: List[PartialFunction[UserEvent, InvalidEventCause]] =
  List(customerNotEligible, notRecognizedEventCode, customerIsInBlackList,
    eventIsOutOfGlobalInterval.orElse(eventIsFirstDayToConsider)
  )

The order of the validation rules does not matter, since all of them will be applied independently, even though computationally they are applied sequentially. If you want to apply them in parallel, you can use the par method of a list, which turns it into a parallel collection.

The final validation function will be generated by a couple of syntactic sugar implicits that we implemented in Sparkz.


(event: UserEvent) => validationRules.map(_.toFailureNel(event, InvalidEvent(event, _))).reduce(_ |+++| _)

The magic operators

In the list of imports we specified:

import sparkz.utils.Pimps._

Pimps are a nice pattern used in Scala to implicitly add methods to classes (the action of pimping). They are particularly useful when you want to use postfix notation to apply a method to a class that does not expose that method.

In order to implement our validation pattern we had to pimp two classes: PartialFunction and ValidationNel. The pimped methods hide the boilerplate logic computed behind the scenes.

implicit class PimpedPartialFunction[X, E](pf: PartialFunction[X, E]) {
  def toFailureNel[W](x: X, toW: E => W = identity _): ValidationNel[W, X] =
    pf.andThen(e => toW(e).failureNel[X]).applyOrElse(x, (_: X).successNel[W])

  def toFailureNel(x: X): ValidationNel[E, X] = toFailureNel(x, identity)
}

The toFailureNel method attempts to apply the partial function (X => E) to the element x and, in case the function is defined, returns a failureNel (a non-empty list containing a single failure of type E) where the failure object is the one returned by the original function. In case the function is not defined, the pimped method creates a successNel of type X.

The more general method also takes an extra argument toW that converts the error object e returned by the original function (if defined) and wraps it into another type W. It acts as a functor for the failure case. This method allows us to encapsulate the information of the original event that generated the error together with its cause.

In our use case the generic types X, E and W are:

X: UserEvent
E: InvalidEventCause
W: InvalidEvent

Once we have converted the partial functions into ValidationNel instances, we need to reduce them into a single one. Scalaz provides a binary operator +++ that takes two ValidationNel instances and merges them together. In case of failures, it appends all of the failures into a single list of type E. In case of success results, it applies an external semigroup operator on type X. In other words, that operator knows how to reduce failures by concatenating the errors, but it requires you to specify how to combine the correct results when all of them are Success.

Since in our use case we are not transforming the correct data (we either discard it or keep it as it is), the combining operator is straightforward. We assume that the success results always contain the same original object, i.e. they never transform it. Thus the operator simply returns the object itself, the two objects being reduced just being copies of each other.

Our PimpedValidationNel provides a simplified operator that does not require a semigroup to be defined for the type X:


implicit class PimpedValidationNel[E, X](x1: ValidationNel[E, X]) {
  // Extension of scalaz.Validation.+++ operator, does not require the semigroup defined for X
  def |+++|(x2: ValidationNel[E, X]) = x1 match {
    case Failure(a1) => x2 match {
      case Failure(a2) => Failure(a1 append a2)
      case Success(b2) => x1
    }
    case Success(b1) => x2 match {
      case b2@Failure(_) => b2
      case Success(b2) if b1 == b2 => Success(b1)
      case Success(b2) => throw new IllegalArgumentException(s"$b1 not equals to $b2")
    }
  }
}

This operator also allows us to merge together results coming from different validation pipelines. If we have two RDDs of the same ValidationNel generic types, we could theoretically join them and merge the results for the same objects. This operation is very expensive, though; it is much preferable to combine the lazy functions that generate the validation objects and apply the final combined function to each record in a single pass over the dataset.

What you can do with the ValidationNel API

The simplest thing would be getting only the valid records:

def onlyValidEvents(events: RDD[UserEvent],
                    validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[UserEvent] =
  events.map(validationFunc).flatMap(_.toOption)

Or the opposite: getting only the invalid events and flat-mapping them into their corresponding error-wrapping class:

def invalidEvents(events: RDD[UserEvent],
                  validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[InvalidEvent] =
  events.map(validationFunc).flatMap(_.swap.toOption).flatMap(_.toList)

Suppose we want to extract all of the original events that failed because of a particular cause, for instance when their timestamp was out of range:

def outOfRangeEvents(events: RDD[UserEvent],
                     validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): RDD[UserEvent] =
  events.map(validationFunc).flatMap(_.swap.toOption).flatMap(_.toSet).flatMap {
    case InvalidEvent(event, OutOfGlobalIntervalEvent) => event.some
    case _ => Nil
  }

N.B. We are exploiting the implicit conversion from an Option to an Iterable to apply a combination of filter and map as a single flatMap operation.

Now, suppose we would like to print a debug message with the count of invalid events by the set of their error causes.

def causeSetToInvalidEventsCount(events: RDD[UserEvent],
                                 validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): Map[Set[InvalidEventCause], Int] =
  events.map(validationFunc)
  .map(_.swap).flatMap(_.toOption).map(_.map(_.cause).toSet -> 1)
  .reduceByKey(_ + _)
  .collect().toMap

The above method will return a map that looks like:

Map(Set(NonEligibleCustomer, NonRecognizedEventType) -> 36018450,
Set(NonEligibleUser) -> 9037691,
Set(NonEligibleUser, BlackListUser, NonRecognizedEventType) -> 137816,
Set(NonEligibleUser) -> 464694973,
Set(BeforeFirstDayToConsiderEvent, NonRecognizedEventType) -> 5147475,
Set(OutOfGlobalIntervalEvent, NonRecognizedEventType) -> 983478).

Note that we are not counting by individual cause but grouping by the combination of causes that co-occurred. This gives us much more debugging power, with no loss of information, as opposed to traditional monadic sequential validation where only the first cause would be recorded.

Moreover, what we might really be interested in is the count of how many users we lost as an effect of the event validation; in other words, how many users were left with no events at all after validation.

def causeSetToUsersLostCount(events: RDD[UserEvent],
                             validationFunc: UserEvent => ValidationNel[InvalidEvent, UserEvent]): Map[Set[InvalidEventCause], Int] = {
  val survivedUsersBV: Broadcast[Set[Long]] =
    events.context.broadcast(events.map(validationFunc).flatMap(_.toOption).map(_.userId).distinct().collect().toSet)

  events.map(validationFunc).flatMap(_.swap.toOption)
  .keyBy(_.head.event.userId)
  .filter(_._1 |> (!survivedUsersBV.value(_)))
  .mapValues(_.map(_.cause).toSet)
  .mapValues(Set(_))
  .reduceByKey(_ ++ _)
  .flatMap(_._2)
  .map(_ -> 1)
  .reduceByKey(_ + _)
  .collect().toMap
}

What the above code does is the following:

  1. Compute the set of "survived" users from the correct events after validation.
  2. Filter only the invalid events belonging to users who did not survive.
  3. Turn each list of failures into a Set of failures (so that the order does not matter).
  4. Group all of the cause sets by userId and deduplicate them, so that each combination of causes appears only once per userId.
  5. Count for how many non-surviving users each cause set appears.
  6. Return a map from cause set to an integer representing the count of lost users.

It should return something like:

Map(Set(NonEligibleCustomer, NonRecognizedEventType) -> 1545,
Set(NonEligibleUser) -> 122,
Set(NonEligibleUser, BlackListUser, NonRecognizedEventType) -> 3224,
Set(NonEligibleUser) -> 4,
Set(BeforeFirstDayToConsiderEvent, NonRecognizedEventType) -> 335,
Set(OutOfGlobalIntervalEvent, NonRecognizedEventType) -> 33)

Conclusions

In this tutorial we showed how we can extend Scalaz to provide a functional and elegant way of applying validation rules to clean a raw dataset. We showed how the whole logic is parallelisable and scalable using the Apache Spark framework. The main advantage of this approach is that we never lose information: we are able to split the data into two pipelines (valid/invalid) and mark each invalid record with some metadata. We presented a simple tutorial for a common use case, showing how to define validation rules and how to use the API of the generalised ValidationNel objects to perform debugging and cleansing tasks.

The source code is available at: https://github.com/gm-spacagna/sparkz/blob/master/src/examples/scala/sparkz/DataValidation.scala.

The whole procedure is not fully optimised for efficiency: the creation of immutable objects may introduce unnecessary overhead for the garbage collector, and the way we wrap the original datum in case of failure creates a lot of duplicate clones of the same object. We leave the task of optimising this to future blog posts.

We hope that this tutorial will inspire Data Scientists and Engineers, regardless of their language and/or technology stack, to approach their coding in a more functional way. Functional programming offers the elegance and conciseness of implementing arbitrarily complicated logic as a simple combination of reusable higher-order functions, as opposed to the classic imperative programming paradigm. We found this way of writing code to be much more suitable for implementing maths and data-transformation algorithms.

***

Similar articles about Data Validation using Scalaz:

https://github.com/FranklinChen/data-validation-demo
https://www.innoq.com/en/blog/validate-your-domain-in-scala/
