In their first week with Commit, each new Engineering Partner takes on a hackathon onboarding project. They build a project, present it to the Commit community, and write a short summary of their experience. There are no restrictions, no limits, no joke.
For my hackathon onboarding project, I built a configurable machine learning pipeline with FastAPI, Celery and Redis. The goal of the project is an API endpoint that takes request data describing the features of a house and returns an estimate of what the house is worth. The estimates come from a machine learning model that is trained and then pickled. Training is a long-running process, so it runs in the background, outside the request/response cycle. If a long-running process is part of your application workflow, you should handle it that way rather than blocking the response.
This is the case for many web applications that leverage machine learning to provide value. In this project I’m solving a regression problem: predicting a continuous value (here, a house price) from previous data points.
A lot of blog posts talk about building machine learning models, but in this post we’ll go a step further. We’ll build a model and also a scalable, reusable, production-ready system that serves its predictions from an API endpoint.
FastAPI and Python dependencies
FastAPI is a Python web framework with built-in support for asynchronous request handlers and automatic request validation through pydantic, which catches malformed input before it reaches your code. I chose FastAPI because I wanted to learn how it works and because it’s a popular choice for serving machine learning models.
We’ll use SQLAlchemy to interface with our database tables, where we’ll store data about the predictions we make.
We’ll use Celery, with Redis as its message broker and result backend, to run long-running jobs in the background, such as training the ML model.
Objectives
- Load the training data and train a machine learning model.
- Create a FastAPI app and its API endpoints.
- Integrate Celery with the FastAPI app and create tasks.
- Run processes in the background with a separate worker process.
You can find the repository for this project here.
Folder structure
– app/
    – scripts/
        – __init__.py
        – training.py
    – utils/
        – __init__.py
        – helpers.py
        – utils.py
    – tests/
        – __init__.py
    – sql/
        – __init__.py
    – tasks.py
    – database.py
    – views.py
    – schema.py
– ml_pipelines
– config.ini
– Dockerfile
– docker-compose.yml
– logging.conf
– main.py
– requirements.txt
- main.py is the entry point to our application.
- The scripts folder houses the code for training the ML model. It can also hold any further scripts we need to run once or continuously.
- schema.py defines our database tables with SQLAlchemy. This layer serves as a SQL abstraction for easy insertion and querying.
- models.py contains pydantic classes, which we’ll use to validate data coming in through the API endpoints we’ll be building.
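schema.py isn’t reproduced in this post. Here is a minimal sketch, assuming a single table that logs each prediction. The `Prediction` class and its columns are hypothetical, and an in-memory SQLite database stands in for the real one. It shows what a table defined with SQLAlchemy’s declarative mapping, and an insert through it, might look like:

```python
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Float, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Prediction(Base):
    """Hypothetical table: one row per prediction served by the API."""

    __tablename__ = "predictions"

    id = Column(Integer, primary_key=True)
    pipeline_version = Column(String(64), nullable=False)  # which pickled model answered
    predicted_price = Column(Float, nullable=False)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))


# In-memory SQLite stands in for the project's real database in this sketch.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Prediction(pipeline_version="v1", predicted_price=24.0))
    session.commit()
    stored = session.execute(select(Prediction)).scalars().all()
    prices = [row.predicted_price for row in stored]
```

Working through mapped classes like this lets the rest of the app insert and query rows without writing SQL by hand.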
Overview of the system
The problem and machine learning
Imagine you’re trying to sell your house and you’re trying to determine the appropriate sale price based on a few parameters, like the number of bathrooms, the size of your yard, etc.
We’ll use the Boston housing dataset provided by scikit-learn as the data for this use case, and we’ll train our machine learning model with Ridge regression. The properties of each house, such as the average number of rooms per dwelling (RM) or the proportion of residential land zoned for large lots (ZN), are the features we’ll fit our Ridge model on. Ridge regression is linear regression with an L2 penalty on the size of the coefficients: it minimizes the squared prediction error plus α times the sum of the squared coefficients. The penalty pushes the estimated coefficients towards 0. This trades a little bias for lower variance, so the model generalizes better to new data. As a result, we can work with many (possibly correlated) features while limiting overfitting.
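To make the shrinkage concrete: scikit-learn’s `Ridge` minimizes ||y − Xw||² + α·||w||², where `alpha` sets the penalty strength. The sketch below uses synthetic data rather than the Boston set, and the alpha values are arbitrary. It fits Ridge at increasing alpha and records the length of the coefficient vector, comparing it with ordinary least squares:

```python
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge

# Synthetic data (not the Boston set): 13 features, only three of them informative.
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 13))
true_coef = np.zeros(13)
true_coef[:3] = [3.0, -2.0, 1.5]
y = X @ true_coef + rng.normal(scale=0.5, size=100)

# Ordinary least squares: no penalty on the coefficients.
ols_norm = np.linalg.norm(LinearRegression().fit(X, y).coef_)

# Ridge: a larger alpha pulls the coefficient vector closer to 0.
norms = []
for alpha in [0.1, 1.0, 10.0, 100.0, 1000.0]:
    model = Ridge(alpha=alpha).fit(X, y)
    norms.append(np.linalg.norm(model.coef_))

print(f"OLS coefficient norm: {ols_norm:.3f}")
print("Ridge coefficient norms:", [round(n, 3) for n in norms])
```

The norms shrink steadily as alpha grows and all stay below the OLS norm. This is also why the features are standardized before fitting: the penalty treats every coefficient alike, regardless of the units of its feature.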
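from pandas import DataFrame
from sklearn.datasets import load_boston  # removed in scikit-learn 1.2


def get_data():
    # Features as a DataFrame with named columns; y is the median home value (MEDV, in $1000s).
    boston = load_boston()
    x, y, features = boston.data, boston.target, boston.feature_names
    df = DataFrame(data=x, columns=features)
    return df, y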
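The function above loads the Boston housing data into a pandas DataFrame and returns it along with the target values. Note that load_boston was deprecated in scikit-learn 1.0 and removed in 1.2, so this code requires an older scikit-learn release.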
You should take note of the columns in the dataset. Those are the parameters our API endpoint will be taking. There are 13 features in the dataset: 11 numerical features and two categorical features.
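models.py isn’t shown in the post. A minimal sketch of a pydantic request model with one field per dataset column might look like the following. The class name `HouseFeatures` is hypothetical. Every field is a float because `load_boston` returns a float array, even for CHAS and RAD. The sample values are the first row of the Boston dataset:

```python
from pydantic import BaseModel, ValidationError


class HouseFeatures(BaseModel):
    """Hypothetical request body: one field per Boston housing column."""

    CRIM: float     # per-capita crime rate by town
    ZN: float       # proportion of residential land zoned for lots over 25,000 sq ft
    INDUS: float    # proportion of non-retail business acres per town
    CHAS: float     # Charles River dummy variable (1 if the tract bounds the river, else 0)
    NOX: float      # nitric oxides concentration (parts per 10 million)
    RM: float       # average number of rooms per dwelling
    AGE: float      # proportion of owner-occupied units built prior to 1940
    DIS: float      # weighted distances to five Boston employment centres
    RAD: float      # index of accessibility to radial highways
    TAX: float      # full-value property-tax rate per $10,000
    PTRATIO: float  # pupil-teacher ratio by town
    B: float
    LSTAT: float    # % lower status of the population


sample = {
    "CRIM": 0.00632, "ZN": 18.0, "INDUS": 2.31, "CHAS": 0.0, "NOX": 0.538,
    "RM": 6.575, "AGE": 65.2, "DIS": 4.09, "RAD": 1.0, "TAX": 296.0,
    "PTRATIO": 15.3, "B": 396.9, "LSTAT": 4.98,
}
features = HouseFeatures(**sample)

# Dropping a required field makes validation fail.
try:
    HouseFeatures(**{k: v for k, v in sample.items() if k != "RM"})
    missing_field_rejected = False
except ValidationError:
    missing_field_rejected = True
```

When a model like this is used as the type of an endpoint parameter, FastAPI validates the request body against it and answers invalid requests with a 422 response.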
Data pre-processing
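from sklearn.compose import ColumnTransformer
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

SEED = 42  # assumed value; the post doesn't show where SEED is defined


def build_pipeline(features_numerical: list, features_categorical: list) -> Pipeline:
    # Scale numerical columns; one-hot encode categorical ones (unseen categories become all zeros).
    transformer_numerical = StandardScaler()
    transformer_categorical = OneHotEncoder(handle_unknown='ignore')
    preprocessor = ColumnTransformer(transformers=[('scaler', transformer_numerical, features_numerical),
                                                   ('ohe', transformer_categorical, features_categorical)])
    pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('regressor', Ridge(random_state=SEED))])
    return pipeline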
Most of the columns are numerical, but two are categorical, so the pipeline preprocesses them differently. Numerical columns go through a StandardScaler, and categorical columns go through a OneHotEncoder. StandardScaler standardizes features by removing the mean and scaling to unit variance. Standardization is a common requirement for many machine learning models, which may behave poorly if the individual features don’t more or less look like standard normally distributed data. It matters for Ridge in particular: the penalty treats every coefficient the same way, so without scaling, the amount of shrinkage a feature receives would depend on its units.
The CHAS (Charles River dummy variable) and RAD (index of accessibility to radial highways) columns in our dataset represent categorical data, so we pass both through the one-hot encoder. One-hot encoding turns each distinct value of a column into its own binary column. That column is 1 when a row has that value and 0 otherwise.
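Here is a small sketch of what the two transformers do. It uses made-up values shaped like the TAX and RAD columns, not taken from the dataset:

```python
import numpy as np
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy numerical column (think TAX) and toy categorical column (think RAD).
tax = np.array([[296.0], [242.0], [222.0], [311.0]])
rad = np.array([[1.0], [2.0], [2.0], [5.0]])

# StandardScaler: subtract the column mean, divide by the column standard deviation.
scaled = StandardScaler().fit_transform(tax)

# OneHotEncoder: one output column per distinct value seen during fit (1.0, 2.0, 5.0 here).
encoder = OneHotEncoder(handle_unknown="ignore")
onehot = encoder.fit_transform(rad).toarray()  # output is sparse by default; densified here

# With handle_unknown="ignore", an unseen category becomes an all-zero row instead of an error.
unseen = encoder.transform(np.array([[24.0]])).toarray()

print(scaled.ravel())
print(onehot)
print(unseen)
```

The `handle_unknown="ignore"` setting used in `build_pipeline` is what keeps the prediction endpoint from failing on a RAD value that never appeared in the training data.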
Putting it all together
import logging

logger = logging.getLogger(__name__)


def train() -> Pipeline:
    # get_data() and build_pipeline() are the functions defined above.
    df, y = get_data()
    features_categorical = ['CHAS', 'RAD']
    features_numerical = ['INDUS', 'TAX', 'RM', 'CRIM', 'DIS', 'PTRATIO', 'LSTAT', 'AGE', 'ZN', 'B', 'NOX']
    logger.info(f'Training data: {df.shape}')
    pipeline = build_pipeline(features_numerical, features_categorical)
    logger.info('ML build successful')
    pipeline.fit(df, y)
    logger.info('new ML model trained')
    return pipeline
In the next section we’ll use this function to create a Celery task. The task could run periodically, like a cron job, but in this tutorial it’s only triggered on demand by the POST /train/ endpoint.
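The trained pipeline is pickled, and the health endpoint reports its version, but `run_trainer` itself isn’t shown. Below is a minimal sketch of one way to handle that step. It assumes a timestamp serves as the version and the files live in a directory such as ml_pipelines. `save_pipeline` and `load_latest_pipeline` are hypothetical helpers, and a tiny stand-in pipeline replaces the Boston one so the example runs on its own:

```python
import pickle
import tempfile
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def save_pipeline(pipeline: Pipeline, directory: Path) -> str:
    """Pickle a fitted pipeline under a UTC-timestamp version and return that version."""
    directory.mkdir(parents=True, exist_ok=True)
    version = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")
    with open(directory / f"pipeline_{version}.pkl", "wb") as fh:
        pickle.dump(pipeline, fh)
    return version


def load_latest_pipeline(directory: Path) -> tuple[str, Pipeline]:
    """Load the newest pickle; timestamp versions sort lexicographically by age."""
    latest = max(directory.glob("pipeline_*.pkl"))  # raises ValueError if none exist
    version = latest.stem.removeprefix("pipeline_")
    with open(latest, "rb") as fh:
        return version, pickle.load(fh)


# Stand-in pipeline fitted on toy data.
X = np.arange(20, dtype=float).reshape(10, 2)
y = X @ np.array([1.0, 2.0])
fitted = Pipeline([("scaler", StandardScaler()), ("regressor", Ridge())]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    saved_version = save_pipeline(fitted, Path(tmp))
    loaded_version, restored = load_latest_pipeline(Path(tmp))
    same_predictions = np.allclose(restored.predict(X), fitted.predict(X))
```

Only unpickle files you wrote yourself: loading a pickle can execute arbitrary code.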
Tasks
We need to wrap the machine learning training function with the Celery app’s task decorator (@celery.task) so that it is registered with Celery and workers can pick it up from the queue.
from celery import Celery

from app.scripts.training import run_trainer
from app.utils.utils import get_config_params

celery = Celery(__name__)

# Redis serves as both the message broker and the result backend.
redis_config = get_config_params(config_path='config.ini', section='redis')
celery.conf.broker_url = redis_config['url']
celery.conf.result_backend = redis_config['url']


@celery.task(name="ml_trainer")
def run_ml_trainer():
    # Executes in the Celery worker process, outside the request/response cycle.
    run_trainer()
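`get_config_params` comes from app/utils/utils.py, which the post doesn’t show. This sketch assumes config.ini is a standard INI file with a [redis] section holding a `url` key. The key name matches the task code, but the URL value below is an assumption. Under those assumptions, a minimal implementation with the standard library’s configparser might look like this:

```python
import configparser
import tempfile
from pathlib import Path


def get_config_params(config_path: str, section: str) -> dict:
    """Return one section of an INI file as a plain dict of strings."""
    parser = configparser.ConfigParser()
    if not parser.read(config_path):  # read() returns the list of files it parsed
        raise FileNotFoundError(config_path)
    if not parser.has_section(section):
        raise KeyError(f"section [{section}] not found in {config_path}")
    return dict(parser[section])


# Example config.ini content (assumed): "redis" as the host name of a docker-compose service.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "config.ini"
    path.write_text("[redis]\nurl = redis://redis:6379/0\n")
    redis_config = get_config_params(config_path=str(path), section="redis")
```

Raising on a missing file or section makes a misconfigured worker fail at startup instead of later, when it first tries to reach the broker.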
Health endpoint
Let’s find out the current state of the database connection and the current version of the machine learning pipeline.
A GET request to the /health endpoint returns data that shows the current state of the web app.
Machine training endpoint
We need to train our machine learning model before we can use the prediction capability of our web application, so we make a POST request to the /train endpoint.
The endpoint doesn’t wait for training to finish. Instead, its response contains a task ID, which we can use to check the status of the training task we just triggered.
Task results
Once a task has been queued, we can check the status of the task with a GET request to results/{task_id}.
Making predictions
This is the core of our web app: a POST request to the predict/ endpoint takes in features as defined by the Boston housing dataset and returns a prediction of what the price of a house might be, based on the input.
Now we can get predictions based on the request data.
Conclusion
I learned a lot doing this hackathon onboarding project, and I hope you learned something from this article.
Kingsley Torlowei is a Commit engineer who specializes in API design, automation, machine-learning and all things DevOps. In his spare time, he follows Arsenal Football Club and enjoys working out.