Creating a Basic Trino Service to Start On-Demand Clusters for Ad Hoc Large
ETL Jobs on GCP (Google Cloud Platform) using Python Flask.
We were working on a client site where a customer of ours wanted a set of
Trino clusters to be used for specific high-memory ETL/ELT jobs. They wanted
to send a signal to an HTTP server that would start a Trino cluster up in the
background. The main purpose is to overcome the static user query memory
limits on a cluster that is in use 24x7. So we went to work on a quick and
easy fix.
The main components required from the GCP infrastructure side are as follows.
- 1. Trino master machine VM
- 2. Dynamic instance group that can be scaled with a Trino worker image. (You can contact the DELIVERBI team for this, if not already in place.)
So now let's assume you have a cluster that can start and stop from the command line. The clusters we design can be started and stopped, as well as having the capability to set the number of workers we require at any time, with graceful shutdowns and quick start-ups.
Let's have a look at the service itself. It is a very basic idea, but it
works a treat. You can invoke the starting and stopping of clusters from any
ETL or orchestration tool such as Apache Airflow.
Let's get to work and put together a small virtual machine with the following components.
#Installations required - Debian Linux
apt-get install python3-pip
apt-get install jq
pip3 install --upgrade pip
pip3 install flask flask_shell2http
pip3 install waitress
You will require 2 shell scripts to start and stop the Trino Cluster.
trino-start-cluster , trino-stop-cluster these can be
found in our GitHub at the location above.
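If you are rolling your own versions instead, here is a minimal sketch of what trino-start-cluster.sh could look like. The VM and instance group names (trino-master-<n>, trino-workers-<n>) and the zone are illustrative assumptions, not the names used in our repo.
#!/bin/bash
# Hypothetical sketch of trino-start-cluster.sh - adjust names, zone, project
# Usage: trino-start-cluster.sh <cluster-number> <worker-count>
CLUSTER=$1
WORKERS=$2
ZONE=europe-west2-a   # assumption: set to your zone
# Start the Trino coordinator VM
gcloud compute instances start "trino-master-${CLUSTER}" --zone "${ZONE}"
# Scale the worker managed instance group to the requested size
gcloud compute instance-groups managed resize "trino-workers-${CLUSTER}" --size "${WORKERS}" --zone "${ZONE}"
The stop script would do the reverse: resize the group to 0 and stop the coordinator VM.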
Now let's move on to creating a small Python Flask program. Nothing
difficult; it will execute a shell script according to the URL we use. Let's
call it trino_app.py.
from flask import Flask
from flask_executor import Executor
from flask_shell2http import Shell2HTTP
# Flask application instance
app = Flask(__name__)
executor = Executor(app)
shell2http = Shell2HTTP(app=app, executor=executor, base_url_prefix="/commands/")
def my_callback_fn(context, future):
# optional user-defined callback function
print(context, future.result())
return
#shell2http.register_command(endpoint="saythis", command_name="echo", callback_fn=my_callback_fn, decorators=[])
shell2http.register_command(endpoint="m1start", command_name="bash /trinoapp/trino-start-cluster.sh 1 5", callback_fn=my_callback_fn, decorators=[])
shell2http.register_command(endpoint="m1stop", command_name="bash /trinoapp/trino-stop-cluster.sh 1 5", callback_fn=my_callback_fn, decorators=[])
shell2http.register_command(endpoint="m2start", command_name="bash /trinoapp/trino-start-cluster.sh 2 3", callback_fn=my_callback_fn, decorators=[])
shell2http.register_command(endpoint="m2stop", command_name="bash /trinoapp/trino-stop-cluster.sh 2 3", callback_fn=my_callback_fn, decorators=[])
Start the web server to host the Flask script. Create a script
called run_trino_app.sh:
#!/bin/bash
cd "$(dirname "$0")"
/usr/local/bin/waitress-serve --port=5000 trino_app:app
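Make the script executable so systemd can run it directly:
chmod +x /root/trinoapp/run_trino_app.sh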
We can set up a systemd service for this so it will be easier
to start and stop the Flask application.
# /etc/systemd/system/trino_app.service
[Unit]
Description=My Trino API
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/trinoapp
ExecStart=/root/trinoapp/run_trino_app.sh
Restart=always
[Install]
WantedBy=multi-user.target
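Reload systemd so it picks up the new unit, and enable it if you want the app to start at boot:
systemctl daemon-reload
systemctl enable trino_app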
#Start and stop the Trino app - the background process is waitress.
systemctl start trino_app
systemctl stop trino_app
systemctl status trino_app
Check the service is up, and you can invoke a cluster manually. Here are
some sample commands. Below is a start command; just use m1stop to stop, or
whatever you have used for your URL path.
#Make a Call - initiate shell script.
curl -X POST http://ip:5000/commands/m1start
#Check result - you can't make another call to the same API
#until the result comes back. Use the key from the above call.
curl http://ip:5000/commands/m1start?key=864d8355
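Since jq was installed earlier, here is a small sketch that captures the key from the POST response and polls for the report. It assumes the flask_shell2http response JSON exposes a key field; the IP is a placeholder.
#Grab the key from the JSON response, then poll the same endpoint for the report
KEY=$(curl -s -X POST http://ip:5000/commands/m1start | jq -r '.key')
curl -s "http://ip:5000/commands/m1start?key=${KEY}" | jq .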
The main server for taking the incoming HTTP API
calls is all done above and should be working.
Now let's move on to the client side, or Airflow. We want the call to the
API to hold while the server is coming up, as the first step in the ETL
process. For this we introduced a client-side script. It waits until the
Trino cluster is fully up, and then the ETL process can begin. Once the ETL
process has completed, as a last step we can invoke the cluster shutdown.
A client-side script called client-side-call-example can be
found in the GitHub repo.
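To give a feel for what that script does, here is a minimal client-side sketch, assuming the same key-based polling behaviour from flask_shell2http described above; the host, endpoint, and sleep interval are placeholders to adjust.
#!/bin/bash
# Hypothetical client-side sketch: start cluster 1 and block until the
# start script has reported back, then hand over to the ETL.
HOST=http://ip:5000
KEY=$(curl -s -X POST ${HOST}/commands/m1start | jq -r '.key')
# Poll until a report is available, i.e. the start script has finished
until curl -s "${HOST}/commands/m1start?key=${KEY}" | jq -e '.report' > /dev/null; do
  echo "Waiting for the Trino cluster to come up..."
  sleep 30
done
echo "Cluster is up - starting ETL"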
We hope this helps other clients and organisations
save on cluster costs by using on-demand clusters. We also have graceful
shutdown and automatic scaling solutions for Trino on GCP. Our auto-scaling
solutions monitor the cluster and adjust the number of workers throughout the
day based on memory usage, number of queries, etc.
DELIVERBI
Shahed and Krishna