Write

Write hooks allow you to write content to files in any file-based storage connection. This is particularly useful for creating reports, saving processed data, generating configuration files, or writing logs.

Configuration

- type: write
  to: "connection/path/to/file.txt"     # Required: destination location (connection name + file path)
  content: "text content to write"      # Required: content to write
  on_failure: abort       # Optional: abort/warn/quiet/skip
  id: my_id      # Optional. Auto-generated if omitted. Use a `log` hook with {runtime_state} to view state.

Properties

| Property | Required | Description |
| --- | --- | --- |
| `to` | Yes | The destination string. Contains the connection name and file path. |
| `content` | Yes | The content to write to the file. Supports variable substitution. Can also use `file://path/to/file` to read content from a local file. |
| `on_failure` | No | What to do if the write fails (`abort`/`warn`/`quiet`/`skip`). |

Output

When the write hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
target_url: "s3://bucket/path/to/file.txt"  # The normalized URI of the target file
bytes_written: 1024  # Number of bytes written

You can access these values in subsequent hooks using the following syntax (JMESPath), as shown in the example below the list:

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.target_url} - The normalized URI of the target file

  • {state.hook_id.bytes_written} - Number of bytes written

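For example, a minimal sketch that references the write output from a subsequent log hook (the hook id write_report and the destination path are illustrative, and the log hook's message property is assumed here):

hooks:
  post:
    - type: write
      id: write_report   # id used to reference this hook's output in later steps
      to: "s3/reports/summary_{timestamp.YYYY-MM-DD}.txt"
      content: "Rows processed: {run.total_rows}"

    - type: log
      message: "Wrote {state.write_report.bytes_written} bytes to {state.write_report.target_url}"
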
Examples

Generate Data Processing Report

Create a summary report after data processing:

hooks:
  post:
    - type: write
      to: "s3/reports/processing_summary_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Data Processing Summary
        ======================
        
        Stream: {run.stream.name}
        Start Time: {run.start_time}
        End Time: {run.end_time}
        Total Rows: {run.total_rows}
        Status: {run.status}
        
        Target: {target.connection}/{target.object}
        Total Bytes Written: {run.total_bytes}
        
        Generated on: {timestamp.YYYY-MM-DD HH:mm:ss}

Create JSON Configuration File

Generate a configuration file with runtime data:

hooks:
  pre:
    - type: write
      to: "local/config/runtime_config.json"
      content: |
        {
          "processing_date": "{timestamp.YYYY-MM-DD}",
          "source_connection": "{source.connection}",
          "target_connection": "{target.connection}",
          "stream_name": "{run.stream.name}",
          "environment": "{env.ENV_NAME}",
          "user": "{env.USER}"
        }

Write Query Results to File

Save query results as a formatted report:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        SELECT 
          COUNT(*) as total_records,
          MAX(created_at) as latest_record,
          MIN(created_at) as oldest_record
        FROM {run.object.full_name}
      id: stats_query
      
    - type: write
      to: "gcs/reports/table_stats_{run.stream.name}_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Table Statistics Report
        =====================
        
        Table: {run.object.full_name}
        Total Records: {state.stats_query.result[0].total_records}
        Latest Record: {state.stats_query.result[0].latest_record}
        Oldest Record: {state.stats_query.result[0].oldest_record}
        
        Report generated: {timestamp.YYYY-MM-DD HH:mm:ss}

Create Error Log

Write error information to a log file when processing fails:

hooks:
  post:
    - type: write
      if: run.status == "error"
      to: "local/logs/error_log_{timestamp.YYYY-MM-DD}.txt"
      content: |
        ERROR LOG ENTRY
        ===============
        
        Timestamp: {timestamp.YYYY-MM-DD HH:mm:ss}
        Stream: {run.stream.name}
        Source: {source.connection}/{source.object}
        Target: {target.connection}/{target.object}
        Error: {run.error}
        
        Environment: {env.ENV_NAME}
        User: {env.USER}
        
        ---
      on_failure: warn

Generate CSV Report

Create a CSV file with processed data statistics:

hooks:
  post:
    - type: write
      to: "s3/reports/daily_stats_{timestamp.YYYY-MM-DD}.csv"
      content: |
        date,stream_name,source_connection,target_connection,rows_processed,bytes_processed,status,duration_seconds
        {timestamp.YYYY-MM-DD},{run.stream.name},{source.connection},{target.connection},{run.total_rows},{run.total_bytes},{run.status},{run.duration}

Write Content from Local File

Write content from a local file to a remote location:

hooks:
  post:
    - type: write
      to: "s3/reports/daily_report_{timestamp.YYYY-MM-DD}.html"
      content: "file://templates/report_template.html"

Write Processed Content

Process stored content and write it to a new file:

hooks:
  pre:
    - type: read
      from: "s3/templates/email_template.html"
      into: "template"
      
  post:
    - type: write
      to: "local/output/personalized_email_{timestamp.YYYY-MM-DD-HH-mm}.html"
      content: |
        {store.template | replace('{{USER_NAME}}', '{env.USER_NAME}') | replace('{{DATE}}', '{timestamp.YYYY-MM-DD}') | replace('{{ROWS_PROCESSED}}', '{run.total_rows}')}

Create Backup Metadata

Write metadata about the backup operation:

hooks:
  end:
    - type: write
      to: "s3/backups/metadata/backup_{timestamp.YYYY-MM-DD-HH-mm}.json"
      content: |
        {
          "backup_timestamp": "{timestamp.YYYY-MM-DD HH:mm:ss}",
          "source": {
            "connection": "{source.connection}",
            "object": "{source.object}",
            "total_rows": {run.total_rows}
          },
          "target": {
            "connection": "{target.connection}",
            "object": "{target.object}",
            "bytes_written": {run.total_bytes}
          },
          "status": "{run.status}",
          "duration_seconds": {run.duration},
          "environment": "{env.ENV_NAME}"
        }

Write Multi-line SQL Script

Generate a SQL script based on runtime data:

hooks:
  post:
    - type: write
      to: "local/sql/cleanup_{run.stream.name}_{timestamp.YYYY-MM-DD}.sql"
      content: |
        -- Cleanup script for {run.stream.name}
        -- Generated on {timestamp.YYYY-MM-DD HH:mm:ss}
        
        BEGIN;
        
        -- Archive old data
        CREATE TABLE {run.object.full_name}_archive_{timestamp.YYYY_MM_DD} AS
        SELECT * FROM {run.object.full_name}
        WHERE created_at < CURRENT_DATE - INTERVAL '90 days';
        
        -- Delete old data
        DELETE FROM {run.object.full_name}
        WHERE created_at < CURRENT_DATE - INTERVAL '90 days';
        
        -- Update statistics
        ANALYZE {run.object.full_name};
        
        COMMIT;
        
        -- Summary: Processed {run.total_rows} rows

Conditional Writing

Write different content based on conditions:

hooks:
  post:
    - type: write
      if: run.total_rows > 1000
      to: "s3/alerts/high_volume_{timestamp.YYYY-MM-DD}.txt"
      content: |
        HIGH VOLUME ALERT
        =================
        
        Stream: {run.stream.name}
        Rows Processed: {run.total_rows}
        Threshold: 1000
        Time: {timestamp.YYYY-MM-DD HH:mm:ss}
        
        This requires immediate attention.
      
    - type: write
      if: run.total_rows <= 1000
      to: "s3/logs/normal_volume_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Normal processing completed for {run.stream.name}: {run.total_rows} rows processed.