File to Database
Examples of using Sling to load data from storage systems to databases
Last updated
We first need to make sure our connections are available in our environment. See the Environment and Connections documentation for more details.
export MY_TARGET_DB='...'
export SLING_SAMPLE_SIZE=2000 # increase the sample size to infer types. Default is 900.
export SLING_THREADS=3 # run streams concurrently
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_S3_BUCKET | FileSys - S3 | sling env yaml |
| MY_TARGET_DB | DB - PostgreSQL | env variable |
| MY_GS_BUCKET | FileSys - Google | sling env yaml |
| MY_AZURE_CONT | FileSys - Azure | sling env yaml |
+---------------+------------------+-----------------+
# using Windows PowerShell
$env:MY_TARGET_DB = '...'
$env:SLING_SAMPLE_SIZE = 2000 # increase the sample size to infer types. Default is 900.
$env:SLING_THREADS = 3 # run streams concurrently
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_S3_BUCKET | FileSys - S3 | sling env yaml |
| MY_TARGET_DB | DB - PostgreSQL | env variable |
| MY_GS_BUCKET | FileSys - Google | sling env yaml |
| MY_AZURE_CONT | FileSys - Azure | sling env yaml |
+---------------+------------------+-----------------+
Using the CLI
$ cat /tmp/my_file.csv | sling run --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-stream 'file:///tmp/my_file.csv' \
--columns '{ "*": string }' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
--columns '{col2: string, col3: string}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
--transforms '[remove_accents]' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file://C:/Temp/my_csv_folder/' \
--transforms '[remove_accents]' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Using a Replication (YAML)
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
streams:
# a stream with many parts, all sub files will be merged into one table
"file:///tmp/my_csv_folder/":
columns:
col2: string # cast `col2` as string
transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.
source_options:
format: csv
# expand all files into individual streams, each file will load into its own table
"file:///tmp/my_csv_folder/*.csv":
object: 'target_schema.{stream_file_name}'
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_csv_folder/prefix_*.csv":
object: 'target_schema.my_new_table'
single: true
"file:///tmp/my_file.csv":
columns:
"*": string # cast all columns to string
# Windows path format
"file://C:/Temp/my_file.csv":
columns:
"*": string # cast all columns to string
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
Using Python
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
os.environ['SLING_SAMPLE_SIZE'] = '2000' # increase the sample size to infer types
os.environ['SLING_THREADS'] = '3' # run streams concurrently
# Single CSV file import
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={'mode': Mode.FULL_REFRESH},
streams={
'file:///tmp/my_file.csv': ReplicationStream(
object='target_schema.target_table',
columns={'*': 'string'} # cast all columns to string
)
}
)
replication.run()
# Multiple CSV files with various options
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
},
streams={
# A stream with many parts, all sub files will be merged into one table
'file:///tmp/my_csv_folder/': ReplicationStream(
columns={'col2': 'string'}, # cast col2 as string
transforms=['remove_accents'], # Apply transforms
source_options=SourceOptions(format=Format.CSV)
),
# Expand all files into individual streams
'file:///tmp/my_csv_folder/*.csv': ReplicationStream(
object='target_schema.{stream_file_name}'
),
# Consider as a single stream (don't expand)
'file:///tmp/my_csv_folder/prefix_*.csv': ReplicationStream(
object='target_schema.my_new_table',
single=True
),
# Windows path format
'file://C:/Temp/my_file.csv': ReplicationStream(
columns={'*': 'string'}
)
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true', # adds a _sling_stream_url column
'SLING_THREADS': '3'
}
)
replication.run()
Using the CLI
$ sling run --src-stream 'file:///path/to/test.excel.xlsx' --src-options '{ sheet: "Sheet2!A:F" }' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Using a Replication (YAML)
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
streams:
# expand all files into a stream, each file will load into its own table
"file:///tmp/my_excel_folder/*.xlsx":
object: 'target_schema.{stream_file_name}'
source_options:
sheet: "Sheet1!A:F"
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_excel_folder/prefix_*.xlsx":
object: 'target_schema.my_new_table'
single: true
source_options:
sheet: "Sheet1!A:F"
"file:///path/to/test.excel.xlsx":
columns:
"*": string # cast all columns to string
source_options:
sheet: "Sheet2!A:F"
# Windows path format
"file://C:/Temp/my_file.xlsx":
columns:
"col2": integer # cast col2 to integer
source_options:
sheet: "Sheet2!A:F"
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_ROW_NUM_COLUMN: true # adds a _sling_row_num column with the row number
Using Python
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# Excel file import with sheet specification
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
},
streams={
# Expand all files into streams
'file:///tmp/my_excel_folder/*.xlsx': ReplicationStream(
object='target_schema.{stream_file_name}',
source_options=SourceOptions(sheet='Sheet1!A:F')
),
# Consider as a single stream (don't expand)
'file:///tmp/my_excel_folder/prefix_*.xlsx': ReplicationStream(
object='target_schema.my_new_table',
single=True,
source_options=SourceOptions(sheet='Sheet1!A:F')
),
# Single Excel file
'file:///path/to/test.excel.xlsx': ReplicationStream(
columns={'*': 'string'}, # cast all columns to string
source_options=SourceOptions(sheet='Sheet2!A:F')
),
# Windows path format
'file://C:/Temp/my_file.xlsx': ReplicationStream(
columns={'col2': 'integer'}, # cast col2 to integer
source_options=SourceOptions(sheet='Sheet2!A:F')
)
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true', # adds a _sling_stream_url column
'SLING_ROW_NUM_COLUMN': 'true' # adds a _sling_row_num column
}
)
replication.run()
$ cat /tmp/my_file.json | sling run --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-stream 'file:///tmp/my_file.json' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_json_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# Windows path format
$ sling run --src-stream 'file://C:/Temp/my_json_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.target_table'
source_options:
format: json
streams:
"file:///tmp/my_json_folder/":
# expand all files into a stream, each file will load into its own table
"file:///tmp/my_json_folder/*.json":
object: 'target_schema.{stream_file_name}'
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_json_folder/prefix_*.json":
object: 'target_schema.my_new_table'
single: true
"file:///tmp/my_file.json":
# Windows path format
"file://C:/Temp/my_file.json":
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# JSON file import
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.target_table',
'source_options': SourceOptions(format=Format.JSON)
},
streams={
# JSON folder
'file:///tmp/my_json_folder/': {},
# Expand all files into streams
'file:///tmp/my_json_folder/*.json': ReplicationStream(
object='target_schema.{stream_file_name}'
),
# Consider as a single stream (don't expand)
'file:///tmp/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_new_table',
single=True
),
# Single JSON file
'file:///tmp/my_file.json': {},
# Windows path format
'file://C:/Temp/my_file.json': {}
}
)
replication.run()
$ cat /tmp/my_file.json | sling run --src-options '{flatten: true}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-stream 'file:///tmp/my_file.json' \
--src-options '{flatten: true}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_json_folder/' \
--src-options '{flatten: true}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# Windows path format
$ sling run --src-stream 'file://C:/Temp/my_json_folder/' \
--src-options '{flatten: true}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.target_table'
source_options:
format: json
flatten: true
streams:
"file:///tmp/my_json_folder/":
# expand all files into a stream, each file will load into its own table
"file:///tmp/my_json_folder/*.json":
object: 'target_schema.{stream_file_name}'
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_json_folder/prefix_*.json":
object: 'target_schema.my_new_table'
single: true
"file:///tmp/my_file.json":
# Windows path format
"file://C:/Temp/my_file.json":
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# JSON file import with flattening
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.target_table',
'source_options': SourceOptions(
format=Format.JSON,
flatten=True
)
},
streams={
# JSON folder
'file:///tmp/my_json_folder/': {},
# Expand all files into streams
'file:///tmp/my_json_folder/*.json': ReplicationStream(
object='target_schema.{stream_file_name}'
),
# Consider as a single stream (don't expand)
'file:///tmp/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_new_table',
single=True
),
# Single JSON file
'file:///tmp/my_file.json': {},
# Windows path format
'file://C:/Temp/my_file.json': {}
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true',
'SLING_THREADS': '3'
}
)
replication.run()
$ sling run --src-stream 'file:///tmp/my_file.parquet' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_parquet_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# Windows path format
$ sling run --src-stream 'file://C:/Temp/my_parquet_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.target_table'
source_options:
format: parquet
streams:
"file:///tmp/my_parquet_folder/":
# expand all files into a stream, each file will load into its own table
"file:///tmp/my_parquet_folder/*.parquet":
object: 'target_schema.{stream_file_name}'
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_parquet_folder/prefix_*.parquet":
object: 'target_schema.my_new_table'
single: true
"file:///tmp/my_file.parquet":
# Windows path format
"file://C:/Temp/my_file.parquet":
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# Parquet file import
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.target_table',
'source_options': SourceOptions(format=Format.PARQUET)
},
streams={
# Parquet folder
'file:///tmp/my_parquet_folder/': {},
# Expand all files into streams
'file:///tmp/my_parquet_folder/*.parquet': ReplicationStream(
object='target_schema.{stream_file_name}'
),
# Consider as a single stream (don't expand)
'file:///tmp/my_parquet_folder/prefix_*.parquet': ReplicationStream(
object='target_schema.my_new_table',
single=True
),
# Single Parquet file
'file:///tmp/my_file.parquet': {},
# Windows path format
'file://C:/Temp/my_file.parquet': {}
}
)
replication.run()
$ sling run --src-stream 'file:///tmp/my_file.sas7bdat' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_sas7bdat_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# Windows path format
$ sling run --src-stream 'file://C:/Temp/my_file.sas7bdat' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file://C:/Temp/my_sas7bdat_folder/' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.target_table'
source_options:
format: sas7bdat
streams:
"file:///tmp/my_sas7bdat_folder/":
"file:///tmp/my_file.sas7bdat":
# Windows path format
"file://C:/Temp/my_file.sas7bdat":
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# SAS7BDAT file import
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.target_table',
'source_options': SourceOptions(format=Format.SAS)
},
streams={
# SAS7BDAT folder
'file:///tmp/my_sas7bdat_folder/': {},
# Single SAS7BDAT file
'file:///tmp/my_file.sas7bdat': {},
# Windows path format
'file://C:/Temp/my_file.sas7bdat': {}
}
)
replication.run()
$ sling run --src-conn MY_SFTP --src-stream '/path/to/my_file.csv' \
--columns '{ "*": string }' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-conn MY_SFTP --src-stream '/path/to/my_csv_folder/' \
--columns '{col2: string, col3: string}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-conn MY_SFTP --src-stream '/path/to/my_csv_folder/' \
--transforms '[remove_accents]' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SFTP
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
streams:
"/path/to/my_csv_folder/":
columns:
col2: string # cast `col2` as string
transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.
source_options:
format: csv
# expand all files into a stream, each file will load into its own table
"/path/to/my_csv_folder/*.csv":
# consider as a single stream (don't expand into individual streams)
"/path/to/my_csv_folder/prefix_*.csv":
object: my_schema.my_new_table
single: true
"/path/to/my_file.csv":
columns:
"*": string # cast all columns to string
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_SFTP'] = '...'
os.environ['MY_TARGET_DB'] = '...'
# SFTP CSV file import
replication = Replication(
source='MY_SFTP',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
},
streams={
# SFTP CSV folder
'/path/to/my_csv_folder/': ReplicationStream(
columns={'col2': 'string'}, # cast col2 as string
transforms=['remove_accents'], # Apply transforms
source_options=SourceOptions(format=Format.CSV)
),
# Expand all files into streams
'/path/to/my_csv_folder/*.csv': {},
# Consider as a single stream (don't expand)
'/path/to/my_csv_folder/prefix_*.csv': ReplicationStream(
object='my_schema.my_new_table',
single=True
),
# Single SFTP CSV file
'/path/to/my_file.csv': ReplicationStream(
columns={'*': 'string'} # cast all columns to string
)
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true',
'SLING_THREADS': '3'
}
)
replication.run()
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_csv_folder/' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_csv_folder/' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_csv_folder/' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GOOGLE_DRIVE --src-stream 'gdrive://folder_id/my_csv_folder/' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_file.csv' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_file.csv' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.csv' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GOOGLE_DRIVE --src-stream 'gdrive://folder_id/my_file.csv' --columns '{col2: string}' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_FILE_CONN
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
columns:
'*': string # cast all columns as string
source_options:
format: csv
streams:
# no need to specify scheme://bucket
"my_file.csv":
"my_csv_folder/": # single stream for whole folder
"my_csv_folder/*.csv": # individual streams for each file
"my_csv_folder/prefix_*.csv": # single stream for all files
object: 'target_schema.my_csv_data'
single: true
"s3://my-bucket/my_csv_folder/":
"s3://my-bucket/my_csv_folder/*.csv":
"s3://my-bucket/my_csv_folder/prefix_*.csv":
object: 'target_schema.my_csv_data'
single: true
"s3://my-bucket/my_file.csv":
"gs://my-bucket/my_csv_folder/":
"gs://my-bucket/my_csv_folder/*.csv":
"gs://my-bucket/my_csv_folder/prefix_*.csv":
object: 'target_schema.my_csv_data'
single: true
"gs://my-bucket/my_file.csv":
"gdrive://folder_id/my_csv_folder/":
"gdrive://folder_id/my_csv_folder/*.csv":
"gdrive://folder_id/my_csv_folder/prefix_*.csv":
object: 'target_schema.my_csv_data'
single: true
"gdrive://folder_id/my_file.csv":
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_FILE_CONN'] = '...'
os.environ['MY_TARGET_DB'] = '...'
# Cloud Storage CSV file import
replication = Replication(
source='MY_FILE_CONN',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
'columns': {'*': 'string'}, # cast all columns as string
'source_options': SourceOptions(format=Format.CSV)
},
streams={
# No need to specify scheme://bucket
'my_file.csv': {},
'my_csv_folder/': {}, # single stream for whole folder
'my_csv_folder/*.csv': {}, # individual streams for each file
'my_csv_folder/prefix_*.csv': ReplicationStream( # single stream for all files
object='target_schema.my_csv_data',
single=True
),
# With full S3 path
's3://my-bucket/my_csv_folder/': {},
's3://my-bucket/my_csv_folder/*.csv': {},
's3://my-bucket/my_csv_folder/prefix_*.csv': ReplicationStream(
object='target_schema.my_csv_data',
single=True
),
's3://my-bucket/my_file.csv': {},
# Google Cloud Storage
'gs://my-bucket/my_csv_folder/': {},
'gs://my-bucket/my_csv_folder/*.csv': {},
'gs://my-bucket/my_csv_folder/prefix_*.csv': ReplicationStream(
object='target_schema.my_csv_data',
single=True
),
'gs://my-bucket/my_file.csv': {},
# Google Drive
'gdrive://folder_id/my_csv_folder/': {},
'gdrive://folder_id/my_csv_folder/*.csv': {},
'gdrive://folder_id/my_csv_folder/prefix_*.csv': ReplicationStream(
object='target_schema.my_csv_data',
single=True
),
'gdrive://folder_id/my_file.csv': {}
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true',
'SLING_THREADS': '3'
}
)
replication.run()
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_FILE_STORAGE
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
columns:
'*': string # cast all columns as string
source_options:
format: json
streams:
# no need to specify scheme://bucket
"my_file.json":
"my_json_folder/": # single stream for whole folder
"my_json_folder/*.json": # individual streams for each file
"my_json_folder/prefix_*.json": # single stream for all files
object: 'target_schema.my_json_data'
single: true
"s3://my-bucket/my_json_folder/":
"s3://my-bucket/my_json_folder/*.json":
"s3://my-bucket/my_json_folder/prefix_*.json":
object: 'target_schema.my_json_data'
single: true
"s3://my-bucket/my_file.json":
"gs://my-bucket/my_json_folder/":
"gs://my-bucket/my_json_folder/*.json":
"gs://my-bucket/my_json_folder/prefix_*.json":
object: 'target_schema.my_json_data'
single: true
"gs://my-bucket/my_file.json":
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_FILE_STORAGE'] = '...'
os.environ['MY_TARGET_DB'] = '...'
# Cloud Storage JSON file import
replication = Replication(
source='MY_FILE_STORAGE',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
'columns': {'*': 'string'}, # cast all columns as string
'source_options': SourceOptions(format=Format.JSON)
},
streams={
# No need to specify scheme://bucket
'my_file.json': {},
'my_json_folder/': {}, # single stream for whole folder
'my_json_folder/*.json': {}, # individual streams for each file
'my_json_folder/prefix_*.json': ReplicationStream( # single stream for all files
object='target_schema.my_json_data',
single=True
),
# With full S3 path
's3://my-bucket/my_json_folder/': {},
's3://my-bucket/my_json_folder/*.json': {},
's3://my-bucket/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_json_data',
single=True
),
's3://my-bucket/my_file.json': {},
# Google Cloud Storage
'gs://my-bucket/my_json_folder/': {},
'gs://my-bucket/my_json_folder/*.json': {},
'gs://my-bucket/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_json_data',
single=True
),
'gs://my-bucket/my_file.json': {}
}
)
replication.run()
$ sling run --src-conn MY_S3_BUCKET --src-options '{flatten: true}' --src-stream 's3://my-bucket/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-options '{flatten: true}' --src-stream 'gs://my-bucket/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-options '{flatten: true}' --src-stream 'https://my_account.blob.core.windows.net/my-container/my_json_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_S3_BUCKET --src-options '{flatten: true}' --src-stream 's3://my-bucket/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-options '{flatten: true}' --src-stream 'gs://my-bucket/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-options '{flatten: true}' --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.json' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_FILE_STORAGE
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
columns:
'*': string # cast all columns as string
source_options:
format: json
flatten: true
streams:
# no need to specify scheme://bucket
"my_json_folder/": # single stream for whole folder
"my_json_folder/*.json": # individual streams for each file
"my_json_folder/prefix_*.json": # single stream for all files
object: 'target_schema.my_json_data'
single: true
"s3://my-bucket/my_json_folder/":
"s3://my-bucket/my_json_folder/*.json":
"s3://my-bucket/my_json_folder/prefix_*.json":
object: 'target_schema.my_json_data'
single: true
"s3://my-bucket/my_file.json":
"gs://my-bucket/my_json_folder/":
"gs://my-bucket/my_json_folder/*.json":
"gs://my-bucket/my_json_folder/prefix_*.json":
object: 'target_schema.my_json_data'
single: true
"gs://my-bucket/my_file.json":
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_FILE_STORAGE'] = '...'
os.environ['MY_TARGET_DB'] = '...'
# Cloud Storage JSON file import with flattening
replication = Replication(
source='MY_FILE_STORAGE',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
'columns': {'*': 'string'}, # cast all columns as string
'source_options': SourceOptions(
format=Format.JSON,
flatten=True
)
},
streams={
# No need to specify scheme://bucket
'my_json_folder/': {}, # single stream for whole folder
'my_json_folder/*.json': {}, # individual streams for each file
'my_json_folder/prefix_*.json': ReplicationStream( # single stream for all files
object='target_schema.my_json_data',
single=True
),
# With full S3 path
's3://my-bucket/my_json_folder/': {},
's3://my-bucket/my_json_folder/*.json': {},
's3://my-bucket/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_json_data',
single=True
),
's3://my-bucket/my_file.json': {},
# Google Cloud Storage
'gs://my-bucket/my_json_folder/': {},
'gs://my-bucket/my_json_folder/*.json': {},
'gs://my-bucket/my_json_folder/prefix_*.json': ReplicationStream(
object='target_schema.my_json_data',
single=True
),
'gs://my-bucket/my_file.json': {}
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true',
'SLING_THREADS': '3'
}
)
replication.run()
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_parquet_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_parquet_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_parquet_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_file.parquet' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_file.parquet' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.parquet' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_FILE_STORAGE
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
source_options:
format: parquet
streams:
# no need to specify scheme://bucket
"my_file.parquet":
"my_parquet_folder/": # single stream for whole folder
"my_parquet_folder/*.parquet": # one stream for each file
"my_parquet_folder/prefix_*.parquet": # single stream for all files
object: 'target_schema.my_parquet_data'
single: true
"s3://my-bucket/my_parquet_folder/":
"s3://my-bucket/my_parquet_folder/*.parquet":
"s3://my-bucket/my_parquet_folder/prefix_*.parquet":
object: 'target_schema.my_parquet_data'
single: true
"s3://my-bucket/my_file.parquet":
"gs://my-bucket/my_parquet_folder/":
"gs://my-bucket/my_parquet_folder/*.parquet":
"gs://my-bucket/my_parquet_folder/prefix_*.parquet":
object: 'target_schema.my_parquet_data'
single: true
"gs://my-bucket/my_file.parquet":
env:
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrently
from sling import Format, Mode, Replication, ReplicationStream, SourceOptions
import os

