Table of Contents
- Introduction to Workflow Orchestration
- Getting Started with Kestra
- Building ETL Data Pipelines
- ELT Pipelines with Google Cloud Platform
- Using AI for Data Engineering
- Deploying to the Cloud
Introduction to Workflow Orchestration
What is Workflow Orchestration?
Think of a music orchestra: multiple instruments with different roles, all coordinated by a conductor to play together in harmony. Workflow orchestration applies the same concept to data tools and platforms.
A workflow orchestrator coordinates multiple tools and systems, ensuring they work together at the right time. It handles:
- Running workflows with predefined steps
- Monitoring and logging errors, with automated responses when they occur
- Scheduling workflows based on time or events
- Managing dependencies between tasks and data flows
In data engineering, we often need to move data from one place to another, sometimes transforming it along the way. An orchestrator manages these steps while providing visibility into the entire process.
Common orchestration scenarios:
- Extract data from an API daily at 6 AM
- Transform and load data after new files arrive in cloud storage
- Retry failed tasks with exponential backoff
- Send alerts when pipelines fail
What is Kestra?
Kestra is an open-source, event-driven orchestration platform designed for building both scheduled and event-driven workflows.
Key features:
- Flow as Code (YAML) - Define workflows declaratively, no complex programming required
- 1000+ Plugins - Integrate with databases, cloud providers, APIs, and more
- Language Agnostic - Run Python, Shell, R, Node.js, or any language
- Flexible Triggers - Schedule-based or event-based execution
- Visual UI - Monitor, debug, and manage workflows from a web interface
- AI Copilot - Generate workflows using natural language
Getting Started with Kestra
Installing Kestra with Docker Compose
Kestra runs as a Docker container alongside a Postgres database for storing metadata. We can extend the setup from Module 1 to include Kestra.
Docker Compose Configuration
The docker-compose.yml file sets up four services:
- pgdatabase - Postgres for our taxi data (from Module 1)
- pgadmin - Web UI for managing Postgres
- kestra_postgres - Dedicated Postgres for Kestra’s internal state
- kestra - The orchestration server
services:
pgdatabase:
image: postgres:17
environment:
POSTGRES_USER: "root"
POSTGRES_PASSWORD: "root"
POSTGRES_DB: "ny_taxi"
volumes:
- "ny_taxi_postgres_data:/var/lib/postgresql/data"
ports:
- "9868:5432"
networks:
- pg-network
healthcheck:
test: [ "CMD-SHELL", "pg_isready -d ny_taxi -U root" ]
interval: 10s
timeout: 5s
retries: 5
pgadmin:
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: "admin@admin.com"
PGADMIN_DEFAULT_PASSWORD: "root"
volumes:
- "pgadmin_data:/var/lib/pgadmin"
ports:
- "8085:80"
networks:
- pg-network
depends_on:
- pgdatabase
kestra_postgres:
image: postgres:18
volumes:
- kestra_postgres_data:/var/lib/postgresql
environment:
POSTGRES_DB: kestra
POSTGRES_USER: kestra
POSTGRES_PASSWORD: k3str4
healthcheck:
test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
interval: 30s
timeout: 10s
retries: 10
networks:
- pg-network
kestra:
image: kestra/kestra:v1.1
pull_policy: always
user: "root"
command: server standalone
volumes:
- kestra_data:/app/storage
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/kestra-wd:/tmp/kestra-wd
environment:
KESTRA_CONFIGURATION: |
datasources:
postgres:
url: jdbc:postgresql://kestra_postgres:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
kestra:
server:
basicAuth:
username: "admin@kestra.io"
password: Admin1234!
repository:
type: postgres
storage:
type: local
local:
basePath: "/app/storage"
queue:
type: postgres
tasks:
tmpDir:
path: /tmp/kestra-wd/tmp
url: http://localhost:8080/
ports:
- "8080:8080"
- "8082:8081"
networks:
- pg-network
depends_on:
kestra_postgres:
condition: service_healthy
volumes:
ny_taxi_postgres_data:
pgadmin_data:
kestra_postgres_data:
kestra_data:
networks:
pg-network:
driver: bridge
Starting Kestra
cd module-2/kestra
docker compose up -d
Access the Kestra UI at http://localhost:8080
Default credentials:
- Username: admin@kestra.io
- Password: Admin1234!
To shut down:
docker compose down
Troubleshooting:
- If port 8080 is already in use (e.g., by pgAdmin), change the Kestra port mapping to something like `18080:8080`
- Pin Kestra to version `v1.1` for reproducibility; avoid `develop` images
Kestra Concepts
Understanding these core concepts is essential for building workflows in Kestra:
Flow
A Flow is a container for tasks and their orchestration logic. Flows are defined in YAML.
id: my_first_flow
namespace: zoomcamp
description: A simple hello world flow
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello, World!
Tasks
Tasks are the individual steps within a flow. Each task has:
- A unique `id`
- A `type` that determines what it does
- Configuration properties specific to that task type
tasks:
- id: extract_data
type: io.kestra.plugin.core.http.Download
uri: https://example.com/data.csv
- id: log_result
type: io.kestra.plugin.core.log.Log
message: "Downloaded file: "
Inputs
Inputs are dynamic values passed to a flow at runtime. They allow flows to be reusable with different parameters.
inputs:
- id: taxi_type
type: SELECT
required: true
values:
- yellow
- green
defaults: yellow
- id: year
type: INT
required: true
defaults: 2021
- id: month
type: INT
required: true
defaults: 1
Access inputs in tasks using the `{{ inputs.input_id }}` expression syntax, for example `{{ inputs.taxi_type }}`.
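For example, a Log task (the task id and message text here are illustrative) can echo the inputs defined above:
tasks:
  - id: log_selection
    type: io.kestra.plugin.core.log.Log
    message: "Processing {{ inputs.taxi_type }} trips for {{ inputs.year }}-{{ inputs.month }}"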
Outputs
Outputs pass data between tasks. Each task can produce outputs that subsequent tasks consume.
tasks:
- id: return_value
type: io.kestra.plugin.core.debug.Return
format: "The year is "
- id: log_output
type: io.kestra.plugin.core.log.Log
message: ""
Triggers
Triggers automatically start flow executions based on schedules or events.
Schedule Trigger - Run at specific times (cron syntax):
triggers:
- id: daily_schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 9 * * *" # Every day at 9 AM UTC
Flow Trigger - Run when another flow completes:
triggers:
- id: after_extraction
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlowCondition
flowId: extraction_flow
namespace: zoomcamp
Variables
Variables are key-value pairs for reusing values across tasks.
variables:
base_url: "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"
file_name: "_tripdata_-.csv.gz"
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: "//"
Plugin Defaults
Plugin Defaults apply default values to all tasks of a specific type.
pluginDefaults:
- type: io.kestra.plugin.core.log.Log
values:
level: ERROR
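A more practical use, assuming the Postgres connection details used later in this module, is to declare the JDBC connection once so individual Query tasks don't repeat credentials (a sketch; adjust the URL and credentials to your setup):
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.Query
    values:
      url: jdbc:postgresql://pgdatabase:5432/ny_taxi
      username: root
      password: root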
Concurrency
Concurrency controls how many executions of a flow can run simultaneously.
concurrency:
limit: 2
behavior: FAIL # Options: QUEUE, CANCEL, FAIL
Execution
An Execution is a single run of a flow. Each execution has:
- A unique ID
- A state (RUNNING, SUCCESS, FAILED, etc.)
- Logs and outputs from each task
- A Gantt chart showing task timing
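Execution metadata can also be referenced in expressions, e.g. {{ execution.id }} and {{ execution.startDate }}. A minimal sketch (task id and message are illustrative):
tasks:
  - id: log_execution_info
    type: io.kestra.plugin.core.log.Log
    message: "Execution {{ execution.id }} of flow {{ flow.id }} started at {{ execution.startDate }}"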
Orchestrating Python Code
Kestra can execute Python scripts directly within workflows, allowing you to leverage existing Python code or write custom logic.
Running Python in Kestra
id: python_example
namespace: zoomcamp
tasks:
- id: run_python
type: io.kestra.plugin.scripts.python.Commands
docker:
image: python:3.11-slim
beforeCommands:
- pip install requests
commands:
- python main.py
namespaceFiles:
enabled: true
Inline Python with Outputs
You can write Python directly in the YAML and capture outputs:
tasks:
- id: fetch_dockerhub_pulls
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:3.11-slim
beforeCommands:
      - pip install requests kestra
    script: |
      import requests
      from kestra import Kestra

      # Fetch public Docker Hub stats for the kestra/kestra image
      response = requests.get("https://hub.docker.com/v2/repositories/kestra/kestra")
      data = response.json()
      pull_count = data.get("pull_count", 0)

      # Expose the value to downstream tasks as a Kestra output
      Kestra.outputs({"pulls": pull_count})
- id: log_pulls
type: io.kestra.plugin.core.log.Log
message: "Docker pulls: "
Key points:
- `docker.image` specifies the container image used to run the script
- `beforeCommands` installs dependencies before the main script runs
- Use `Kestra.outputs({...})` from the `kestra` Python package to pass values back to Kestra as outputs
- Access those outputs in downstream tasks via `{{ outputs.fetch_dockerhub_pulls.vars.pulls }}`
Building ETL Data Pipelines
Getting Started Pipeline
Before working with the taxi dataset, let’s understand a simple ETL pipeline pattern:
Extract (HTTP API) → Transform (Python) → Query (DuckDB)
This flow demonstrates:
- Extract - Download data from a REST API
- Transform - Process data with Python
- Query - Analyze with DuckDB (in-memory SQL)
id: getting_started_pipeline
namespace: zoomcamp
tasks:
- id: extract
type: io.kestra.plugin.core.http.Download
uri: https://dummyjson.com/products
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:3.11-slim
    beforeCommands:
      - pip install pandas
    inputFiles:
      products.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - products.csv
    script: |
      import json

      import pandas as pd

      # Read the JSON payload downloaded by the extract task
      with open("products.json", "r") as f:
          data = json.load(f)

      # Flatten the products array and write a CSV for the query task
      df = pd.DataFrame(data["products"])
      df.to_csv("products.csv", index=False)
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.csv: "{{ outputs.transform.outputFiles['products.csv'] }}"
    sql: |
      SELECT category, COUNT(*) as count, AVG(price) as avg_price
      FROM read_csv_auto('{{ workingDir }}/products.csv')
      GROUP BY category
      ORDER BY count DESC
Loading Taxi Data to Postgres
Now we build a real ETL pipeline for NYC Yellow and Green Taxi data.
Pipeline Architecture
Select Year/Month → Extract CSV → Create Tables → Load Data → Merge to Final Table
Flow Definition
id: postgres_taxi
namespace: zoomcamp
inputs:
- id: taxi
type: SELECT
required: true
values:
- yellow
- green
defaults: yellow
- id: year
type: INT
required: true
defaults: 2019
- id: month
type: INT
required: true
defaults: 1
variables:
  file_name: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}-{{ inputs.month }}.csv.gz"
  staging_table: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}_{{ inputs.month }}"
  final_table: "{{ inputs.taxi }}_tripdata"
  data_url: "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{ inputs.taxi }}/{{ vars.file_name }}"
tasks:
- id: set_label
type: io.kestra.plugin.core.execution.Labels
labels:
taxi: ""
file: ""
- id: extract
type: io.kestra.plugin.core.http.Download
uri: ""
- id: create_final_table
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://pgdatabase:5432/ny_taxi
username: root
password: root
sql: |
      CREATE TABLE IF NOT EXISTS {{ render(vars.final_table) }} (
unique_row_id TEXT,
filename TEXT,
VendorID INT,
tpep_pickup_datetime TIMESTAMP,
tpep_dropoff_datetime TIMESTAMP,
passenger_count INT,
trip_distance DOUBLE PRECISION,
RatecodeID INT,
store_and_fwd_flag TEXT,
PULocationID INT,
DOLocationID INT,
payment_type INT,
fare_amount DOUBLE PRECISION,
extra DOUBLE PRECISION,
mta_tax DOUBLE PRECISION,
tip_amount DOUBLE PRECISION,
tolls_amount DOUBLE PRECISION,
improvement_surcharge DOUBLE PRECISION,
total_amount DOUBLE PRECISION,
congestion_surcharge DOUBLE PRECISION
);
- id: create_staging_table
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://pgdatabase:5432/ny_taxi
username: root
password: root
sql: |
      CREATE TABLE IF NOT EXISTS {{ render(vars.staging_table) }} (
        LIKE {{ render(vars.final_table) }} INCLUDING ALL
      );
      TRUNCATE TABLE {{ render(vars.staging_table) }};
- id: load_data
type: io.kestra.plugin.jdbc.postgresql.CopyIn
url: jdbc:postgresql://pgdatabase:5432/ny_taxi
username: root
password: root
table: ""
from: ""
format: CSV
header: true
delimiter: ","
- id: merge_to_final
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://pgdatabase:5432/ny_taxi
username: root
password: root
sql: |
      INSERT INTO {{ render(vars.final_table) }}
      SELECT * FROM {{ render(vars.staging_table) }}
      ON CONFLICT (unique_row_id) DO NOTHING;
Understanding the Flow
- Inputs - User selects taxi type, year, and month
- Variables - Construct file names and URLs dynamically
- Extract - Download CSV from GitHub releases
- Create Tables - Ensure destination tables exist
- Load - Use `CopyIn` for bulk insert (faster than row-by-row)
- Merge - Insert new records and skip duplicates (one way to populate the `unique_row_id` used for deduplication is sketched below)
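The flow above does not populate unique_row_id itself. One way to do it, sketched here as an extra Postgres Query task between load_data and merge_to_final (the task id and the hashed column list are illustrative and can be extended), is to hash a few identifying columns:
  - id: add_unique_id
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://pgdatabase:5432/ny_taxi
    username: root
    password: root
    sql: |
      -- Derive a deterministic ID from identifying columns so re-loads don't duplicate rows
      UPDATE {{ render(vars.staging_table) }}
      SET unique_row_id = md5(
            COALESCE(CAST(VendorID AS text), '') ||
            COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
            COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
            COALESCE(CAST(PULocationID AS text), '') ||
            COALESCE(CAST(DOLocationID AS text), '')
          ),
          filename = '{{ render(vars.file_name) }}';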
Scheduling and Backfills
Adding a Schedule Trigger
triggers:
- id: daily_at_9am
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 9 * * *"
inputs:
taxi: green
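For scheduled runs (and the backfills described next), year and month do not have to come from fixed input defaults: the scheduled date is available as {{ trigger.date }}, so a flow variable can be derived from it instead. A sketch of such a variable (note that trigger.date only exists when the execution is started by a trigger, not on manual runs):
variables:
  file_name: "{{ inputs.taxi }}_tripdata_{{ trigger.date | date('yyyy') }}-{{ trigger.date | date('MM') }}.csv.gz"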
Backfilling Historical Data
Kestra supports backfilling - running a scheduled flow for past dates as if the schedule had been active.
Steps to backfill:
- Go to the flow in Kestra UI
- Click the Triggers tab
- Select the schedule trigger
- Click Backfill
- Choose the date range (e.g., 2019-01-01 to 2019-12-31)
- Kestra creates executions for each scheduled interval
This is essential for:
- Loading historical data when setting up a new pipeline
- Reprocessing data after fixing bugs
- Filling gaps from downtime
ELT Pipelines with Google Cloud Platform
ETL vs ELT
In Module 1 and earlier sections, we used ETL (Extract, Transform, Load):
- Extract from source
- Transform with Python/Pandas locally
- Load to Postgres
For large datasets and cloud warehouses, ELT (Extract, Load, Transform) is often better:
- Extract from source
- Load raw data to cloud storage (data lake)
- Transform using the warehouse’s compute power
Why ELT for BigQuery?
- BigQuery handles transformations on petabytes of data
- No local memory constraints
- SQL-based transformations are declarative and auditable
- Separation of storage (GCS) and compute (BigQuery)
ETL: Source → Transform (local) → Load (DB)
ELT: Source → Load (GCS) → Transform (BigQuery)
Setting Up GCP
Prerequisites
- GCP Project - Create or use existing project
- Service Account - With permissions for GCS and BigQuery
- API Keys - Download JSON credentials
Required IAM Roles
- `roles/storage.admin` - Create buckets and upload files
- `roles/bigquery.admin` - Create datasets and tables
Configuring Kestra with GCP Credentials
Store credentials in Kestra’s KV Store (key-value storage):
id: gcp_setup_kv
namespace: zoomcamp
tasks:
- id: set_gcp_project
type: io.kestra.plugin.core.kv.Set
key: GCP_PROJECT_ID
value: your-project-id
- id: set_gcp_location
type: io.kestra.plugin.core.kv.Set
key: GCP_LOCATION
value: EU
- id: set_bucket_name
type: io.kestra.plugin.core.kv.Set
key: GCP_BUCKET_NAME
value: your-bucket-name
- id: set_dataset
type: io.kestra.plugin.core.kv.Set
key: GCP_DATASET
value: zoomcamp_dataset
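Stored values can then be read anywhere in a flow with the kv() function; for example (task id and message are illustrative):
tasks:
  - id: check_config
    type: io.kestra.plugin.core.log.Log
    message: "Project {{ kv('GCP_PROJECT_ID') }}, bucket {{ kv('GCP_BUCKET_NAME') }}, dataset {{ kv('GCP_DATASET') }}"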
Creating GCS Bucket and BigQuery Dataset
id: gcp_create_resources
namespace: zoomcamp
tasks:
- id: create_bucket
type: io.kestra.plugin.gcp.gcs.CreateBucket
projectId: ""
name: ""
location: ""
- id: create_dataset
type: io.kestra.plugin.gcp.bigquery.CreateDataset
projectId: ""
dataset: ""
location: ""
Loading Taxi Data to BigQuery
ELT Pipeline Architecture
Extract CSV → Upload to GCS → Create External Table → Create Staging Table → Merge to Final
Flow Definition
id: gcp_taxi
namespace: zoomcamp
inputs:
- id: taxi
type: SELECT
values: [yellow, green]
defaults: yellow
- id: year
type: INT
defaults: 2019
- id: month
type: INT
defaults: 1
variables:
  file_name: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}-{{ inputs.month }}.csv.gz"
  gcs_path: "gs://{{ kv('GCP_BUCKET_NAME') }}/{{ inputs.taxi }}/{{ vars.file_name }}"
  project_id: "{{ kv('GCP_PROJECT_ID') }}"
  dataset: "{{ kv('GCP_DATASET') }}"
  table_ext: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}_{{ inputs.month }}_ext"
  table_tmp: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}_{{ inputs.month }}"
  table_final: "{{ inputs.taxi }}_tripdata"
tasks:
- id: extract
type: io.kestra.plugin.core.http.Download
uri: "https://github.com/DataTalksClub/nyc-tlc-data/releases/download//"
- id: upload_to_gcs
type: io.kestra.plugin.gcp.gcs.Upload
from: ""
to: ""
- id: create_final_table
type: io.kestra.plugin.gcp.bigquery.Query
projectId: ""
sql: |
CREATE TABLE IF NOT EXISTS `..` (
unique_row_id STRING,
filename STRING,
VendorID INT64,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
passenger_count INT64,
trip_distance FLOAT64,
-- ... additional columns
)
PARTITION BY DATE(pickup_datetime);
- id: create_external_table
type: io.kestra.plugin.gcp.bigquery.Query
projectId: ""
sql: |
CREATE OR REPLACE EXTERNAL TABLE `..`
OPTIONS (
format = 'CSV',
uris = [''],
skip_leading_rows = 1
);
- id: create_staging_table
type: io.kestra.plugin.gcp.bigquery.Query
projectId: ""
sql: |
CREATE OR REPLACE TABLE `..` AS
SELECT
GENERATE_UUID() as unique_row_id,
'' as filename,
*
FROM `..`;
- id: merge_to_final
type: io.kestra.plugin.gcp.bigquery.Query
projectId: ""
sql: |
MERGE `..` T
USING `..` S
ON T.unique_row_id = S.unique_row_id
WHEN NOT MATCHED THEN INSERT ROW;
- id: cleanup
type: io.kestra.plugin.gcp.gcs.Delete
uri: ""
Key Differences from Postgres Pipeline
| Aspect | Postgres (ETL) | BigQuery (ELT) |
|---|---|---|
| Transform location | Local Python | BigQuery SQL |
| Intermediate storage | None | GCS (data lake) |
| Loading mechanism | COPY command | External tables |
| Scalability | Limited by memory | Serverless, petabyte-scale |
Scheduling GCP Workflows
triggers:
- id: yellow_daily
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 10 * * *" # 10 AM UTC
inputs:
taxi: yellow
- id: green_daily
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 9 * * *" # 9 AM UTC
inputs:
taxi: green
Backfilling in GCP: Since BigQuery handles large datasets efficiently, you can backfill the entire historical dataset (2019-2024) without worrying about local resource constraints.
Using AI for Data Engineering
Why AI for Workflows?
AI tools can accelerate workflow development by:
- Generating boilerplate YAML from natural language descriptions
- Reducing documentation lookup - AI knows plugin syntax
- Catching errors - Suggesting corrections for invalid configurations
However, AI quality depends on context. Generic AI assistants often produce outdated or incorrect Kestra syntax because they lack knowledge of recent updates.
Context Engineering
The Problem with Generic AI
When you ask ChatGPT to create a Kestra flow without context:
- It uses training data that may be months/years old
- Plugin names, properties, and syntax may have changed
- The generated YAML may not work
Example prompt:
Create a Kestra flow that loads NYC taxi data from CSV to BigQuery
Common issues in the response:
- Outdated task types (renamed plugins)
- Non-existent properties
- Incorrect YAML structure
The Solution: Context-Aware AI
Provide the AI with:
- Current documentation
- Working examples
- Plugin specifications
This is what Kestra’s AI Copilot does automatically.
Kestra AI Copilot
Kestra’s built-in AI Copilot is trained on current Kestra documentation and plugin specifications.
Setup
Add Gemini API configuration to your Docker Compose:
services:
kestra:
environment:
KESTRA_CONFIGURATION: |
kestra:
ai:
type: gemini
gemini:
model-name: gemini-2.5-flash
api-key: ${GEMINI_API_KEY}
Get an API key from Google AI Studio.
Restart Kestra:
export GEMINI_API_KEY="your-api-key"
docker compose up -d
Using AI Copilot
- Open the Kestra UI
- Create or edit a flow
- Click the sparkle icon (✨) in the code editor
- Describe what you want in natural language
Example prompt:
Create a flow that downloads a CSV file, transforms it with Python to filter rows where amount > 100, and saves to PostgreSQL
The Copilot generates working YAML with correct plugin types and properties.
Retrieval Augmented Generation (RAG)
What is RAG?
RAG (Retrieval Augmented Generation) grounds AI responses in real data:
- Retrieve - Find relevant documents from a knowledge base
- Augment - Add retrieved context to the prompt
- Generate - LLM produces an answer using the context
This reduces hallucinations by ensuring the AI has access to current, accurate information.
RAG in Kestra
Kestra provides tasks for building RAG pipelines:
id: chat_with_rag
namespace: zoomcamp
tasks:
- id: ingest_docs
type: io.kestra.plugin.ai.rag.Ingest
provider:
type: gemini
modelName: gemini-2.5-flash
apiKey: ""
documents:
- uri: https://kestra.io/docs/release-notes/1.1
type: HTML
vectorStore:
type: kv
prefix: kestra_docs
- id: query
type: io.kestra.plugin.ai.rag.Query
provider:
type: gemini
modelName: gemini-2.5-flash
apiKey: ""
vectorStore:
type: kv
prefix: kestra_docs
prompt: "What are the new features in Kestra 1.1?"
How It Works
Ask Question → Search Vector Store → Find Similar Documents →
Add to Prompt → LLM Generates Grounded Answer
Without RAG: the AI may hallucinate features that don't exist.
With RAG: the AI response is based on actual documentation.
RAG Best Practices
- Keep documents updated - Re-ingest when sources change
- Chunk appropriately - Split large documents into meaningful sections
- Test retrieval - Verify correct documents are found for queries
- Use embeddings wisely - Choose embedding models suited to your domain
Deploying to the Cloud
For production workloads, deploy Kestra to a cloud environment.
Options
- Google Cloud Run - Serverless container hosting
- Google Kubernetes Engine (GKE) - Managed Kubernetes
- Compute Engine - Traditional VMs
GitOps Integration
Sync flows from a Git repository:
id: git_sync
namespace: system
triggers:
  - id: on_push
    type: io.kestra.plugin.core.trigger.Webhook
    key: your-webhook-key
tasks:
  - id: sync_flows
    type: io.kestra.plugin.git.SyncFlows
    url: https://github.com/your-org/kestra-flows.git
    branch: main
    targetNamespace: zoomcamp
    gitDirectory: flows
Security Considerations
- Never commit credentials to Git
- Use Secrets for sensitive values (read with the secret() function; see the sketch after this list)
- Use KV Store for configuration
- Enable RBAC for multi-user environments
- Use HTTPS in production
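In the open-source edition, secrets are supplied as base64-encoded environment variables prefixed with SECRET_ and read with the secret() function. A sketch of using them instead of hardcoded credentials (the secret names here are illustrative):
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://pgdatabase:5432/ny_taxi
    username: "{{ secret('POSTGRES_USER') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: SELECT 1;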
Resources
Documentation
- Kestra Docs
- Kestra Blueprints - Pre-built examples
- Kestra Plugins - Browse available integrations
Community
Video Playlist
Key Takeaways
- Workflow orchestration coordinates multiple tools and data pipelines
- Kestra uses declarative YAML for defining workflows
- Core concepts: Flows, Tasks, Inputs, Outputs, Triggers, Variables
- ETL transforms locally before loading; ELT loads first, transforms in the warehouse
- Scheduling and backfills enable automated and historical data processing
- AI Copilot accelerates development with context-aware code generation
- RAG grounds AI responses in current documentation