Database to Database
Examples of using Sling to load data from one database to another
Last updated
Examples of using Sling to load data from one database to another
Last updated
We first need to make sure our connections are available in our environment. See the Sling environment and connections documentation for more details.
export MY_SOURCE_DB='...'
export MY_TARGET_DB='...'
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_SOURCE_DB | DB - PostgreSQL | env variable |
| MY_TARGET_DB | DB - Snowflake | env variable |
+---------------+------------------+-----------------+
# using Windows PowerShell
$env:MY_SOURCE_DB = '...'
$env:MY_TARGET_DB = '...'
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_SOURCE_DB | DB - PostgreSQL | env variable |
| MY_TARGET_DB | DB - Snowflake | env variable |
+---------------+------------------+-----------------+
Using CLI flags
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Using a replication YAML file
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
object: '{target_schema}.{stream_schema}_{stream_table}'
mode: full-refresh
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
source_schema.another_table:
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
Using Python
from sling import Replication, ReplicationStream, Mode
# Single stream example
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode=Mode.FULL_REFRESH
)
}
)
replication.run()
# Multiple streams example
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
object='{target_schema}.{stream_schema}_{stream_table}',
mode=Mode.FULL_REFRESH
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table' # override default object pattern
),
'source_schema.another_table': {},
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
See also .
Using CLI flags
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'select * from my_schema.my_table where col1 is not null' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# we can also read from a SQL file (/path/to/query.sql)
$ sling run --src-conn MY_SOURCE_DB \
--src-stream file:///path/to/query.sql \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Using a replication YAML file
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: full-refresh
streams:
source_schema.source_table.1:
sql: |
select *
from my_schema.my_table
where col1 is not null
object: target_schema.target_table
source_schema.source_table.2:
sql: file:///path/to/query.sql
object: target_schema.target_table
Using Python
from sling import Replication, ReplicationStream, Mode
# Single stream example with inline SQL
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table.1': ReplicationStream(
sql="""
select *
from my_schema.my_table
where col1 is not null
""",
object='target_schema.target_table',
mode=Mode.FULL_REFRESH
)
}
)
replication.run()
# Multiple streams example with SQL file reference
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.FULL_REFRESH
),
streams={
'source_schema.source_table.1': ReplicationStream(
sql="""
select *
from my_schema.my_table
where col1 is not null
""",
object='target_schema.target_table'
),
'source_schema.source_table.2': ReplicationStream(
sql='file:///path/to/query.sql',
object='target_schema.target_table'
)
}
)
replication.run()
See also .
Using CLI flags
# limit to 1M records at a time. Will be sorted by update_key ASC.
# just loop command until all data is transferred / caught up.
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--primary-key 'id' \
--update-key 'last_modified_dt' \
--mode incremental \
--limit 1000000 -d
# Backfill specific date range
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--primary-key 'id' \
--update-key 'last_modified_dt' \
--mode backfill \
--range '2021-01-01,2021-02-01'
Using a replication YAML file
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: incremental
object: '{target_schema}.{stream_schema}_{stream_table}'
primary_key: [id]
update_key: last_modified_dt
source_options:
limit: 10000000 # limit to 10M records at a time
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
update_key: updated_at # override default update key
# backfill
source_schema.backfill_table:
mode: backfill
object: target_schema.backfill_table
primary_key: [some_id]
update_key: updated_at # override default update key
source_options:
range: 2021-01-01,2021-02-01 # specific date range
chunk_size: 7d # 7-day stream chunking/splitting
source_schema.another_table:
target_options:
delete_missing: soft # track deletes from source table
Using Python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode
# Incremental load with limit
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.INCREMENTAL,
object='{target_schema}.{stream_schema}_{stream_table}',
primary_key=['id'],
update_key='last_modified_dt',
source_options=SourceOptions(
limit=10000000 # limit to 10M records at a time
)
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table', # override default object pattern
update_key='updated_at' # override default update key
),
# backfill example
'source_schema.backfill_table': ReplicationStream(
mode=Mode.BACKFILL,
object='target_schema.backfill_table',
primary_key=['some_id'],
update_key='updated_at',
source_options=SourceOptions(
range='2021-01-01,2021-02-01', # specific date range
chunk_size='7d' # 7-day stream chunking/splitting
)
),
'source_schema.another_table': ReplicationStream(
target_options=TargetOptions(
delete_missing='soft' # track deletes from source table
)
)
}
)
replication.run()
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode incremental \
--primary-key 'id' \
--update-key 'last_modified_dt'
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: incremental
object: '{target_schema}.{stream_schema}_{stream_table}'
primary_key: [id]
update_key: last_modified_dt
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
update_key: updated_at # override default update key
source_schema.another_table:
source_schema.some_table:
target_options:
delete_missing: soft # track deletes from source table
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode
# Single stream example
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode=Mode.INCREMENTAL,
primary_key=['id'],
update_key='last_modified_dt'
)
}
)
replication.run()
# Multiple streams example with defaults
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.INCREMENTAL,
object='{target_schema}.{stream_schema}_{stream_table}',
primary_key=['id'],
update_key='last_modified_dt'
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table', # override default object pattern
update_key='updated_at' # override default update key
),
'source_schema.another_table': {},
'source_schema.some_table': ReplicationStream(
target_options=TargetOptions(
delete_missing='soft' # track deletes from source table
)
),
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode incremental \
--primary-key 'id'
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
object: '{target_schema}.{stream_schema}_{stream_table}'
mode: incremental
primary_key: [id]
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
primary_key: [ col1, col2 ] # override default primary_key
source_schema.another_table:
source_schema.some_table:
target_options:
delete_missing: soft # track deletes from source table
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode
# Single stream example (full data upsert - no update_key)
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode=Mode.INCREMENTAL,
primary_key=['id']
)
}
)
replication.run()
# Multiple streams example with defaults
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
object='{target_schema}.{stream_schema}_{stream_table}',
mode=Mode.INCREMENTAL,
primary_key=['id']
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table', # override default object pattern
primary_key=['col1', 'col2'] # override default primary_key
),
'source_schema.another_table': {},
'source_schema.some_table': ReplicationStream(
target_options=TargetOptions(
delete_missing='soft' # track deletes from source table
)
),
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode incremental \
--update-key 'created_dt'
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: incremental
object: '{target_schema}.{stream_schema}_{stream_table}'
update_key: created_dt
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
update_key: created_at # override default update_key
source_schema.another_table:
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, Mode
# Single stream example (append only)
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode=Mode.INCREMENTAL,
update_key='created_dt'
)
}
)
replication.run()
# Multiple streams example with defaults
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.INCREMENTAL,
object='{target_schema}.{stream_schema}_{stream_table}',
update_key='created_dt'
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table', # override default object pattern
update_key='created_at' # override default update_key
),
'source_schema.another_table': {},
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode truncate
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: truncate
object: '{target_schema}.{stream_schema}_{stream_table}'
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
source_schema.another_table:
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, Mode
# Single stream example
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode='truncate'
)
}
)
replication.run()
# Multiple streams example with defaults
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.TRUNCATE,
object='{target_schema}.{stream_schema}_{stream_table}'
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table' # override default object pattern
),
'source_schema.another_table': {},
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'source_schema.source_table' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode snapshot
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB
defaults:
mode: snapshot
object: '{target_schema}.{stream_schema}_{stream_table}'
streams:
source_schema.source_table:
object: target_schema.target_table # override default object pattern
source_schema.another_table:
# all tables in schema, except "forbidden_table"
my_schema.*:
my_schema.forbidden_table:
disabled: true
env:
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, Mode
# Single stream example
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table',
mode=Mode.SNAPSHOT
)
}
)
replication.run()
# Multiple streams example with defaults
replication = Replication(
source='MY_SOURCE_DB',
target='MY_TARGET_DB',
defaults=ReplicationStream(
mode=Mode.SNAPSHOT,
object='{target_schema}.{stream_schema}_{stream_table}'
),
streams={
'source_schema.source_table': ReplicationStream(
object='target_schema.target_table' # override default object pattern
),
'source_schema.another_table': {},
# all tables in schema, except "forbidden_table"
'my_schema.*': {},
'my_schema.forbidden_table': ReplicationStream(
disabled=True
)
},
env={'SLING_THREADS': '3'} # run streams concurrently
)
replication.run()
See also .
Using
Using
Using
See also .
Using
Using
Using
See also .
Using
Using
Using
Using
Using
Using
Using
Using
Using