Databricks Connect
Databricks Connect allows you to centralize your Python code in your Dagster project while executing Spark workloads remotely on a Databricks cluster. Unlike job submission approaches, where code is packaged and shipped to Databricks, your code runs in the Dagster process and only the Spark operations execute on Databricks compute.
When to use Databricks Connect
Databricks Connect is best for:
- Interactive development and quick iterations
- Centralized code that doesn't need deployment to Databricks
- Moderate-sized workloads where you want simpler debugging
- Greenfield Databricks use cases
It's not suitable for:
- Large batch jobs that should run independently
- Long-running workloads that would block the Dagster process
- Scenarios where network connectivity to Databricks is unreliable
Step 1: Prepare a Dagster project

1. To begin, you'll need a Dagster project. You can use an existing project or create a new one.

2. Activate your project virtual environment:

   ```
   source .venv/bin/activate
   ```

3. Add the `databricks-connect` library to your project:

   ```
   # uv
   uv add databricks-connect

   # pip
   pip install databricks-connect
   ```

4. Configure your environment:

   ```
   export DATABRICKS_HOST=https://dbc-xxxxxxx-yyyy.cloud.databricks.com/
   export DATABRICKS_TOKEN=<your-personal-access-token>
   ```
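A missing or empty variable otherwise only surfaces later as a connection error, so it can help to fail fast at startup. The helper below is a hypothetical sketch (`missing_databricks_config` is not part of the Dagster or Databricks APIs) that reports which required variables are unset:

```python
import os

# Environment variables the Databricks Connect session will need.
REQUIRED_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN")


def missing_databricks_config(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


if missing_databricks_config():
    print("Missing Databricks configuration:", missing_databricks_config())
```

Calling this at import time in your definitions module surfaces configuration problems before any Spark session is built.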
Step 2: Write a script to run Spark operations on Databricks
Next, write a Python script that connects to Databricks to run Spark operations.
In the example below:
- The Python code runs in your Dagster deployment
- Spark DataFrame operations execute remotely on Databricks
- You have direct access to the Spark API within your asset functions
- There is no job submission overhead for interactive workloads
src/<project-name>/defs/databricks_assets.py

```python
from databricks.connect import DatabricksSession
from pyspark.sql import SparkSession

import dagster as dg

# Create the Databricks session resource. EnvVar.get_value() resolves the
# environment variable eagerly, since the builder expects plain strings.
databricks_session = (
    DatabricksSession.builder.remote(
        host=dg.EnvVar("DATABRICKS_HOST").get_value(),
        token=dg.EnvVar("DATABRICKS_TOKEN").get_value(),
    )
    .serverless()
    .getOrCreate()
)


@dg.definitions
def resources():
    return dg.Definitions(resources={"spark": databricks_session})


@dg.asset
def my_spark_asset(
    context: dg.AssetExecutionContext, spark: dg.ResourceParam[SparkSession]
):
    # This code runs in Dagster, but Spark operations execute on Databricks
    df = spark.sql("SELECT * FROM catalog.schema.table")
    result = df.filter(df.status == "active").count()
    return dg.MaterializeResult(metadata={"row_count": result})
```
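Because the asset's transformation is ordinary Spark API code, the filter-and-count logic can be exercised without a cluster by stubbing the session. The sketch below uses `unittest.mock` and a hypothetical `count_active_rows` helper that mirrors the asset body; the table name is the placeholder from the example above:

```python
from unittest.mock import MagicMock


def count_active_rows(spark):
    # Same filter-and-count logic as the asset body.
    df = spark.sql("SELECT * FROM catalog.schema.table")
    return df.filter(df.status == "active").count()


# Stub a Spark session so the logic runs without a Databricks connection.
spark = MagicMock()
spark.sql.return_value.filter.return_value.count.return_value = 42

print(count_active_rows(spark))  # prints 42
```

In production, the real `DatabricksSession` replaces the stub and the same code executes remotely on the cluster.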