# Connection details are read from environment variables.
os.environ['MY_FILE_STORAGE'] = '...'
os.environ['MY_TARGET_DB'] = '...'

# Shared settings for every stream: full refresh into a table named after
# the source file's folder and file name, decoding the data as Parquet.
shared_defaults = {
    'mode': Mode.FULL_REFRESH,
    'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
    'source_options': SourceOptions(format=Format.PARQUET),
}

# Stream keys may be bucket-relative paths or full URLs. An empty dict
# inherits all defaults; a ReplicationStream overrides them per stream.
stream_defs = {
    # no need to specify scheme://bucket
    'my_file.parquet': {},
    'my_parquet_folder/': {},  # single stream for whole folder
    'my_parquet_folder/*.parquet': {},  # one stream for each file
    'my_parquet_folder/prefix_*.parquet': ReplicationStream(  # single stream for all files
        object='target_schema.my_parquet_data',
        single=True,
    ),
    # with full S3 path
    's3://my-bucket/my_parquet_folder/': {},
    's3://my-bucket/my_parquet_folder/*.parquet': {},
    's3://my-bucket/my_parquet_folder/prefix_*.parquet': ReplicationStream(
        object='target_schema.my_parquet_data',
        single=True,
    ),
    's3://my-bucket/my_file.parquet': {},
    # Google Cloud Storage
    'gs://my-bucket/my_parquet_folder/': {},
    'gs://my-bucket/my_parquet_folder/*.parquet': {},
    'gs://my-bucket/my_parquet_folder/prefix_*.parquet': ReplicationStream(
        object='target_schema.my_parquet_data',
        single=True,
    ),
    'gs://my-bucket/my_file.parquet': {},
}

