Sling allows you to use custom DuckDB SQL statements to read from files, giving you more control over the data ingestion process. This is particularly useful when you need to perform transformations or filtering during the read operation.
CLI Flags Examples
Full Refresh Mode
In the example below, when we specify the source connection aws_s3, Sling will auto-inject the necessary secrets for proper authentication.
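If the aws_s3 connection is not defined yet, one way to create it is with sling conns set (a sketch with placeholder values; the exact keys depend on your storage provider):
# Define an S3 connection named aws_s3 (placeholder values, adjust for your provider)
sling conns set aws_s3 type=s3 bucket=<bucket> access_key_id=<access_key> secret_access_key=<secret_key>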
# Read CSV files with custom SQL
sling run \
--src-conn aws_s3 \
--src-stream "select * from read_csv('s3://my-bucket/data/*.csv') where amount > 1000" \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.large_transactions' \
--mode full-refresh
# Read Parquet files with custom SQL and aggregation
sling run \
--src-conn aws_s3 \
--src-stream "select date_trunc('month', date) as month, sum(amount) as total
from read_parquet('s3://my-bucket/data/*.parquet')
group by 1" \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.monthly_totals' \
--mode full-refresh
Incremental Mode
# Incremental load using timestamp column
sling run \
--src-stream "select * from read_csv('s3://my-bucket/data/*.csv')
where {incremental_where_cond}" \
--src-options '{"sql": true}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.transactions' \
--mode incremental \
--primary-key id \
--update-key created_at
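For reference, a rough sketch of what the placeholder resolves to (illustrative only; the exact SQL Sling generates may differ):
# {incremental_where_cond} becomes a filter on the update key, roughly:
#   first run:   where 1=1   (no prior state, so all rows match)
#   later runs:  where created_at > <max created_at already loaded into the target>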
Replication Configuration
You can also use DuckDB SQL in your replication configuration:
source: AWS_S3
target: MY_TARGET_DB
defaults:
mode: full-refresh
streams:
# Using SQL to read and transform CSV data
daily_sales_summary:
object: analytics.daily_sales_summary
sql: |
select date,
sum(case when type='sale' then amount else 0 end) as sales,
sum(case when type='refund' then amount else 0 end) as refunds
from read_csv('s3://my-bucket/transactions/*.csv')
group by date
# Incremental load with custom SQL
events:
object: analytics.events
sql: |
select * from read_parquet('s3://my-bucket/events/*.parquet')
where event_timestamp > coalesce({incremental_value}, '2001-01-01')
mode: incremental
update_key: event_timestamp
# Join multiple files
enriched_transactions:
object: analytics.enriched_transactions
sql: |
select t.*, c.category
from read_csv('s3://my-bucket/transactions.csv') t
left join read_parquet('s3://my-bucket/categories.parquet') c
on t.category_id = c.id
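Assuming the configuration above is saved as replication.yaml, it can be run with:
# Run the replication file
sling run -r replication.yaml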
Using Python
Using custom SQL to read from files with the Python API:
replication.py
from sling import Replication, ReplicationStream, SourceOptions, Mode
import os
# Set environment variables
os.environ['AWS_S3'] = '...'
os.environ['MY_TARGET_DB'] = '...'
# Full Refresh Mode with custom SQL
replication = Replication(
source='AWS_S3',
target='MY_TARGET_DB',
defaults={'mode': Mode.FULL_REFRESH},
streams={
# Using SQL to read and transform CSV data
'daily_sales_summary': ReplicationStream(
object='analytics.daily_sales_summary',
sql="""
select date,
sum(case when type='sale' then amount else 0 end) as sales,
sum(case when type='refund' then amount else 0 end) as refunds
from read_csv('s3://my-bucket/transactions/*.csv')
group by date
"""
),
# Incremental load with custom SQL
'events': ReplicationStream(
object='analytics.events',
sql="""
select * from read_parquet('s3://my-bucket/events/*.parquet')
where event_timestamp > coalesce({incremental_value}, '2001-01-01')
""",
mode=Mode.INCREMENTAL,
update_key='event_timestamp'
),
# Join multiple files
'enriched_transactions': ReplicationStream(
object='analytics.enriched_transactions',
sql="""
select t.*, c.category
from read_csv('s3://my-bucket/transactions.csv') t
left join read_parquet('s3://my-bucket/categories.parquet') c
on t.category_id = c.id
"""
)
}
)
# Run the replication
replication.run()
# Simple custom SQL example
replication = Replication(
source='AWS_S3',
target='MY_TARGET_DB',
streams={
"my_stream": ReplicationStream(
object='target_schema.large_transactions',
sql="select * from read_csv('s3://my-bucket/data/*.csv') where amount > 1000",
mode=Mode.FULL_REFRESH
)
}
)
replication.run()
# Incremental load with source options
replication = Replication(
source='LOCAL', # When source is LOCAL, need to use source_options
target='MY_TARGET_DB',
streams={
'my_stream': ReplicationStream(
sql="""select * from read_csv('s3://my-bucket/data/*.csv')
where {incremental_where_cond}""",
source_options=SourceOptions(sql=True),
object='target_schema.transactions',
mode=Mode.INCREMENTAL,
primary_key='id',
update_key='created_at'
)
}
)
replication.run()
Features
SQL Functions: Access to DuckDB's rich SQL function library
File Format Support: Works with CSV, Parquet, JSON, and other formats supported by DuckDB
Aggregations: Perform aggregations and transformations during read
Joins: Join data from multiple files
Filtering: Apply filters to reduce data transfer
Type Casting: Use SQL CAST functions for type conversions
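Combining the file format support and type casting noted above, here is an illustrative sketch (the bucket, path, and columns are hypothetical) that reads JSON files and casts columns during the read:
# Read JSON files and cast columns while reading (hypothetical paths and columns)
sling run \
--src-conn aws_s3 \
--src-stream "select cast(id as bigint) as id, cast(created_at as timestamp) as created_at from read_json_auto('s3://my-bucket/events/*.json')" \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.events_typed' \
--mode full-refresh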
Notes
Use DuckDB's read_* functions to specify input files
Sling will auto-download the DuckDB binary it needs. You can specify the desired DuckDB version with the DUCKDB_VERSION environment variable.
For incremental loads, use placeholder variables such as {incremental_where_cond} and {incremental_value}.
File paths support wildcards (*) for matching multiple files.
Cloud storage paths (s3://, gs://, etc.) are supported with proper credentials. Make sure to specify the respective source connection; Sling will auto-inject the needed credentials before running the query. If authentication is not working, please reach out to us or open a GitHub issue.