Databricks Connect

Databricks Connect allows you to keep your Python code centralized in your Dagster project while executing Spark workloads remotely on a Databricks cluster. Unlike job submission approaches, your code runs in the Dagster process, while Spark operations execute on Databricks compute.

When to use Databricks Connect

Databricks Connect is best for:

  • Interactive development and quick iterations
  • Centralized code that doesn't need deployment to Databricks
  • Moderate-sized workloads where you want simpler debugging
  • Greenfield Databricks use cases

It's not suitable for:

  • Large batch jobs that should run independently
  • Long-running workloads that would block the Dagster process
  • Scenarios where network connectivity to Databricks is unreliable

Step 1: Prepare a Dagster project

  1. To begin, you'll need a Dagster project. You can use an existing project or create a new one.

  2. Activate your project virtual environment:

    source .venv/bin/activate

  3. Add the databricks-connect library to your project:

    uv add databricks-connect

  4. Configure your environment:

    export DATABRICKS_HOST=https://dbc-xxxxxxx-yyyy.cloud.databricks.com/
    export DATABRICKS_TOKEN=<your-personal-access-token>
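Before wiring these variables into a session, you can validate that they are set and fail fast with a clear error. A minimal sketch; `check_databricks_env` is a hypothetical helper, not part of databricks-connect:

```python
import os

REQUIRED_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN")


def check_databricks_env() -> dict:
    """Return the Databricks connection settings, raising if any are unset."""
    missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing Databricks environment variables: {', '.join(missing)}"
        )
    return {name: os.environ[name] for name in REQUIRED_VARS}
```

Calling this at startup surfaces a misconfigured environment immediately, rather than as an opaque connection error mid-run.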

Step 2: Write a script to run Spark operations on Databricks

Next, write a Python script that connects to Databricks to run Spark operations.

In the example below:

  • The Python code runs in your Dagster deployment
  • Spark DataFrame operations execute remotely on Databricks
  • You have direct access to the Spark API within your asset functions
  • There is no job submission overhead for interactive workloads
src/<project-name>/defs/databricks_assets.py

from databricks.connect import DatabricksSession
from pyspark.sql import SparkSession

import dagster as dg

# Create the Databricks session resource. EnvVar.get_value() resolves the
# environment variables to strings at definition time.
databricks_session = (
    DatabricksSession.builder.remote(
        host=dg.EnvVar("DATABRICKS_HOST").get_value(),
        token=dg.EnvVar("DATABRICKS_TOKEN").get_value(),
    )
    .serverless()
    .getOrCreate()
)


@dg.definitions
def resources():
    return dg.Definitions(resources={"spark": databricks_session})


@dg.asset
def my_spark_asset(
    context: dg.AssetExecutionContext, spark: dg.ResourceParam[SparkSession]
):
    # This code runs in Dagster, but Spark operations execute on Databricks
    df = spark.sql("SELECT * FROM catalog.schema.table")
    result = df.filter(df.status == "active").count()
    return dg.MaterializeResult(metadata={"row_count": result})
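Because the asset only reaches Spark through the injected spark resource, you can exercise the DataFrame logic without a live cluster by stubbing the session. A minimal sketch using unittest.mock; the stubbed row count is illustrative:

```python
from unittest.mock import MagicMock

# Stub a SparkSession: sql() returns a stub DataFrame whose
# filter(...).count() yields a fixed row count.
spark = MagicMock()
spark.sql.return_value.filter.return_value.count.return_value = 42

# Same access pattern the asset uses, with no Databricks connection required.
df = spark.sql("SELECT * FROM catalog.schema.table")
result = df.filter(df.status == "active").count()
```

Swapping the stub in for the real resource lets unit tests assert on the queries issued and the metadata produced, while integration tests against a live cluster stay separate.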