# Cloud Storage Parquet file import
repl = Replication(
    source='MY_FILE_STORAGE',
    target='MY_TARGET_DB',
    defaults=shared_defaults,
    streams=stream_defs,
    env={
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_THREADS': '3',  # run streams concurrently
    },
)
repl.run()
# S3: load a whole folder of Avro files into one table
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_avro_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Google Cloud Storage: load a whole folder of Avro files
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_avro_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Azure Blob Storage: load a whole folder of Avro files
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_avro_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# S3: load a single Avro file
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_file.avro' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Google Cloud Storage: load a single Avro file
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_file.avro' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Azure Blob Storage: load a single Avro file
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.avro' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
# Replication: load Avro files from cloud storage into the target database.
# NOTE: indentation below restores the nested structure that was flattened in
# the extracted page; it mirrors the equivalent Python example.
source: MY_FILE_STORAGE
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  # target table is derived from each file's folder and file name
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'
  source_options:
    format: avro

streams:
  # no need to specify scheme://bucket
  "my_file.avro":
  "my_avro_folder/": # single stream for whole folder
  "my_avro_folder/*.avro": # one stream for each file
  "my_avro_folder/prefix_*.avro": # single stream for all files
    object: 'target_schema.my_avro_data'
    single: true

  # with full S3 path
  "s3://my-bucket/my_avro_folder/":
  "s3://my-bucket/my_avro_folder/*.avro":
  "s3://my-bucket/my_avro_folder/prefix_*.avro":
    object: 'target_schema.my_avro_data'
    single: true
  "s3://my-bucket/my_file.avro":

  # Google Cloud Storage
  "gs://my-bucket/my_avro_folder/":
  "gs://my-bucket/my_avro_folder/*.avro":
  "gs://my-bucket/my_avro_folder/prefix_*.avro":
    object: 'target_schema.my_avro_data'
    single: true
  "gs://my-bucket/my_file.avro":

