
Custom SQL

Full Refresh with Custom SQL

Using CLI Flags

# Using inline SQL
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream "select * from my_schema.my_table where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# Using SQL from a file
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
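
The file passed as `file:///path/to/query.sql` contains an ordinary SELECT statement. A minimal sketch of preparing such a file, reusing the query from the inline example; the temporary directory and the `QUERY_FILE` and `SRC_STREAM` variables are hypothetical and only serve the illustration:

```shell
# Hypothetical location for the query file; replace with your own path.
QUERY_DIR="$(mktemp -d)"
QUERY_FILE="$QUERY_DIR/query.sql"

# Write the same SELECT used in the inline example.
# The quoted heredoc delimiter stops the shell from expanding anything,
# so the SQL string literal keeps its single quotes.
cat > "$QUERY_FILE" <<'SQL'
select *
from my_schema.my_table
where status = 'active'
SQL

# Build the file URI that would be passed to --src-stream.
# mktemp returns an absolute path, so this yields file:///...
SRC_STREAM="file://$QUERY_FILE"
echo "$SRC_STREAM"
```

The resulting value can then take the place of `file:///path/to/query.sql` in the command above.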

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  my_schema.my_table.1:
    sql: |
      select 
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table 
      where status = 'active'
    object: target_schema.active_users

  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table

Incremental with Custom SQL

Using CLI Flags

# Using inline SQL with incremental variables
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

# Using SQL file with incremental loading
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/incremental_query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'
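
A SQL file used for incremental loading can carry the same placeholder as the inline query. The sketch below writes one possible `incremental_query.sql`; the directory and the `QUERY_FILE` variable are hypothetical, and the query simply mirrors the inline example:

```shell
# Hypothetical location for the incremental query file.
QUERY_DIR="$(mktemp -d)"
QUERY_FILE="$QUERY_DIR/incremental_query.sql"

# The placeholder must reach the file literally, braces included.
# A quoted heredoc delimiter guarantees the shell leaves it untouched.
cat > "$QUERY_FILE" <<'SQL'
select *
from my_schema.my_table
where {incremental_where_cond}
SQL

# Show the file so the placeholder can be checked by eye.
cat "$QUERY_FILE"
```

The placeholder stands in for the whole condition, so the query does not prefix it with a column comparison.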

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select 
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions 
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at  # override default update_key

  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]
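
To illustrate what `{incremental_where_cond}` stands for in the streams above, the sketch below substitutes it by hand with `sed`. This is not how Sling renders the placeholder internally. It assumes the placeholder becomes a condition of the form `update_key > 'value'` once the target holds data, and an always-true condition such as `1=1` on the first run. The `render` helper, the sample timestamp, and the exact rendered text are hypothetical:

```shell
# Template using the placeholder from the replication above.
TEMPLATE="select * from my_schema.orders o where {incremental_where_cond}"

# render: hypothetical helper that mimics placeholder substitution.
#   $1 = update key column
#   $2 = last loaded value (empty string on the first run)
render() {
  if [ -z "$2" ]; then
    cond="1=1"            # assumed first-run condition: load everything
  else
    cond="$1 > '$2'"      # assumed incremental condition: only newer rows
  fi
  printf '%s\n' "$TEMPLATE" | sed "s|{incremental_where_cond}|$cond|"
}

FIRST_RUN="$(render updated_at "")"
NEXT_RUN="$(render updated_at "2024-01-01 00:00:00")"

echo "$FIRST_RUN"
echo "$NEXT_RUN"
```

Under these assumptions the first run selects every row, while later runs select only rows whose update key is newer than the value already loaded.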

The examples above demonstrate:

  • Using both inline SQL and SQL files

  • Joining multiple tables in custom SQL

  • Handling duplicates with window functions

  • Overriding default primary keys and update keys

  • Using incremental variables ({incremental_value} and {incremental_where_cond}). See the Runtime Variables page for details.

Last updated 4 months ago