Sling
Slingdata.ioBlogGithubHelp!
  • Introduction
  • Sling CLI
    • Installation
    • Environment
    • Running Sling
    • Global Variables
    • CLI Pro
  • Sling Platform
    • Sling Platform
      • Architecture
      • Agents
      • Connections
      • Editor
      • API
      • Deploy from CLI
  • Concepts
    • Replications
      • Structure
      • Modes
      • Source Options
      • Target Options
      • Columns
      • Transforms
      • Runtime Variables
      • Tags & Wildcards
    • Hooks / Steps
      • Check
      • Command
      • Copy
      • Delete
      • Group
      • Http
      • Inspect
      • List
      • Log
      • Query
      • Replication
      • Store
      • Read
      • Write
    • Pipelines
    • Data Quality
      • Constraints
  • Examples
    • File to Database
      • Custom SQL
      • Incremental
    • Database to Database
      • Custom SQL
      • Incremental
      • Backfill
    • Database to File
      • Incremental
    • Sling + Python 🚀
  • Connections
    • Database Connections
      • Athena
      • BigTable
      • BigQuery
      • Cloudflare D1
      • Clickhouse
      • DuckDB
      • DuckLake
      • MotherDuck
      • MariaDB
      • MongoDB
      • Elasticsearch
      • MySQL
      • Oracle
      • Postgres
      • Prometheus
      • Proton
      • Redshift
      • StarRocks
      • SQLite
      • SQL Server
      • Snowflake
      • Trino
    • Storage Connections
      • AWS S3
      • Azure Storage
      • Backblaze B2
      • Cloudflare R2
      • DigitalOcean Spaces
      • FTP
      • Google Drive
      • Google Storage
      • Local Storage
      • Min.IO
      • SFTP
      • Wasabi
Powered by GitBook
On this page
  1. Examples

Database to Database

Examples of using Sling to load data from one database to another

PreviousIncrementalNextCustom SQL

Last updated 1 hour ago

We first need to make sure our connections are available in our environment. See and for more details.

export MY_SOURCE_DB='...'
export MY_TARGET_DB='...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
# using windows Powershell
$env:MY_SOURCE_DB = '...'
$env:MY_TARGET_DB = '...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
Database ⇨ Database (Full Refresh)

Using

sling.sh
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

Using

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: full-refresh

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently

Using

database_to_database.py
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Custom SQL)

See also .

Using

$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where col1 is not null' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# we can also read from a SQL file (/path/to/query.sql)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

Using

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  source_schema.source_table.1:
    sql: |
      select *
      from my_schema.my_table
      where col1 is not null
    object: target_schema.target_table

  source_schema.source_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.target_table

Using

replication.py
from sling import Replication, ReplicationStream, Mode

# Single stream example with inline SQL
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example with SQL file reference
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table'
        ),
        'source_schema.source_table.2': ReplicationStream(
            sql='file:///path/to/query.sql',
            object='target_schema.target_table'
        )
    }
)

replication.run()
Database ⇨ Database (Incremental / Backfill)

See also .

Using

# limit to 10M records at a time. Will be sorted by update_key ASC.
# just loop command until all data is transferred / caught up.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode incremental \
  --limit 1000000 -d

# Backfill specific date range
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode backfill \
  --range '2021-01-01,2021-02-01'

Using

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt
  source_options:
    limit: 10000000 # limit to 10M records at a time

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  # backfill
  source_schema.backfill_table:
    mode: backfill
    object: target_schema.backfill_table
    primary_key: [some_id]
    update_key: updated_at # override default update key
    source_options:
      range: 2021-01-01,2021-02-01 # specific date range
      chunk_size: 7d               # 7-day stream chunking/splitting

  source_schema.another_table:
    target_options:
      delete_missing: soft  # track deletes from source table

Using

replication.py
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Incremental load with limit
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt',
        source_options=SourceOptions(
            limit=10000000  # limit to 10M records at a time
        )
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        # backfill example
        'source_schema.backfill_table': ReplicationStream(
            mode=Mode.BACKFILL,
            object='target_schema.backfill_table',
            primary_key=['some_id'],
            update_key='updated_at',
            source_options=SourceOptions(
                range='2021-01-01,2021-02-01',  # specific date range
                chunk_size='7d'  # 7-day stream chunking/splitting
            )
        ),
        'source_schema.another_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        )
    }
)

replication.run()
Database ⇨ Database (Incremental - New Data Upsert)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'last_modified_dt'

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently

replication.py
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='last_modified_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Incremental - Full Data Upsert)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id'

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: incremental
  primary_key: [id]

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    primary_key: [ col1, col2 ] # override default primary_key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently

replication.py
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Single stream example (full data upsert - no update_key)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id']
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.INCREMENTAL,
        primary_key=['id']
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            primary_key=['col1', 'col2']  # override default primary_key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Incremental - Append Only)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --update-key 'created_dt'

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  update_key: created_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: created_at  # override default update_key

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently

replication.py
from sling import Replication, ReplicationStream, Mode

# Single stream example (append only)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            update_key='created_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        update_key='created_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='created_at'  # override default update_key
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Truncate)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode truncate

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: truncate
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently

replication.py
from sling import Replication, ReplicationStream

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode='truncate'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.TRUNCATE,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Snapshot)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode snapshot

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: snapshot
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently

replication.py
from sling import Replication, ReplicationStream

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.SNAPSHOT
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.SNAPSHOT,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()

See also .

Using

Using

Using

See also .

Using

Using

Using

See also .

Using

Using

Using

Using

Using

Using

Using

Using

Using

Incremental Examples
Replication
Python
Incremental Examples
Replication
Python
Incremental Examples
Replication
Python
Replication
Python
Replication
Python
Environment
Database Connections
Replication
Python
Custom SQL Examples
Replication
Python
Incremental Examples
Replication
Python
CLI Flags
CLI Flags
CLI Flags
CLI Flags
CLI Flags
CLI Flags
CLI Flags
CLI Flags