env:
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_THREADS: 3 # run streams concurrently
from sling import Format, Mode, Replication, ReplicationStream, SourceOptions
import os

# Connection details are read from environment variables.
os.environ['MY_FILE_STORAGE'] = '...'
os.environ['MY_TARGET_DB'] = '...'

# Shared settings for every stream: full refresh into a table named after
# the source file's folder and file name, decoding the data as Avro.
shared_defaults = {
    'mode': Mode.FULL_REFRESH,
    'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
    'source_options': SourceOptions(format=Format.AVRO),
}

# Stream keys may be bucket-relative paths or full URLs. An empty dict
# inherits all defaults; a ReplicationStream overrides them per stream.
stream_defs = {
    # no need to specify scheme://bucket
    'my_file.avro': {},
    'my_avro_folder/': {},  # single stream for whole folder
    'my_avro_folder/*.avro': {},  # one stream for each file
    'my_avro_folder/prefix_*.avro': ReplicationStream(  # single stream for all files
        object='target_schema.my_avro_data',
        single=True,
    ),
    # with full S3 path
    's3://my-bucket/my_avro_folder/': {},
    's3://my-bucket/my_avro_folder/*.avro': {},
    's3://my-bucket/my_avro_folder/prefix_*.avro': ReplicationStream(
        object='target_schema.my_avro_data',
        single=True,
    ),
    's3://my-bucket/my_file.avro': {},
    # Google Cloud Storage
    'gs://my-bucket/my_avro_folder/': {},
    'gs://my-bucket/my_avro_folder/*.avro': {},
    'gs://my-bucket/my_avro_folder/prefix_*.avro': ReplicationStream(
        object='target_schema.my_avro_data',
        single=True,
    ),
    'gs://my-bucket/my_file.avro': {},
}

