
Friday, 25 November 2022

Shahed

Trino - On Demand Clusters for ADHOC Large ETL Jobs on GCP Google Cloud Platform using Python Flask

 

This post walks through creating a basic service that starts on-demand Trino clusters for ad hoc large ETL jobs on Google Cloud Platform (GCP), using Python Flask.

 

We were working on a client site where the customer wanted a set of Trino clusters reserved for specific high-memory ETL/ELT jobs. They wanted to send a signal to an HTTP server and have it start a Trino cluster in the background. The main purpose is to overcome the static per-user query memory limits on a cluster that is in use 24x7. So we went to work on a quick and easy fix.

 


 

The main components required on the GCP infrastructure side are as follows.

  1. Trino master machine (VM)
  2. Dynamic instance group, built from a Trino worker image, that can be scaled. (You can contact the DELIVERBI team for this if it is not already in place.)

 

So now let's assume you have a cluster that can be started and stopped from the command line. The clusters we design can be started and stopped, and can be told how many workers are required at any time, with graceful shutdowns and quick start-ups.

Let's have a look at the service itself. It is a very basic idea but works a treat: invoke the starting and stopping of clusters from any ETL or orchestration tool, such as Apache Airflow.

All scripts are available on our GitHub : https://github.com/deliverbi/trino-api-gateway-on-demand-clusters

Let’s get to work and put together a small Virtual Machine with the following components

 

#Installations required Debian - Linux 

apt-get install python3-pip

apt-get install jq

pip3 install --upgrade pip

pip3 install flask flask_shell2http

pip3 install waitress

 

You will require two shell scripts to start and stop the Trino cluster: trino-start-cluster and trino-stop-cluster. Both can be found in our GitHub repository at the location above.

 

Now let's move on to creating a small Python Flask program. It is nothing difficult: it executes a shell script according to the URL that is called. Let's call it trino_app.py.


 

from flask import Flask

from flask_executor import Executor

from flask_shell2http import Shell2HTTP


# Flask application instance


app = Flask(__name__)

executor = Executor(app)

shell2http = Shell2HTTP(app=app, executor=executor, base_url_prefix="/commands/")


def my_callback_fn(context, future):

    # Optional user-defined callback: runs when the shell script has finished

    print(context, future.result())


#shell2http.register_command(endpoint="saythis", command_name="echo", callback_fn=my_callback_fn, decorators=[])

shell2http.register_command(endpoint="m1start", command_name="bash /trinoapp/trino-start-cluster.sh 1 5", callback_fn=my_callback_fn, decorators=[])

shell2http.register_command(endpoint="m1stop", command_name="bash /trinoapp/trino-stop-cluster.sh 1 5", callback_fn=my_callback_fn, decorators=[])

shell2http.register_command(endpoint="m2start", command_name="bash /trinoapp/trino-start-cluster.sh 2 3", callback_fn=my_callback_fn, decorators=[])

shell2http.register_command(endpoint="m2stop", command_name="bash /trinoapp/trino-stop-cluster.sh 2 3", callback_fn=my_callback_fn, decorators=[])
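The four register_command calls above follow one pattern, so the endpoint names and commands could also be generated from a small table of clusters. The sketch below only builds that mapping and does not touch Flask. It assumes the two script arguments are a cluster number and a worker count, which the post does not spell out, and the names CLUSTERS and build_commands are illustrative rather than part of the original scripts.

```python
# Hypothetical helper: derive the endpoint -> shell command mapping
# from one table instead of writing each register_command call by hand.

SCRIPT_DIR = "/trinoapp"  # same directory as used in trino_app.py

# Assumption: the two script arguments are (cluster number, worker count).
CLUSTERS = {
    "m1": (1, 5),
    "m2": (2, 3),
}


def build_commands(clusters, script_dir=SCRIPT_DIR):
    """Return {endpoint: command} for the start and stop scripts."""
    commands = {}
    for name, (cluster_no, workers) in clusters.items():
        for action in ("start", "stop"):
            script = f"{script_dir}/trino-{action}-cluster.sh"
            commands[f"{name}{action}"] = f"bash {script} {cluster_no} {workers}"
    return commands


if __name__ == "__main__":
    # Print the mapping so it can be compared with the hand-written calls.
    for endpoint, command in build_commands(CLUSTERS).items():
        print(endpoint, "->", command)
```

In trino_app.py, each pair could then be passed to shell2http.register_command(endpoint=endpoint, command_name=command, callback_fn=my_callback_fn, decorators=[]), which keeps the list of clusters in one place.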

 

Start the web server that hosts the Flask script. Create a script called run_trino_app.sh:




#!/bin/bash

cd "$(dirname "$0")"

/usr/local/bin/waitress-serve --port=5000 trino_app:app


We can set up a systemd service for this so that it is easier to start and stop the Flask application.


# /etc/systemd/system/trino_app.service

[Unit]

Description=My Trino API

After=network.target

[Service]

Type=simple

# The user must be able to read and execute the files in WorkingDirectory

User=nanodano

WorkingDirectory=/root/trinoapp

ExecStart=/root/trinoapp/run_trino_app.sh

Restart=always

[Install]

WantedBy=multi-user.target

# Reload systemd so that it picks up the new unit file

systemctl daemon-reload

# Start, stop and check the Trino app (the background process is waitress)

systemctl start trino_app

systemctl stop trino_app

systemctl status trino_app

 

 

Check that the service is up; you can then invoke a cluster manually. Below is a sample start command. To stop the cluster, use m1stop instead, or whatever you have used for your URL path.

 

#Make a Call - initiate shell script.

 

curl -X POST http://ip:5000/commands/m1start

 

# Check the result using the key returned by the call above. You can't make another call to the same API until the result comes back.

curl "http://ip:5000/commands/m1start?key=864d8355"
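The same two calls can be made from Python, which is handy when the orchestration tool runs Python tasks. This is a minimal sketch using only the standard library. It assumes the service replies with JSON and that the reply to the POST contains a "key" field, as the curl example suggests; BASE_URL uses the same placeholder host as the curl commands, and the function names are illustrative.

```python
import json
import urllib.request

BASE_URL = "http://ip:5000/commands"  # placeholder host, as in the curl examples


def result_url(base_url, endpoint, key):
    """Build the URL that returns the result of an earlier POST."""
    return f"{base_url}/{endpoint}?key={key}"


def start_command(base_url, endpoint, timeout=30):
    """POST to an endpoint (for example "m1start") and return the JSON reply."""
    request = urllib.request.Request(
        f"{base_url}/{endpoint}", data=b"", method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def fetch_result(base_url, endpoint, key, timeout=30):
    """GET the result for a key returned by start_command."""
    url = result_url(base_url, endpoint, key)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

A typical sequence would be reply = start_command(BASE_URL, "m1start") followed by fetch_result(BASE_URL, "m1start", reply["key"]), where the "key" field name is the assumption noted above.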

 

The main server for taking the incoming HTTP API calls is now complete and should be working.

 

Now let’s move on to the client side, or Airflow. As a first step in the ETL process, we want the call to the API to hold while the cluster is coming up. For this we introduced a client-side script, which waits until the Trino cluster is fully up before the ETL process begins. Once the ETL process has completed, the last step can invoke the cluster shutdown.

 

Client-side script called client-side-call-example can be found in the GitHub repo.
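For readers who want to see the waiting logic in Python, here is a minimal sketch of such a hold step. It is not the script from the repo. It polls the Trino coordinator's /v1/info endpoint until the "starting" flag is false, which only confirms that the coordinator is ready, so checking that all workers have joined would need an extra step. The coordinator URL, timeouts and function names are assumptions for illustration.

```python
import http.client
import json
import time
import urllib.error
import urllib.request


def fetch_info(coordinator_url, timeout=5):
    """Return the coordinator's /v1/info document, or None if unreachable."""
    try:
        url = f"{coordinator_url}/v1/info"
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (urllib.error.URLError, http.client.HTTPException, OSError, ValueError):
        # The VM may still be booting, so treat any failure as "not ready yet".
        return None


def wait_until_ready(get_info, timeout_s=600, interval_s=10,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll get_info() until it reports starting == False.

    Returns True when the coordinator is ready, False on timeout.
    """
    deadline = clock() + timeout_s
    while True:
        info = get_info()
        if info is not None and info.get("starting") is False:
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

A first ETL task could call wait_until_ready(lambda: fetch_info("http://trino-master:8080")) and fail the run if it returns False; the host name and port here are placeholders.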

 

We hope this helps other clients and organisations save on cluster costs by using on-demand clusters. We also have graceful shutdown and automatic scaling solutions for Trino on GCP. Our auto-scaling solutions monitor the cluster and adjust the number of workers throughout the day based on memory usage, number of queries and so on.

 

DELIVERBI

 

Shahed and Krishna



About Authors

Shahed Munir

Krishna Udathu

Shahed and Krishna are Oracle / Big Data Experts