
Connect Snowflake to S3 Storage

The goal of this tutorial is to load JSON and CSV data from an S3 bucket using the COPY INTO SQL command, and then to automate the ingestion process with Snowpipe.

Requirements

  • Snowflake account; you can use a free trial. We also assume no complex security needs.
  • AWS account; you can set up a free account to get started.

Video

Download

  • Sample data (Link)

Manual Loading

Let's start by setting up a Snowflake connection to AWS S3 storage and loading JSON data. After that, we'll use Snowpipe to automate the ingestion of CSV files.

AWS

Sign in to your AWS account.

Create S3 bucket

Create the bucket you intend to use. In our case we'll call the bucket danielwilczak. Create S3

Upload sample data

Upload the sample data (json/csv) provided in the data folder to your S3 bucket. Upload example data

Policy and role

Copy your bucket's ARN. This will be used in the policy step. Copy ARN name

Go to IAM: Create S3

Create a policy: Policy

Copy the template policy JSON code below and add your bucket's ARN.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
            "s3:GetObject",
            "s3:GetObjectVersion"
            ],
            "Resource": "<COPY ARN HERE>/*" /* (1)! */
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "<COPY ARN HERE>", /* (2)! */
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "*"
                    ]
                }
            }
        }
    ]
}

  1. Copy ARN name

  2. Copy ARN name

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
            "s3:GetObject",
            "s3:GetObjectVersion"
            ],
            "Resource": "arn:aws:s3:::danielwilczak/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::danielwilczak",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "*"
                    ]
                }
            }
        }
    ]
}

Enter your policy using the template / example above and click next. Add policy json

Give the policy a name and then click "create policy". Policy

Next, let's create a role! Navigate back to IAM: Create S3

Select Roles on the navbar and then click create role: Navigate to role

Select AWS account, then This account (#). Check Require external ID and enter 0000 for now; we will update this later. Trusted entity

Add the policy to the role: Policy to role

Add the role name and click "create role": Add role name

Once created click your role: Click role

Copy your role ARN; this will be used in the next step: Copy arn

Snowflake

Let's start the Snowflake setup by creating our database and schema, followed by the integration to AWS. Run the code below with your copied role ARN and bucket name:

use role sysadmin;

-- Create a database to store our schemas.
create database if not exists raw;

-- Create the schema. The schema stores all objects.
create schema if not exists raw.aws;

/*
    Warehouses are synonymous with the idea of compute
    resources in other systems. We will use this
    warehouse to query our integration and to load data.
*/
create warehouse if not exists development 
    warehouse_size = xsmall
    auto_suspend = 30
    initially_suspended = true;

/*
    Integrations are one of those important features that
    account admins should create, because they allow outside
    connections to your Snowflake data.
*/
use role accountadmin;

create storage integration s3_integration
    type = external_stage
    storage_provider = 's3'
    enabled = true
    storage_aws_role_arn = '<ROLE ARN HERE>' /* (1)! */
    storage_allowed_locations = ('s3://<BUCKET NAME>/'); /* (2)! */

-- Give the sysadmin access to use the integration.
grant usage on integration s3_integration to role sysadmin;

desc integration s3_integration;
select "property", "property_value" from TABLE(RESULT_SCAN(LAST_QUERY_ID()))
where "property" = 'STORAGE_AWS_IAM_USER_ARN' or "property" = 'STORAGE_AWS_EXTERNAL_ID';

  1. Copy arn

  2. Copy arn

use role sysadmin;

-- Create a database to store our schemas.
create database if not exists raw;

-- Create the schema. The schema stores all objects.
create schema if not exists raw.aws;

/*
    Warehouses are synonymous with the idea of compute
    resources in other systems. We will use this
    warehouse to query our integration and to load data.
*/
create warehouse if not exists development 
    warehouse_size = xsmall
    auto_suspend = 30
    initially_suspended = true;

/*
    Integrations are one of those important features that
    account admins should create, because they allow outside
    connections to your Snowflake data.
*/
use role accountadmin;

create storage integration s3_integration
    type = external_stage
    storage_provider = 's3'
    enabled = true
    storage_aws_role_arn = 'arn:aws:iam::484577546576:role/danielwilczak-role'
    storage_allowed_locations = ('s3://danielwilczak/');

-- Give the sysadmin access to use the integration.
grant usage on integration s3_integration to role sysadmin;

desc integration s3_integration;
select "property", "property_value" from TABLE(RESULT_SCAN(LAST_QUERY_ID()))
where "property" = 'STORAGE_AWS_IAM_USER_ARN' or "property" = 'STORAGE_AWS_EXTERNAL_ID';
property                   property_value
STORAGE_AWS_IAM_USER_ARN   arn:aws:iam::001782626159:user/8pbb0000-s
STORAGE_AWS_EXTERNAL_ID    GGB82720_SFCRole=2_vcN2MIiC7PW0OMOyA82W5BLJrqY=

Note the result; it will be used in the next step.

Grant Access in S3

Navigate back to the role: Click role

Click trusted relationship: Click trusted relationship

Click edit trust policy: Edit trust policy

Copy the trust policy JSON template below and add your "STORAGE_AWS_IAM_USER_ARN" and "STORAGE_AWS_EXTERNAL_ID" from the prior Snowflake step.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "<STORAGE_AWS_IAM_USER_ARN>" /* (1)! */
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>" /* (2)! */
                }
            }
        }
    ]
}

  1. From snowflake results we got earlier:

    property                   property_value
    STORAGE_AWS_IAM_USER_ARN   arn:aws:iam::001782626159:user/8pbb0000-s
    STORAGE_AWS_EXTERNAL_ID    GGB82720_SFCRole=2_vcN2MIiC7PW0OMOyA82W5BLJrqY=

  2. From snowflake results we got earlier:

    property                   property_value
    STORAGE_AWS_IAM_USER_ARN   arn:aws:iam::001782626159:user/8pbb0000-s
    STORAGE_AWS_EXTERNAL_ID    GGB82720_SFCRole=2_vcN2MIiC7PW0OMOyA82W5BLJrqY=

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::001782626159:user/8pbb0000-s"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "GGB82720_SFCRole=2_vcN2MIiC7PW0OMOyA82W5BLJrqY="
                }
            }
        }
    ]
}

Enter your policy using the template / example above. Edit trust policy

Load the data

Let's set up the stage and file format, select our warehouse, and finally load some JSON data.

use role sysadmin;
use database raw;
use schema aws;
use warehouse development;

/*
   Stages are synonymous with the idea of folders
   that can be either internal or external.
*/
create or replace stage s3
    storage_integration = s3_integration
    url = 's3://<BUCKET NAME>/' /* (1)! */
    directory = ( enable = true);

/* 
    Create a file format so the "copy into"
    command knows how to copy the data.
*/
create or replace file format json
    type = 'json';

-- Create the table.
create or replace table json (
    file_name varchar,
    data variant
);

-- Load json data.
copy into json(file_name,data)
from (
    select 
        metadata$filename,
        $1
    from
        @s3/json
        (file_format => json)
);

  1. Copy arn

use role sysadmin;
use database raw;
use schema aws;
use warehouse development;

/*
   Stages are synonymous with the idea of folders
   that can be either internal or external.
*/
create or replace stage s3
    storage_integration = s3_integration
    url = 's3://danielwilczak/'
    directory = ( enable = true);

/* 
    Create a file format so the "copy into"
    command knows how to copy the data.
*/
create or replace file format json
    type = 'json';

-- Create the table.
create or replace table json (
    file_name varchar,
    data variant
);

-- Load json data.
copy into json(file_name,data)
from (
    select 
        metadata$filename,
        $1
    from
        @s3/json
        (file_format => json)
);
file                                  status
s3://danielwilczak/json/sample.json   LOADED
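
If the copy doesn't report a LOADED file, a quick sanity check is to list the stage and confirm Snowflake can actually see the files in your bucket. This is a minimal check, assuming the stage and folder names used above:

-- Confirm the stage can reach the bucket and see the json folder.
list @s3/json;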

Look at the data you just loaded.

select * from raw.aws.json; 

Results loading manually
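
Because the data column is a variant, you can pull individual fields out with Snowflake's path syntax. The field names below (id, name) are hypothetical; substitute whatever keys actually appear in your sample file.

-- Extract individual keys from the variant column.
-- "id" and "name" are placeholder keys; use the ones in your file.
select
    file_name,
    data:id::number   as id,
    data:name::string as name
from raw.aws.json;

-- If a file holds an array of records, flatten it into one row per record.
select
    file_name,
    value:id::number as id
from raw.aws.json,
    lateral flatten(input => data);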

Automatic Loading

Warning

If you have not manually loaded data from S3 yet, please go back and complete that section first.

Let's create a pipe to automate copying data into a table. We'll create the file format, table, and pipe in Snowflake. This approach automates the process so you don't have to manually name all the columns, and the code will also give you the SQS queue ARN to be entered into AWS later.

Snowflake

Note

Sometimes it may take 1-2 minutes before you see data in the table. This depends on how AWS is feeling today.

In this case we'll load a CSV file, automating both the creation of the table and the inference of the column names used by the CSV pipe.

use role sysadmin;
use database raw;
use schema aws;
use warehouse development;

/*
    Copy CSV data using a pipe without having
    to write out the column names.
*/
create or replace file format infer
    type = csv
    parse_header = true
    skip_blank_lines = true
    field_optionally_enclosed_by ='"'
    trim_space = true
    error_on_column_count_mismatch = false;

/*
    Create the table with the column names
    generated for us.
*/
create or replace table csv
    using template (
        select array_agg(object_construct(*))
        within group (order by order_id)
        from table(
            infer_schema(        
            LOCATION=>'@s3/csv'
        , file_format => 'infer')
        )
    );

-- Create the pipe to load any new data.
create pipe csv auto_ingest=true as
    copy into
        csv
    from
        @s3/csv
    file_format = (format_name = 'infer')
    match_by_column_name = case_insensitive;

/*
    Get the ARN for the pipe. We will add this
    to AWS in the next step.
*/
show pipes;
select "name", "notification_channel" as sqs_queue
from TABLE(RESULT_SCAN(LAST_QUERY_ID()));
name   sqs_queue
CSV    arn:aws:sqs:us-west-2:001782626159:sf-snowpipe-AIDAQ...

Grant Access in S3

Navigate to your bucket and click properties: Properties

Scroll down to "Create event notification": Create event notification

Add a name to the notification and select "All object create events": Properties

Scroll down, enter the SQS queue ARN we got from the Snowflake step, and click "save changes": Properties

Almost done! In Snowflake, let's refresh the pipe so that we ingest all the current files.

Load the data

alter pipe raw.aws.csv refresh;
File            Status
/sample_1.csv   SENT

Result

Note

Sometimes it may take 1-2 minutes before you see data in the table. This depends on how AWS is feeling today.
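
While you wait, you can ask Snowflake what the pipe is doing. This is an optional check, not a required tutorial step; it assumes the pipe and table names created above.

-- Check that the pipe is running and whether any files are pending.
select system$pipe_status('raw.aws.csv');

-- Review the recent load history for the target table (last hour).
select *
from table(raw.information_schema.copy_history(
    table_name => 'CSV',
    start_time => dateadd(hour, -1, current_timestamp())
));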

Let's add more sample data to the CSV folder of the S3 bucket and watch it appear in Snowflake about 30 seconds later. We can confirm this by counting the rows in our table: we should see 20 records, whereas the original CSV only has 10.
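
A minimal way to confirm the second file was ingested, assuming the csv table created above:

-- Once the new file has been ingested, this should return 20.
select count(*) from raw.aws.csv;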