# Cloud Storage Avro file import
repl = Replication(
    source='MY_FILE_STORAGE',
    target='MY_TARGET_DB',
    defaults=shared_defaults,
    streams=stream_defs,
    env={
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_THREADS': '3',  # run streams concurrently
    },
)
repl.run()
# S3: load a whole folder of XML files into one table
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_xml_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Google Cloud Storage: load a whole folder of XML files
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_xml_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Azure Blob Storage: load a whole folder of XML files
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_xml_folder/' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# S3: load a single XML file
$ sling run --src-conn MY_S3_BUCKET --src-stream 's3://my-bucket/my_file.xml' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Google Cloud Storage: load a single XML file
$ sling run --src-conn MY_GS_BUCKET --src-stream 'gs://my-bucket/my_file.xml' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
# Azure Blob Storage: load a single XML file
$ sling run --src-conn MY_AZURE_CONT --src-stream 'https://my_account.blob.core.windows.net/my-container/my_file.xml' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
Running with Sling: sling run -r /path/to/replication.yaml
# Replication: load XML files from cloud storage into the target database.
# NOTE: indentation below restores the nested structure that was flattened in
# the extracted page; it mirrors the equivalent Python example.
source: MY_FILE_STORAGE
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  # target table is derived from each file's folder and file name
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'
  source_options:
    format: xml

