End-to-end MLOps with Azure Databricks, Azure AKS and Azure EventHubs. Part #1 — model training

Ivan Trusov
12 min read · Aug 10, 2022


Iridescent Interpenetration №7 by Giacomo Balla, source: Wikiart

MLOps is a hot topic nowadays. While its importance and business value are well described in various papers, e.g. [1], [2], there seems to be a lack of hands-on examples that explain how it can actually be done.

In this series of blog posts I’m going to demonstrate how to build an ML application following MLOps practices, using the Azure Databricks, Azure AKS and Azure EventHubs services. This part is dedicated to model training and all the small details around it.

Process overview

To better understand the process, let’s take a look at this diagram:

The End-to-End CD4ML Process

The diagram tells us that there are three main components we need to take care of: code, model and data. This is what we’ll handle with the Azure Databricks platform. For deployment and monitoring, we’ll also use the Azure AKS and Azure EventHubs services.

High-level architecture

Let’s start from outlining the solution architecture in terms of used technologies and their roles:

As per MLOps standards, we need to keep joint versioning of the following parts:

  • Data — versioned in the Lakehouse, since it’s stored in the Delta Lake format with its Time Travel capability
  • Code — versioned in GitHub
  • Model — versioned in MLflow

MLflow also provides a model registry, which allows users to mark specific experiment runs as a model version and easily share the model to the serving layer (Azure AKS).

As for monitoring, we’re going to serve the model as a REST API, which means we’ll need to collect the inputs and outputs of the model in real time. A good approach is to send each input-output pair from the serving layer directly to Azure EventHubs, and then collect these records into a Lakehouse table. On top of this table we’ll be able to compute and visualize the monitoring metrics using DBSQL.

Preparing the project

Now we’re getting to the technical part of the implementation, so it’s time to roll up our sleeves and write some code. I’m going to use dbx, since I prefer to develop ML pipelines in an IDE rather than in a notebook.

Since an IDE is going to be used, some setup on the local machine is required:

  1. Install conda
  2. Create a new environment:
conda create -n e2e-mlops-demo python=3.9

3. Activate a new environment:

conda activate e2e-mlops-demo

4. Install OpenJDK for local Spark tests:

conda install -c conda-forge openjdk=11.0.15

5. Install dbx:

pip install dbx

6. Create a new Databricks API token and configure the CLI:

databricks configure --profile e2e-mlops-demo --token

7. Check that the profile is working correctly:

databricks --profile e2e-mlops-demo workspace ls /

Now that the preparation is done, let’s generate a project skeleton using dbx init:

dbx init \
  -p "cicd_tool=None" \
  -p "cloud=Azure" \
  -p "project_name=e2e-mlops-demo" \
  -p "profile=e2e-mlops-demo" \
  --no-input

This command will generate the project template for us, with some useful settings and assets prepared.

8. Since I’m using GitHub, it’s easy to create a repository from the local machine and push it directly to GitHub without using the UI (this command is valid for macOS and Linux; for PowerShell it needs to be slightly adapted):

gh repo create "${PWD##*/}" --private --source=. --remote=origin

9. Now it’s time to commit the initial version of the code:

git add . # add generated files to repo
git commit -m "init"
git push -u origin main

10. We can also install the package in editable mode with its dev dependencies and launch the project unit tests:

pip install -e ".[dev]"
pytest tests/unit --cov

I’ve curated the repository slightly (removed some irrelevant files); you can find the relevant version of it here.

Preparing the data

Before actually running any kind of model, we’ll need to prepare some data. For demonstration purposes I’ve chosen the task of credit card fraud classification and took the public dataset from OpenML.

Data is one of the key parts of the whole MLOps process. On the Databricks Lakehouse platform we store data in Delta Lake, an efficient and fully open-source format that also supports versioning.

The code to download data into a Delta table is pretty much trivial (we simply download the DataFrame and do some conversions):
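A minimal sketch of that logic, assuming the OpenML "creditcard" dataset and illustrative database and table names (the actual repo wraps this into a dbx task class):

from sklearn.datasets import fetch_openml
from pyspark.sql import SparkSession


def load_dataset(spark: SparkSession, database: str = "e2e_mlops_demo", table: str = "fraud_data") -> None:
    # prepare a separate database for the project tables
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
    # download the credit card fraud dataset from OpenML into memory as a pandas DataFrame
    dataset = fetch_openml(name="creditcard", version=1, as_frame=True)
    pdf = dataset.frame
    # convenience transformation: represent the target as integer 0/1 labels
    pdf["Class"] = pdf["Class"].astype(int)
    # convert to a Spark DataFrame and save it as a Delta table
    spark.createDataFrame(pdf).write.format("delta").mode("overwrite").saveAsTable(f"{database}.{table}")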

The main logic is defined in the launch method. We first prepare a separate database, then download the data into memory as a pandas DataFrame and apply some transformations for convenience.

The final step is to convert this dataset into a Spark DataFrame and save it as a table. This table is then also available to other users via SQL statements.

Data versioning

Since it’s a Delta table, it also keeps the history of all operations:

Table history in Lakehouse
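The same history can also be queried programmatically, for example (the table name here follows the loader sketch above):

spark.sql("DESCRIBE HISTORY e2e_mlops_demo.fraud_data").show(truncate=False)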

We’ll use this feature in our ML training pipeline in the following fashion — when we read the table, we’ll check the latest version of it, and then log this metadata together with our model run into MLflow.

For better typing I’ve introduced logical models written in pydantic:
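A sketch of what these models could look like (apart from ModelData, the class and field names are illustrative rather than the exact repo code):

import pandas as pd
from pydantic import BaseModel


class SourceMetadata(BaseModel):
    version: int
    database: str
    table: str


class SplitData(BaseModel):
    X: pd.DataFrame
    y: pd.Series

    class Config:
        arbitrary_types_allowed = True  # pandas objects are not native pydantic types


class ModelData(BaseModel):
    train: SplitData
    test: SplitData
    source: SourceMetadata

    class Config:
        arbitrary_types_allowed = True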

ModelData is the logical class that encapsulates the train and test data, as well as the source metadata (version, database and table).

The source metadata can be easily collected at the time of reading the data:
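A minimal sketch of that step, assuming an illustrative function name and the SourceMetadata model sketched above:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession


def read_latest(spark: SparkSession, database: str, table: str):
    full_name = f"{database}.{table}"
    # Delta API: the most recent entry in the table history is the latest version
    last_version = DeltaTable.forName(spark, full_name).history(1).collect()[0]["version"]
    # Spark API: read exactly that version and bring it into pandas
    pdf = spark.read.option("versionAsOf", last_version).table(full_name).toPandas()
    # SourceMetadata is the pydantic model sketched above
    source = SourceMetadata(version=last_version, database=database, table=table)
    return pdf, source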

Using Delta APIs it retrieves the last_version of the dataset, and then using Spark APIs it reads the dataset of a given version into a pandas DataFrame.

Noticing the imbalance

When it comes to the data itself, it’s easy to notice that this dataset comfortably fits into memory:

It also seems like there is a huge class imbalance, a pretty expected situation in fraud detection cases:
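A couple of quick pandas checks are enough to see both points (pdf is the pandas DataFrame loaded earlier; the "Class" target column name is an assumption):

# rough in-memory size of the dataset, in megabytes
print(pdf.memory_usage(deep=True).sum() / 1024 ** 2)

# relative frequency of the target classes
print(pdf["Class"].value_counts(normalize=True))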

This minimal exploration provides us with some valuable info:

  • A non-standard metric for the classification model is required
  • There is potential for some over-sampling to balance the classes
  • There is no need for data or model parallelism, but we could speed up the hyper-parameter search

Metric choice

Let’s think about the metric first. There are many metrics suited to imbalanced learning, and one of them is Cohen’s Kappa. There are various discussions about whether it’s the best-in-class metric for imbalanced cases.

I’ve seen this metric being used a couple of times in real-world applications, and it’s already implemented in sklearn. You can read some relevant considerations and discussion here.

We’ll also calculate more standard F1 and ROC AUC scores for better introspection.
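All three metrics are one-liners in sklearn, for example:

from sklearn.metrics import cohen_kappa_score, f1_score, roc_auc_score

y_true = [0, 0, 0, 1, 0, 1]                 # ground-truth labels
y_pred = [0, 0, 1, 1, 0, 1]                 # hard predictions from the classifier
y_score = [0.1, 0.2, 0.7, 0.9, 0.3, 0.8]    # predicted probabilities for the positive class

print(cohen_kappa_score(y_true, y_pred))
print(f1_score(y_true, y_pred))
print(roc_auc_score(y_true, y_score))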

Smart over-sampling

To make the training process more effective, I’m also going to add some smart over-sampling from the great imbalanced-learn library.

Let’s take a look at the code that is responsible for re-balancing the dataset:
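A condensed sketch of that logic (the repo version differs in structure and typing, and uses the pydantic models shown earlier):

from imblearn.over_sampling import ADASYN
from sklearn.model_selection import train_test_split


def split_and_resample(pdf, target: str = "Class", test_size: float = 0.2):
    # trivial split into features and target
    X, y = pdf.drop(columns=[target]), pdf[target]
    # stratified split preserves the class ratio in both train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=42
    )
    # over-sample only the training data; the test set stays untouched to avoid test leakage
    X_train_res, y_train_res = ADASYN(random_state=42).fit_resample(X_train, y_train)
    return (X_train_res, y_train_res), (X_test, y_test)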

Although it looks a bit verbose at first sight, it’s not that scary in detail. After a trivial split into the X and y variables, we split our data into train and test datasets with stratification by the y variable.

Stratification is important to make sure we preserve the percentage of the target variable in both training and testing datasets.

The next step is to actually apply the over-sampling technique; I’m using ADASYN since it’s fast and pretty robust. It’s important to mention that we only generate resampled data for the training set and don’t touch the test set, to avoid test leakage.

At this point we’ve covered both the metric choice and the over-sampling. However, we also have the ability to scale out the hyper-parameter search. But there are many scalability patterns; which one should we choose in this specific case?

Here, let me make a small digression into scalability in ML tasks.

Different kinds of scalability

There are various ways to scale the ML training process:

  • Frameworks that support out-of-memory data and can scale with the amount of computational resources, for example SparkML, H2O.ai, SynapseML.
  • For deep learning, there are Horovod + Tensorflow/Pytorch.
  • Most popular boosting algorithms support running via the Spark API, providing scalability both in terms of data size and compute (e.g. LightGBM, XGBoost, CatBoost).
  • Another option to parallelise training is to split the dataset horizontally and train a separate model per partition. The partitioning key could be, for example, region or segment. This approach is available via the Pandas UDF functionality and is well described here.
  • If the dataset fits into memory, it’s possible to parallelise the hyper-parameter search. This functionality is covered by the hyperopt integration.

All of these scalability patterns are available in the Databricks Lakehouse Platform.

Side note: check how many of these are available in the typical CDW scenario without exporting data out of it. After thinking about this, take a look at this blogpost from Jason Pohl.

Pipeline and search space

But let’s get back to the model building topic. For the purpose of this example I’m going to use the latter option and train my model using a scikit-learn-based pipeline with hyperopt for the hyper-parameter search.

This approach requires defining the search space and the relevant model pipeline. For better readability I’ve factored this code out into a separate class:
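As a hedged sketch, such a class could look roughly like this (the name ModelBuilder, the concrete pipeline steps and the parameter ranges are illustrative):

from hyperopt import hp
from hyperopt.pyll.base import scope
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier


class ModelBuilder:
    """Illustrative sketch: bundles the sklearn pipeline and the hyperopt search space."""

    @staticmethod
    def get_pipeline(params: dict) -> Pipeline:
        # chaining the scaler and the classifier unifies the model interface
        return Pipeline([
            ("scaler", StandardScaler()),
            ("classifier", XGBClassifier(**params)),
        ])

    @staticmethod
    def get_search_space() -> dict:
        return {
            "max_depth": scope.int(hp.quniform("max_depth", 3, 10, 1)),
            "n_estimators": scope.int(hp.quniform("n_estimators", 50, 200, 10)),
            "learning_rate": hp.loguniform("learning_rate", -5, 0),
        }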

As for the model, I’m wrapping multiple steps into a chain using the Pipeline API from sklearn. This approach unifies the model interface: it becomes possible to replace the XGBClassifier with any other sklearn-compatible classifier without fiddling around with training interfaces.

The search space definition is pretty simple; it’s merely an example of how it could be configured. There is also one interesting thing about the hyperopt API: after running the fmin function it returns a dictionary with a slightly different structure:

best_params = fmin(...)
print(best_params)
# example output:
{
    "max_depth": 3,
    "n_estimators": 100,
    "learning_rate": 0.001,
    "reg_alpha": 0.03,
    "base_score": 0.02
}

It’s better to be aware of this when applying the best_params object to build the final model.
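One common way to map the raw fmin output back to concrete parameter values is hyperopt’s space_eval helper; here is a small self-contained illustration:

from hyperopt import fmin, hp, space_eval, tpe

search_space = {"max_depth": hp.choice("max_depth", [3, 5, 7])}


def objective(params: dict) -> float:
    return 0.0  # placeholder: the real objective trains the pipeline and returns the loss


best_raw = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=10)
print(best_raw)     # e.g. {'max_depth': 1} - the index chosen by hp.choice, not the value
best_params = space_eval(search_space, best_raw)
print(best_params)  # e.g. {'max_depth': 5} - the actual parameter value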

Training the model

With the considerations above, it becomes pretty easy to factor the model training code out into a separate class, Trainer. You can find the full code of this class in the repo; here I’m going to point out some of its details.

To log parameters, metrics and the model in a convenient way, a good idea is to create a structure of nested mlflow runs:
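In code, the nested structure boils down to something like this (run names and parameters are illustrative):

import mlflow

candidate_params = [{"max_depth": 3}, {"max_depth": 5}]  # in the real code hyperopt generates these

# parent run: holds the final model, the best parameters and the summary metrics
with mlflow.start_run(run_name="e2e-mlops-demo-training"):
    for params in candidate_params:
        # child run: one per hyper-parameter evaluation
        with mlflow.start_run(nested=True):
            mlflow.log_params(params)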

In the mlflow UI this structure will look like this, allowing ML engineers to manage and compare the runs in a simple fashion:

I would not recommend saving the model instance for each hyper-parameter run, although it’s possible and might be required in some cases.

From the API perspective, the logging logic looks like this (follow the comments below):

  • In the Trainer initialization step, the experiment_id and the parent_run_id are obtained. No additional params or metrics are added to the run at this step; this will be done at the end of the training process.
  • The entrypoint method train launches the hyperopt search with the self._objective method as the objective function.
  • Inside the objective, the self.setup_mlflow() method is called. This is only required for local testing and has no effect when the code is launched on Databricks. For local testing it’s required to provide the MlflowInfo object during Trainer initialization; it points the child processes spawned by self._objective to the local MLflow registry.
  • The two-level with construction in _objective creates a nested run and provides the context in which parameters and metrics will be saved by the _train_model_and_log_results function.
  • After running the fmin function, a dictionary of best_params is generated. At this point the model is re-trained once again with the best_params, and the resulting model instance is saved into the parent run and registered in the MLflow Model Registry as a new version.
  • Note the _register_model method, which demonstrates how the package code is collected and stored together with the model object. Please note that the code_paths option is available in mlflow>=1.27.0 and on Databricks Runtime 11.0 ML and higher.

This concludes all the small details around model and code versioning. The main logic of the training is stored inside the _train_model_and_log_results method:
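A hedged sketch of what this method could look like; the exact parameter filtering and the return contract here are assumptions rather than the repo code:

import mlflow
from sklearn.metrics import cohen_kappa_score, f1_score, roc_auc_score


def _train_model_and_log_results(pipeline, data) -> float:
    # data is assumed to be the ModelData object sketched earlier
    pipeline.fit(data.train.X, data.train.y)

    # sklearn's get_params() is very verbose for a Pipeline: keep only the per-step parameters
    params = {k: v for k, v in pipeline.get_params().items() if k != "steps" and "__" in k}
    mlflow.log_params(params)

    # explicitly keep track of the source data version and location
    mlflow.log_params({
        "source_database": data.source.database,
        "source_table": data.source.table,
        "source_version": data.source.version,
    })

    predictions = pipeline.predict(data.test.X)
    probabilities = pipeline.predict_proba(data.test.X)[:, 1]
    kappa = cohen_kappa_score(data.test.y, predictions)
    mlflow.log_metrics({
        "cohen_kappa": kappa,
        "f1": f1_score(data.test.y, predictions),
        "roc_auc": roc_auc_score(data.test.y, probabilities),
    })

    # hyperopt minimizes the objective, so the negated main metric is returned
    return -kappa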

This function contains all the logic relevant to actually training the model and logging its parameters and metrics. While the pipeline.fit call seems pretty standard for ML engineers, there are some additions in this code:

  • The standard sklearn method get_params() is pretty verbose when it generates the parameters dictionary for a Pipeline. Therefore I’ve slightly customized it, removing the steps section and the auxiliary step descriptions from it.
  • We explicitly save the source information to keep track of the source data version and location.

Local testing

The provided Trainer class is easily testable on the local machine:
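In spirit, the test looks roughly like this (the fixture names follow the description below, while the Trainer constructor arguments are assumptions):

import mlflow


def test_trainer(dataset, mlflow_local):
    # dataset and mlflow_local are pytest fixtures providing a small data sample
    # and a temporary local MLflow tracking/registry setup;
    # Trainer comes from the project package (import omitted here)
    trainer = Trainer(model_name="e2e-mlops-demo", data=dataset, mlflow_info=mlflow_local)
    trainer.train()

    # the model must now exist in the local temporary MLflow Model Registry
    model = mlflow.tracking.MlflowClient().get_registered_model("e2e-mlops-demo")
    assert model is not None
    assert len(model.latest_versions) > 0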

This test block runs the trainer with provided parameters and fixtures for dataset and mlflow_local instances. In the end it asserts that the model has been created and registered in the local temporary mlflow instance.

By writing a set of such tests, it’s possible to cover the whole data loading and model training process with decent test coverage:

pytest tests/unit --cov --cov-report=html

There is certainly room for improvement, but as a first shot it’s a good coverage level.

Deploying the model to the Model Registry

The final step for this part is to train the model at scale and make it available in the Model Registry.

To do this, let’s define a Databricks workflow which consists of two steps:

This deployment file describes all the components we’ll need for this workflow, specifically:

  • Cluster definitions
  • Task definitions
  • Relationship between tasks and clusters
  • Order of execution between tasks

This file follows the Databricks Jobs API 2.1 specification, with some minor alterations that are features of dbx, for example:

  • instance_pool_name and driver_instance_pool_name will be automatically replaced with their respective IDs
  • Locally referenced files (e.g. file:fuse://conf/tasks/builder.yml) will be automatically uploaded to DBFS and referenced in the job definition.
  • Package containing all the project code will also be automatically built and uploaded to DBFS

All these actions are performed when the following command is launched:

dbx deploy --job=e2e-mlops-demo-main

After running it, check out the job in the Workflows tab:
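From there the workflow can be started via the UI, or triggered from the local machine with dbx, for example:

dbx launch --job=e2e-mlops-demo-main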

When this job is launched, the data is downloaded first and then a new hyperopt search process is started. At the end of the builder step, a new model version is created in the MLflow Model Registry. This registry is available via the Models tab in the Machine Learning section of the workspace UI:

This model version is not yet in any stage, because we haven’t defined any stages for it yet. This is something we’ll do in the next part, when the model is deployed to the serving layer.

Parameter search scalability

We all like performance graphs, don’t we? Here is another one: running 100 model evaluations with various numbers of worker nodes, specifically with 1, 2, 6, 10 and 14 workers.

Before running the performance tests, some general considerations were taken into account to produce comparable results:

  • All tests were performed on Azure Databricks Jobs cluster with Databricks ML Runtime 11.1.x-cpu-ml-scala2.12.
  • All cluster nodes (both driver and workers) were pre-warmed to avoid VM startup time effects.
  • Average worker node utilization was around 85–90% during the active training phase.
  • Since n_estimators was also one of the tuned XGBoost parameters, it might happen that some trials were heavier than others. However, most of the training runs chose n_estimators in the range from 80 to 100.

You can also read more about some of the configuration details below:

The stats for the runs are described below:

From these values we can see that, in this specific case, hyperopt provides pretty good scalability when parallelising the hyper-parameter search process.

Summary

In this part of the series the following topics were discussed:

  • High-level architecture for the MLOps process
  • Quickstart and project generation
  • Various ML training scalability patterns
  • Training process with hyperopt

The code relevant to this version can be found here (please note the 0.0.1 tag). In the upcoming part we’re going to take a deep dive into the serving topic. Stay tuned!


Ivan Trusov

Senior Specialist Solutions Architect @ Databricks. All opinions are my own.