Version: Next

Delta Lake

Incubating

Important Capabilities

Capability: Extract Tags
Notes: Can extract S3 object/bucket and Azure blob/container tags if enabled

This plugin extracts:

  • Column types and schema associated with each delta table
  • Custom properties: number_of_files, partition_columns, table_creation_time, location, version etc.
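To make the custom properties above more concrete, here is a minimal sketch of where such values live in a Delta table's transaction log. It is not the plugin's implementation; the function name summarize_delta_log is hypothetical, and the sketch assumes a small table whose commits are all still present as JSON files in _delta_log (no checkpoints).

```python
import json
from pathlib import Path


def summarize_delta_log(table_path):
    """Derive a few table properties from the JSON commits in _delta_log.

    Assumption: no checkpoint files exist and every commit is still
    available as a JSON file, which holds for small, freshly written tables.
    """
    log_dir = Path(table_path) / "_delta_log"
    commits = sorted(log_dir.glob("*.json"))  # zero-padded names sort by version
    if not commits:
        raise ValueError(f"{table_path} does not look like a Delta table")

    live_files = set()
    partition_columns = []
    created_time = None
    for commit in commits:
        # Each commit file holds one JSON action per line.
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "metaData" in action:
                meta = action["metaData"]
                partition_columns = meta.get("partitionColumns", [])
                created_time = meta.get("createdTime", created_time)
            elif "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:
                live_files.discard(action["remove"]["path"])

    return {
        "version": int(commits[-1].stem),
        "number_of_files": len(live_files),
        "partition_columns": partition_columns,
        "table_creation_time": created_time,
        "location": str(table_path),
    }
```

Calling summarize_delta_log("quickstart/my-table") on a local table returns a dictionary whose keys mirror the property names listed above.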
Caution

If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.

CLI based Ingestion

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: delta-lake
  config:
    env: "PROD"
    platform_instance: "my-delta-lake"
    base_path: "/path/to/data/folder"

sink:
  # sink configs
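The base_path in this recipe may point either at a single Delta table or at a parent folder; in the latter case the Config Details state that all subfolders are scanned for Delta tables. A rough local-filesystem sketch of that idea follows. The helper find_delta_tables is hypothetical, and the rule "a folder is a table if it contains a _delta_log directory" is an assumption about how detection could work, not a description of the plugin's code.

```python
import os


def find_delta_tables(base_path):
    """Return directories at or under base_path that contain a _delta_log folder."""
    tables = []
    for root, dirs, _files in os.walk(base_path):
        if "_delta_log" in dirs:
            tables.append(root)
            # Subfolders of a table are its own data and log, so stop descending.
            dirs[:] = []
    return sorted(tables)
```

If base_path itself is a table, the result contains only base_path; otherwise it lists every nested table folder that was found.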

Config Details

Note that a . is used to denote nested fields in the YAML recipe.
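As an illustration of the dotted notation, the sketch below expands dotted field names into the nested structure a recipe's config block would contain. The helper nest is hypothetical and the values (including the region code) are placeholders.

```python
def nest(dotted):
    """Expand {'a.b.c': 1} into {'a': {'b': {'c': 1}}}."""
    nested = {}
    for key, value in dotted.items():
        node = nested
        *parents, leaf = key.split(".")
        for part in parents:
            # Create intermediate levels on demand.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


config = nest({
    "base_path": "s3://my-bucket/my-folder/sales-table",
    "s3.use_s3_bucket_tags": True,
    "s3.aws_config.aws_region": "us-east-1",  # placeholder region
})
```

Here s3.aws_config.aws_region ends up as config["s3"]["aws_config"]["aws_region"], which corresponds to three levels of indentation in the YAML recipe.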

Field, type, description, and default (where one exists):
base_path 
string
Path to table (s3 or local file system). If path is not a delta table path then all subfolders will be scanned to detect and ingest delta tables.
platform
Enum
One of: "delta-lake"
Default: delta-lake
platform_instance
string
The instance of the platform that all assets produced by this recipe belong to
relative_path
string
If set, delta-tables will be searched at location '<base_path>/<relative_path>' and URNs will be created using relative_path only.
require_files
boolean
Whether DeltaTable should track files. Consider setting this to False for large Delta tables; doing so significantly reduces the memory used by the ingestion process. When set to False, number_of_files cannot be reported.
Default: True
version_history_lookback
integer
Number of previous version histories to be ingested. Defaults to 1. If set to -1 all version history will be ingested.
Default: 1
env
string
The environment that all assets produced by this connector belong to
Default: PROD
azure
Azure
Azure specific configuration
azure.use_abs_blob_properties
boolean
Whether or not to create properties in datahub from the Azure blob
Default: False
azure.use_abs_blob_tags
boolean
Whether or not to create tags in datahub from Azure blob tags
Default: False
azure.use_abs_container_properties
boolean
Whether or not to create properties in datahub from the Azure blob container
Default: False
azure.azure_config
AzureConnectionConfig
Azure configuration
azure.azure_config.account_key
string
Azure storage account access key.
azure.azure_config.account_name
string
Name of the Azure storage account. See Microsoft official documentation on how to create a storage account.
azure.azure_config.base_path
string
Base folder in hierarchical namespaces to start from.
Default: /
azure.azure_config.client_id
string
Azure client (Application) ID for service principal auth.
azure.azure_config.client_secret
string
Azure client secret for service principal auth.
azure.azure_config.container_name
string
Azure storage account container name.
azure.azure_config.sas_token
string
Azure storage account SAS token.
azure.azure_config.tenant_id
string
Azure tenant ID required for service principal auth.
azure.azure_config.use_cli_auth
boolean
Whether to authenticate using the Azure CLI.
Default: False
azure.azure_config.use_managed_identity
boolean
Whether to use Azure Managed Identity authentication.
Default: False
s3
S3
S3 specific configuration
s3.use_s3_bucket_tags
boolean
Whether or not to create tags in datahub from the s3 bucket
Default: False
s3.use_s3_object_tags
boolean
Whether or not to create tags in datahub from the s3 object
Default: False
s3.aws_config
AwsConnectionConfig
AWS configuration
s3.aws_config.aws_access_key_id
string
AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details.
s3.aws_config.aws_advanced_config
object
Advanced AWS configuration options. These are passed directly to botocore.config.Config.
s3.aws_config.aws_endpoint_url
string
The AWS service endpoint. This is normally constructed automatically, but can be overridden here.
s3.aws_config.aws_profile
string
The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.
s3.aws_config.aws_proxy
map(str,string)
s3.aws_config.aws_region
string
AWS region code.
s3.aws_config.aws_retry_mode
Enum
One of: "legacy", "standard", "adaptive"
Default: standard
s3.aws_config.aws_retry_num
integer
Number of times to retry failed AWS requests. See the botocore.retry docs for details.
Default: 5
s3.aws_config.aws_secret_access_key
string
AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details.
s3.aws_config.aws_session_token
string
AWS session token. Can be auto-detected, see the AWS boto3 docs for details.
s3.aws_config.read_timeout
number
The timeout for reading from the connection (in seconds).
Default: 60
s3.aws_config.aws_role
One of string, array
AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role.
s3.aws_config.aws_role.union
One of string, AwsAssumeRoleConfig
s3.aws_config.aws_role.union.RoleArn 
string
ARN of the role to assume.
s3.aws_config.aws_role.union.ExternalId
string
External ID to use when assuming the role.
table_pattern
AllowDenyPattern
Regex patterns used to filter which tables are ingested.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
table_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
table_pattern.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
table_pattern.allow.string
string
table_pattern.deny
array
List of regex patterns to exclude from ingestion.
Default: []
table_pattern.deny.string
string
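The table_pattern fields combine as an allow list and a deny list. The sketch below shows one plausible reading of those semantics: a name is kept when it matches at least one allow pattern and no deny pattern. The function is_allowed is hypothetical, and matching from the start of the string with re.match is an assumption rather than a documented guarantee.

```python
import re


def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    """Return True if name matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    # Deny patterns take precedence over allow patterns.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)
```

With the defaults shown in the table (allow everything, deny nothing, ignore case), every table name passes the filter.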

Usage Guide

If you are new to Delta Lake and want to test out a simple integration with Delta Lake and DataHub, you can follow this guide.

Delta Table on Local File System

Step 1

Create a delta table using the sample PySpark code below if you don't have a delta table you can point to.

import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def generate_data():
    # One synthetic sale per (year, month, day) combination.
    # chr() maps the 65-90 code to a letter, giving customers such as "AAA".
    return [
        (y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
        for d in range(1, 29)
        for m in range(1, 13)
        for y in range(2000, 2021)
    ]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()

Step 2

Create a datahub ingestion yaml file (delta.dhub.yaml) to ingest metadata from the delta table you just created.

source:
  type: "delta-lake"
  config:
    base_path: "quickstart/my-table"

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Note: Run the Spark code and the recipe from the same folder; otherwise, use absolute paths.

Step 3

Execute the ingestion recipe:

datahub ingest -c delta.dhub.yaml
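The same recipe can also be run from Python instead of the CLI. The sketch below assumes the DataHub package with the delta-lake plugin is installed; build_recipe and run_recipe are hypothetical helper names, while Pipeline.create, run, and raise_from_status come from DataHub's programmatic pipeline interface.

```python
def build_recipe(base_path, server="http://localhost:8080"):
    """Return the Step 2 recipe as a plain dictionary."""
    return {
        "source": {"type": "delta-lake", "config": {"base_path": base_path}},
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe):
    """Run a recipe dictionary and raise if the ingestion reported failures."""
    # Imported lazily so the recipe can be built without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


# Example call (requires a reachable DataHub server):
# run_recipe(build_recipe("quickstart/my-table"))
```

Keeping the recipe as a dictionary makes it easy to generate several recipes, for example one per table folder, from the same script.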

Delta Table on S3

Step 1

Set up your AWS credentials by creating an AWS credentials file, typically at $HOME/.aws/credentials.

[my-creds]
aws_access_key_id = ######
aws_secret_access_key = ######

Step 2

Create a Delta table using the sample PySpark code below, unless you already have Delta tables on S3.

import os
import random
import uuid
from configparser import ConfigParser

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def generate_data():
    # One synthetic sale per (year, month, day) combination.
    # chr() maps the 65-90 code to a letter, giving customers such as "AAA".
    return [
        (y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
        for d in range(1, 29)
        for m in range(1, 13)
        for y in range(2000, 2021)
    ]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ConfigParser does not expand "$HOME", so resolve the path explicitly.
config_object = ConfigParser()
config_object.read(os.path.expanduser("~/.aws/credentials"))
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)
df = spark.read.format("delta").load(table_path)
df.show()

Step 3

Create a datahub ingestion yaml file (delta.s3.dhub.yaml) to ingest metadata from the delta table you just created.

source:
  type: "delta-lake"
  config:
    base_path: "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_access_key_id: <<Access key>>
        aws_secret_access_key: <<secret key>>

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
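The recipe above places the access keys inline. As an alternative, the source config can reference a named profile through aws_profile (see Config Details) or take the keys from the environment. The sketch below builds such a config as a Python dictionary; s3_source_config is a hypothetical helper, and reading AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment is an assumption about how you store your credentials.

```python
import os


def s3_source_config(base_path, profile=None, environ=os.environ):
    """Build the delta-lake source config without hard-coding secrets."""
    aws_config = {}
    if profile:
        # Use a named profile from ~/.aws/credentials or ~/.aws/config.
        aws_config["aws_profile"] = profile
    else:
        # Fall back to the standard AWS environment variables.
        aws_config["aws_access_key_id"] = environ["AWS_ACCESS_KEY_ID"]
        aws_config["aws_secret_access_key"] = environ["AWS_SECRET_ACCESS_KEY"]
    return {"base_path": base_path, "s3": {"aws_config": aws_config}}
```

For example, s3_source_config("s3://my-bucket/my-folder/sales-table", profile="my-creds") yields a config that reuses the profile created in Step 1.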

Step 4

Execute the ingestion recipe:

datahub ingest -c delta.s3.dhub.yaml

Note

The recipes above are minimal. Refer to the Config Details section for the full set of configuration options.

Code Coordinates

  • Class Name: datahub.ingestion.source.delta_lake.source.DeltaLakeSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Delta Lake, feel free to ping us on our Slack.