streams:
  # no need to specify scheme://bucket
  "my_xml_folder/":
  "my_file.xml":

  # with full S3 path
  "s3://my-bucket/my_xml_folder/":
  "s3://my-bucket/my_file.xml":

  # Google Cloud Storage
  "gs://my-bucket/my_xml_folder/":
  "gs://my-bucket/my_file.xml":

env:
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_THREADS: 3 # run streams concurrently
from sling import Format, Mode, Replication, ReplicationStream, SourceOptions
import os

# Connection details are read from environment variables.
os.environ['MY_FILE_STORAGE'] = '...'
os.environ['MY_TARGET_DB'] = '...'

# Shared settings for every stream: full refresh into a table named after
# the source file's folder and file name, decoding the data as XML.
shared_defaults = {
    'mode': Mode.FULL_REFRESH,
    'object': 'target_schema.{stream_file_folder}_{stream_file_name}',
    'source_options': SourceOptions(format=Format.XML),
}

# Stream keys may be bucket-relative paths or full URLs; every stream
# here simply inherits the defaults.
stream_defs = {
    # no need to specify scheme://bucket
    'my_xml_folder/': {},
    'my_file.xml': {},
    # with full S3 path
    's3://my-bucket/my_xml_folder/': {},
    's3://my-bucket/my_file.xml': {},
    # Google Cloud Storage
    'gs://my-bucket/my_xml_folder/': {},
    'gs://my-bucket/my_file.xml': {},
}

# Cloud Storage XML file import
repl = Replication(
    source='MY_FILE_STORAGE',
    target='MY_TARGET_DB',
    defaults=shared_defaults,
    streams=stream_defs,
    env={
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_THREADS': '3',  # run streams concurrently
    },
)
repl.run()
Using — (repeated link anchors; the hyperlink targets were lost when this page was extracted, so each "Using" line originally pointed to a related section of the documentation — restore links from the source page.)