In this article, I will demonstrate how both basic and slightly in-depth ETL can be handled almost entirely within Snowflake. The only outside elements are some form of cloud storage (an S3 bucket, for example) and a cloud notification service (AWS SNS or GCP Pub/Sub). These external pieces could be avoided, but that would require uploading all of your outside data to a Snowflake internal stage, which is more work than simply using the two outside services.
I will present two hypothetical scenarios and show how Snowflake-native tools can be used to build a pipeline for each. All code and sample data are located in my Git repo: https://github.com/jkimmerling/snowflake-etl
Guides for the specific cloud components can be found below:
- Snowflake guide to pipe auto ingest: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto
- AWS External Storage Guide: https://docs.snowflake.com/en/user-guide/data-load-s3
- GCP External Storage Guide: https://docs.snowflake.com/en/user-guide/data-load-gcs
- AZURE External Storage Guide: https://docs.snowflake.com/en/user-guide/data-load-azure
Automated Basic Ingestion
In this scenario, payment data comes from a third-party payment source. It covers any payments made the previous day and arrives in CSV format, uploaded daily to a GCP Cloud Storage bucket. To automate ingestion of this data, the following code sets up a Snowflake Snowpipe and all of its dependencies.
Outline of what will be set up:
- A table to house the incoming data
- A FILE FORMAT to tell Snowflake how to handle the CSV files
- A STORAGE INTEGRATION to facilitate the connection to the GCP bucket
- A STAGE representing the bucket as a queryable Snowflake location
- A NOTIFICATION INTEGRATION to allow GCP Pub/Sub to send notifications when a new file is uploaded to the bucket
- A PIPE (Snowpipe) that will trigger when a new notification is sent. The pipe executes the COPY INTO SQL code when it is triggered, which inserts the CSV data as new rows into the table created in step 1.
The code and steps to see this hypothetical pipeline in action
SQL to create the table
CREATE OR REPLACE TABLE PAYMENTS_CSV (
id number autoincrement start 1 increment 1 PRIMARY KEY,
invoiceId INT,
amountPaid NUMBER,
paymentDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
SQL to create the File Format
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = CSV
FIELD_DELIMITER = ','
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
NULL_IF = ('NULL', 'null', '')
ESCAPE_UNENCLOSED_FIELD = NONE
EMPTY_FIELD_AS_NULL = TRUE
SKIP_HEADER = 1
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
SQL to create the Storage Integration. This must be run with the ACCOUNTADMIN role, and the GRANT should be tailored to whatever role you plan to use.
CREATE OR REPLACE STORAGE INTEGRATION SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<your bucket url here>');
GRANT USAGE ON INTEGRATION SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION TO ROLE SYSADMIN;
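After creating the integration, you will need the GCP service account Snowflake uses so you can grant it read access on the bucket (see the GCP External Storage Guide linked above for the exact permissions). Describing the integration shows it:
DESC STORAGE INTEGRATION SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION;
-- The STORAGE_GCP_SERVICE_ACCOUNT property in the output is the account to grant bucket access to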
SQL to create the stage linked to the bucket. Switch back to your intended role.
CREATE OR REPLACE STAGE SNOWFLAKE_DATA_PIPELINE
URL='gcs://<your bucket url here>'
STORAGE_INTEGRATION = SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION;
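Before creating the pipe, it is worth a quick sanity check that the stage and file format work together. A minimal check, assuming at least one CSV file is already in the bucket:
-- List the files Snowflake can see in the bucket
LIST @SNOWFLAKE_DATA_PIPELINE;

-- Preview a few rows using the file format created earlier
SELECT $1, $2, $3
FROM @SNOWFLAKE_DATA_PIPELINE (FILE_FORMAT => 'my_csv_format')
LIMIT 10;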
SQL to create the Notification Integration that tells the Snowpipe when new files land in the bucket. This must be run with the ACCOUNTADMIN role, and the GRANT should be tailored to whatever role you plan to use.
CREATE OR REPLACE NOTIFICATION INTEGRATION SNOWFLAKE_DATA_PIPELINE_NOTIFICATION
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
GRANT USAGE ON INTEGRATION SNOWFLAKE_DATA_PIPELINE_NOTIFICATION TO ROLE SYSADMIN;
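As with the storage integration, describing the notification integration exposes the service account that must be granted the Pub/Sub Subscriber role on your subscription:
DESC NOTIFICATION INTEGRATION SNOWFLAKE_DATA_PIPELINE_NOTIFICATION;
-- The GCP_PUBSUB_SERVICE_ACCOUNT property in the output is the account to grant the subscriber role to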
SQL to create the Snowpipe to ingest the CSV files. Switch back to your intended role.
CREATE OR REPLACE PIPE SNOWFLAKE_DATA_PIPELINE_PIPE_CSV
AUTO_INGEST = true
INTEGRATION = 'SNOWFLAKE_DATA_PIPELINE_NOTIFICATION'
AS
COPY INTO PAYMENTS_CSV (invoiceId, amountPaid, paymentDate)
FROM ( select $1, $2, $3 from @SNOWFLAKE_DATA_PIPELINE )
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format')
PATTERN = '.*[.]csv'
ON_ERROR = SKIP_FILE;
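One thing to keep in mind: auto-ingest only picks up files that land after the pipe exists. If the bucket already contains CSV files that should be loaded, a one-time refresh will queue them:
-- Queue any files already sitting in the stage location for ingestion
ALTER PIPE SNOWFLAKE_DATA_PIPELINE_PIPE_CSV REFRESH;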
Testing the basic pipeline
To test the demo code, you can upload this file into your bucket: https://github.com/jkimmerling/snowflake-etl/blob/main/scenario_1_data.csv
After a few minutes, you should see data in the PAYMENTS_CSV table:
![[Pasted image 20240911130656.png]]
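If nothing shows up after a few minutes, the pipe's status is the first thing I would check, followed by a look at the table itself:
-- Shows the pipe's execution state and any pending file count
SELECT SYSTEM$PIPE_STATUS('SNOWFLAKE_DATA_PIPELINE_PIPE_CSV');

SELECT * FROM PAYMENTS_CSV ORDER BY id;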
Scenario wrap up
This was a very simple example, but if the primary need is to get raw flat-file data into Snowflake, it accomplishes that without having to design a full pipeline or pay for a SaaS solution. With changes to PATTERN, more Snowpipes can be added to allow for filename filtering and different target tables, as sketched below.
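As a sketch of that idea, a second pipe could route refund files to their own table. The REFUNDS_CSV table and its columns below are hypothetical and only exist to illustrate the PATTERN change:
-- Hypothetical second pipe: only files with "refunds" in the name feed this table
CREATE OR REPLACE PIPE SNOWFLAKE_DATA_PIPELINE_PIPE_REFUNDS
AUTO_INGEST = true
INTEGRATION = 'SNOWFLAKE_DATA_PIPELINE_NOTIFICATION'
AS
COPY INTO REFUNDS_CSV (invoiceId, amountRefunded, refundDate)
FROM ( select $1, $2, $3 from @SNOWFLAKE_DATA_PIPELINE )
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format')
PATTERN = '.*refunds.*[.]csv'
ON_ERROR = SKIP_FILE;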
Automated ETL
In this scenario, payment data again comes from a third-party payment source and is uploaded to a GCP Cloud Storage bucket. This time, the payment data is not as straightforward to deal with. It arrives at random times and covers a varying time window, so duplicate data is expected. It is also in JSON format, so data transformation is needed. The goal for this pipeline is to take the payment data, merge it with existing invoice data, check whether the invoices are paid off, determine the last payment amount and date, and load the transformed result into the INVOICE_TRACKING table. Duplicates need to be avoided, new records need to be inserted, and existing records need to be updated. To accomplish this, the components from the previous pipeline will be reused, with the addition of a STREAM and a TASK.
The outline of what will be set up:
- A staging table to house the incoming data
- A FILE FORMAT to tell Snowflake how to handle the JSON files
- A STORAGE INTEGRATION to facilitate the connection to the GCP Cloud Storage bucket
- A STAGE representing the bucket as a queryable Snowflake location
- A NOTIFICATION INTEGRATION to allow GCP Pub/Sub to send notifications when a new file is uploaded to the bucket
- A PIPE (Snowpipe) that will trigger when a new notification is sent. The pipe executes the COPY INTO SQL code when it is triggered, which inserts the JSON data as a single row with a CREATIONDATE column and a VARIANT column
- A STREAM that will log any changes made to the staging table
- A TASK that checks every 5 minutes whether a change has been logged in the STREAM and, if so, executes its SQL code. The SQL code in this example does the needed transformations and joins, merges the transformed data into INVOICE_TRACKING, truncates the staging table to prepare it for the next file upload, and resets the STREAM so the truncate does not trigger another TASK execution.
The code and steps to see this hypothetical pipeline in action
SQL to create the staging table. This table will house the VARIANT data from the JSON files.
CREATE OR REPLACE TABLE PAYMENTS_JSON (
payment_data VARIANT,
creationDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
SQL to create the table used for JOINs to demo overall functionality. This will act as the primary source table and will be joined with the PAYMENTS_JSON table.
CREATE OR REPLACE TABLE INVOICES (
id number autoincrement start 1 increment 1 PRIMARY KEY,
customerID INT,
orderId INT,
totalCost NUMBER,
creationDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
SQL to create the target table to demo the PIPE, TASK, and STREAM pipeline. The transformed data will go into this table via a MERGE with INSERT and UPDATE clauses.
CREATE OR REPLACE TABLE INVOICE_TRACKING (
id number autoincrement start 1 increment 1 PRIMARY KEY,
invoiceId INT,
customerId INT,
totalCost NUMBER,
totalPaid NUMBER,
paidOff BOOLEAN,
lastPayment NUMBER,
lastPaymentDate TIMESTAMP
);
SQL to create the File Format
CREATE OR REPLACE FILE FORMAT my_json_format
TYPE = JSON;
SQL to create the Storage Integration. This must be run with the ACCOUNTADMIN role, and the GRANT should be tailored to whatever role you plan to use.
CREATE OR REPLACE STORAGE INTEGRATION SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<your bucket url here>');
GRANT USAGE ON INTEGRATION SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION TO ROLE SYSADMIN;
SQL to create the stage linked to the bucket. Switch back to your intended role.
CREATE OR REPLACE STAGE SNOWFLAKE_DATA_PIPELINE
URL='gcs://<your bucket url here>'
STORAGE_INTEGRATION = SNOWFLAKE_DATA_PIPELINE_STORAGE_INTEGRATION;
SQL to create the Notification Integration that tells the Snowpipe when new files land in the bucket. This must be run with the ACCOUNTADMIN role, and the GRANT should be tailored to whatever role you plan to use.
CREATE OR REPLACE NOTIFICATION INTEGRATION SNOWFLAKE_DATA_PIPELINE_NOTIFICATION
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
GRANT USAGE ON INTEGRATION SNOWFLAKE_DATA_PIPELINE_NOTIFICATION TO ROLE SYSADMIN;
SQL to create the Snowpipe to ingest the JSON files. Switch back to your intended role.
CREATE OR REPLACE PIPE SNOWFLAKE_DATA_PIPELINE_PIPE_JSON
AUTO_INGEST = true
INTEGRATION = 'SNOWFLAKE_DATA_PIPELINE_NOTIFICATION'
AS
COPY INTO PAYMENTS_JSON (PAYMENT_DATA, CREATIONDATE)
FROM (
select $1, CURRENT_TIMESTAMP()
from @SNOWFLAKE_DATA_PIPELINE
)
FILE_FORMAT = (FORMAT_NAME = 'my_json_format')
PATTERN = '.*[.]json'
ON_ERROR = SKIP_FILE;
SQL to create the STREAM to catch changes in the staging table where the JSON files land
CREATE STREAM SNOWFLAKE_DATA_PIPELINE_PIPE_STREAM ON TABLE DATA_PIPELINE.TEST.PAYMENTS_JSON;
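To confirm the stream is working, the same function the task uses in its WHEN clause can be run interactively, and the stream itself can be queried like a table:
-- Returns TRUE once the pipe has inserted new rows into PAYMENTS_JSON
SELECT SYSTEM$STREAM_HAS_DATA('SNOWFLAKE_DATA_PIPELINE_PIPE_STREAM');

-- Shows the pending change rows along with METADATA$ACTION and METADATA$ISUPDATE columns
SELECT * FROM SNOWFLAKE_DATA_PIPELINE_PIPE_STREAM;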
SQL to create the TASK that will be triggered when the stream logs changes
CREATE OR REPLACE TASK SNOWFLAKE_DATA_PIPELINE_PIPE_TASK
WAREHOUSE = compute_wh
SCHEDULE = '5 minute'
WHEN SYSTEM$STREAM_HAS_DATA('SNOWFLAKE_DATA_PIPELINE_PIPE_STREAM')
AS
BEGIN
-- Merging the staging table and invoice table into the INVOICE_TRACKING table
MERGE INTO INVOICE_TRACKING
USING (
with initial_payments as (
SELECT
r.value:invoiceId AS invoiceId,
r.value:amountPaid::NUMBER AS amountPaid,
TO_TIMESTAMP(r.value:paymentDate::STRING, 'YYYY-MM-DDTHH24:MI:SS') AS paymentDate,
SUM(r.value:amountPaid::NUMBER) OVER (PARTITION BY r.value:invoiceId) AS totalPaid,
ROW_NUMBER() OVER (PARTITION BY r.value:invoiceId ORDER BY TO_TIMESTAMP(r.value:paymentDate::STRING, 'YYYY-MM-DDTHH24:MI:SS') DESC) AS rn
FROM
PAYMENTS_JSON t,
LATERAL FLATTEN(input => t.payment_data:payment_data) r
),
final_payments as (
SELECT
*
FROM initial_payments
WHERE
rn = 1
),
final_invoices as (
SELECT
id as invoiceId,
customerId,
orderId,
totalCost
from INVOICES
)
SELECT
i.invoiceId as invoiceId,
i.customerId as customerId,
i.totalCost as totalCost,
p.totalPaid as totalPaid,
CASE
WHEN p.totalPaid >= i.totalCost THEN true
ELSE false
END as paidOff,
p.amountPaid as lastPayment,
p.paymentDate as lastPaymentDate
FROM final_invoices i
LEFT JOIN final_payments p ON i.invoiceId = p.invoiceId
) as cte
ON cte.invoiceId = INVOICE_TRACKING.invoiceId
WHEN MATCHED THEN
UPDATE SET
INVOICE_TRACKING.customerId = cte.customerId,
INVOICE_TRACKING.totalCost = cte.totalCost,
INVOICE_TRACKING.totalPaid = cte.totalPaid,
INVOICE_TRACKING.paidOff = cte.paidOff,
INVOICE_TRACKING.lastPayment = cte.lastPayment,
INVOICE_TRACKING.lastPaymentDate = cte.lastPaymentDate
WHEN NOT MATCHED THEN
INSERT (invoiceId, customerId, totalCost, totalPaid, paidOff, lastPayment, lastPaymentDate)
VALUES (cte.invoiceId, cte.customerId, cte.totalCost, cte.totalPaid, cte.paidOff, cte.lastPayment, cte.lastPaymentDate);
-- Truncate the staging table to remove already processed data
TRUNCATE PAYMENTS_JSON;
-- Reset the stream so the TRUNCATE does not trigger another run
CREATE OR REPLACE TEMP TABLE RESET_TBL AS
SELECT * FROM SNOWFLAKE_DATA_PIPELINE_PIPE_STREAM;
END;
SQL to enable the TASK, as a newly created TASK is suspended by default
ALTER TASK SNOWFLAKE_DATA_PIPELINE_PIPE_TASK RESUME;
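Once the task is resumed, you can watch its runs or kick one off manually instead of waiting for the five-minute schedule. Both statements below are standard Snowflake features:
-- Trigger a run immediately instead of waiting on the schedule
EXECUTE TASK SNOWFLAKE_DATA_PIPELINE_PIPE_TASK;

-- Check recent runs, including any errors
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME => 'SNOWFLAKE_DATA_PIPELINE_PIPE_TASK',
    RESULT_LIMIT => 10));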
Testing the pipeline
To test the demo code:
- Add these two rows to the INVOICES table:
INSERT INTO INVOICES (customerID, orderId, totalCost, creationDate)
VALUES (1234, 5678, 2300, '2023-01-01T00:01:00');
INSERT INTO INVOICES (customerID, orderId, totalCost, creationDate)
VALUES (2345, 6789, 2350, '2022-12-31T12:34:45');
- Upload this JSON file into your bucket: https://github.com/jkimmerling/snowflake-etl/blob/main/scenario_2_data.json
If you are quick, you might be able to see the JSON data loaded into the PAYMENTS_JSON table before the TASK truncates it:
![[Pasted image 20240911130833.png]]
After a few minutes, you should see data in the INVOICE_TRACKING table:
![[Pasted image 20240911130740.png]]
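A quick way to verify the end state: the merge should have populated INVOICE_TRACKING, and the task's TRUNCATE should have left PAYMENTS_JSON empty:
SELECT * FROM INVOICE_TRACKING ORDER BY invoiceId;

-- Should return 0 once the task has run
SELECT COUNT(*) FROM PAYMENTS_JSON;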
Scenario wrap up
This scenario gave a bit more of a complete ETL pipeline feel. It was still relatively simple, however, and made assumptions that should not be made in production, namely that only one payment JSON file would be uploaded at a time. Still, this demo was created to show a "skeleton" of what could be accomplished, and I hope I have presented it in a way that allows you to explore and expand upon it.
Conclusion
Will this type of ETL setup work for everything? No, not at all. These examples are intended to present a possible alternative to overengineered pipelines for what should be a basic problem. There are also some other interesting benefits to a Snowflake-based ETL pipeline: with the entire pipeline written in SQL, the talent pool that can work on it is enormous, and a SQL-only pipeline enables version control and deployment via tools like dbt.
Hopefully, this article has shown you some new/interesting possibilities.
I am open to consulting engagements if you need help building this or any other data solution. Feel free to email me directly at jason@hyperfocusdata.io