
Connect Snowflake to Google Cloud Storage

The goal of this tutorial is to load JSON and CSV data from Google Cloud Storage using the COPY INTO SQL command, and then to use Snowpipe to automate the ingestion process.

Video

Requirements

  • Snowflake account; you can use a free trial. We also assume no complex security needs.
  • Google Cloud account; you can set up a free account to get started.

Download

  • Sample data (Link)

Manual Loading

Let's start by setting up a Snowflake connection to Google Cloud Storage and loading JSON data. After that we'll use Snowpipe to automate the ingestion of CSV files.

Google Cloud

Sign in to your Google account.

Create project

If you don't have a project, start by selecting/creating a project. Project

Click create project. Create project

In our case we'll call the project danielwilczak and select the default No organization for the location. Name project

Create cloud storage

We will select our new project and click Cloud Storage to create a storage bucket. Select project and cloud storage

Click create or create bucket. Click create bucket

I'm going to name the bucket danielwilczak as well. Copy this name; we will use it later. Container name

Upload sample data

Upload the sample data (json/csv) provided in the data folder to your Google Cloud Storage bucket. Navigate into container

Snowflake

Let's set up Snowflake by creating a worksheet, adding the code below with your bucket name from earlier, and hitting run:

/*
    We switch to "sysadmin" to create an object
    because it will be owned by that role.
*/
use role sysadmin;

-- Create a database to store our schemas.
create database if not exists raw;

-- Create the schema. The schema stores all objects.
create schema if not exists raw.gcp;

/*
    Warehouses are synonymous with the idea of compute
    resources in other systems. We will use this
    warehouse to query our integration and to load data.
*/
create warehouse if not exists development 
    warehouse_size = xsmall
    auto_suspend = 30
    initially_suspended = true;

/*
    Integrations are one of those important features that
    account admins should handle because they allow
    connections between Snowflake and data stored outside of it.
*/
use role accountadmin;

create storage integration gcp_integration
    type = external_stage
    storage_provider = 'gcs'
    enabled = true
    storage_allowed_locations = ('gcs://<storage bucket name>'); /* (1)! */

-- Give the sysadmin access to use the integration.
grant usage on integration gcp_integration to role sysadmin;

desc storage integration gcp_integration;
select "property", "property_value" as principal from table(result_scan(last_query_id()))
where "property" = 'STORAGE_GCP_SERVICE_ACCOUNT';

  1. Replace with your storage bucket name from earlier.
/*
    We switch to "sysadmin" to create an object
    because it will be owned by that role.
*/
use role sysadmin;

-- Create a database to store our schemas.
create database if not exists raw;

-- Create the schema. The schema stores all objects.
create schema if not exists raw.gcp;

/*
    Warehouses are synonymous with the idea of compute
    resources in other systems. We will use this
    warehouse to query our integration and to load data.
*/
create warehouse if not exists development 
    warehouse_size = xsmall
    auto_suspend = 30
    initially_suspended = true;

/*
    Integrations are one of those important features that
    account admins should handle because they allow
    connections between Snowflake and data stored outside of it.
*/
use role accountadmin;

create storage integration gcp_integration
    type = external_stage
    storage_provider = 'gcs'
    enabled = true
    storage_allowed_locations = ('gcs://danielwilczak');

-- Give the sysadmin access to use the integration.
grant usage on integration gcp_integration to role sysadmin;

desc storage integration gcp_integration;
select "property", "property_value" as principal from table(result_scan(last_query_id()))
where "property" = 'STORAGE_GCP_SERVICE_ACCOUNT';
property principal
STORAGE_GCP_SERVICE_ACCOUNT rbnxkdkujw@prod3-f617.iam.gserviceaccount.com

Grant Access in Google Cloud

Let's navigate to IAM so that we can give Snowflake access to our storage bucket. Navigate into IAM

Create a new role. New Role

Fill in the role information. We will call it snowflake. After that click Add Permissions. Role information

The permissions to select can be found in Snowflake's documentation. In this tutorial I have chosen Data loading and unloading. I have also provided a gif to show how to select the permissions because the user interface is terrible. Add permissions

Navigate back to our bucket. Click permissions, followed by add principal. Navigate to bucket and permissions

In the new principals section, add your STORAGE_GCP_SERVICE_ACCOUNT given by Snowflake earlier. Add snowflake service account

Now add your role by clicking Select role -> Custom -> snowflake. The last item will be whatever you named your role. Add role

If you get a 'Domain restricted sharing' error when you click 'Save'.

If you run into this error, it's because Google Cloud updated their policy as of March 2024. We'll have to update it. First select your organization (not your project), then search for IAM, followed by clicking "Grant access". navigate to grant access

Next we'll add our user email into the new principals area. We'll search and click on "Organization Policy Administrator". org policy admin

Click save. update

Next we'll want to update the policy by searching IAM, selecting Organization Policies, searching "domain", and clicking on "Domain restricted sharing". update

Click Manage policy. update

Note

"Allow All" is the simple approach but feel free to use more fine grain approach via Snowflake documentation.

We'll want to override the parent policy with a new rule. Select Replace the policy and then select "Allow All". Click Done, then "Set policy", and you're good to go. update

The policy has been updated and you can retry adding the role to the new principal. update

Click Save and you're finished with Google Cloud for manual loading. Click Save

Load the data

Let's set up the stage and file format, and finally load some JSON data.

use database raw;
use schema gcp;
use role sysadmin;
use warehouse development;

/*
   Stages are synonymous with the idea of folders
   that can be either internal or external.
*/
create or replace stage raw.gcp.gcp
    storage_integration = gcp_integration
    url = 'gcs://<BUCKET NAME>/' /* (1)! */
    directory = ( enable = true);

/* 
    Create a file format so the "copy into"
    command knows how to copy the data.
*/
create or replace file format json
    type = 'json';


-- Create the table to load into.
create or replace table json (
    file_name varchar,
    data variant
);

-- Load the json file from the json folder.
copy into json(file_name,data)
from (
    select 
        metadata$filename,
        $1
    from
        @gcp/json
        (file_format => json)
);

  1. Replace with your storage bucket name from earlier.
use database raw;
use schema gcp;
use role sysadmin;
use warehouse development;

/*
   Stages are synonymous with the idea of folders
   that can be either internal or external.
*/
create or replace stage raw.gcp.gcp
    storage_integration = gcp_integration
    url = 'gcs://danielwilczak/'
    directory = ( enable = true);

/* 
    Create a file format so the "copy into"
    command knows how to copy the data.
*/
create or replace file format raw.gcp.json
    type = 'json';

-- Create the table to load into.
create or replace table json (
    file_name varchar,
    data variant
);

-- Load the json file from the json folder.
copy into json(file_name,data)
from (
    select 
        metadata$filename,
        $1
    from
        @gcp/json
        (file_format => json)
);
file status
gcs://danielwilczak/json/sample.json LOADED
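
If the copy command reports nothing loaded, or you just want to confirm exactly what was picked up, Snowflake's copy_history table function can show the recent load activity. A minimal sketch, assuming the JSON table created above:

-- Show everything copied into the JSON table in the last hour.
select
    file_name,
    status,
    row_count
from table(
    information_schema.copy_history(
        table_name => 'JSON',
        start_time => dateadd(hour, -1, current_timestamp())
    )
);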

Look at the data you just loaded.

select * from raw.gcp.json; 

Result
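
Because the JSON lands in a single VARIANT column, you can pull individual fields out with path notation. The sketch below uses a made-up key ("name"); swap in whatever keys your sample file actually contains.

-- Hypothetical example: extract one key from the VARIANT column.
-- Replace "name" with a key that exists in your sample JSON.
select
    file_name,
    data:name::varchar as name
from raw.gcp.json;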

Automatic Loading

Warning

If you have not yet manually loaded data from Google Cloud Storage, please go back and complete that section first.

Google Cloud

First we'll start by clicking on the project selector. Project select

We'll copy our project id. We will use this later. Project id

Next we'll open the Google Cloud command line interface (CLI). CLI

You will have to authorize the CLI, but once it's opened, a window will show below. Open CLI

First we'll set the current project:

gcloud config set project <Project ID>
gcloud config set project danielwilczak

Set project

Next we'll create the notification topic for our Cloud Storage bucket.

gsutil notification create -t snowpipe -f json gs://<Storage Bucket Name>/ /* (1)! */

  1. Replace with your storage bucket name.
gsutil notification create -t snowpipe -f json gs://danielwilczak/

Create notification

After that we're done with the CLI. We can close it. Close CLI

Next we'll navigate to pub/sub using the search bar. Navigate to pub/sub

We'll click on the topic we created. Click Topic

We'll click create subscription. Click Create Subscription

We'll give it a name. I used snowpipe_subscription. Make sure the delivery type is set to Pull. Name and Pull

We'll click create. Click create

Once created we'll copy the full subscription name. We will use this in the next step. Copy subscription name

Snowflake

Let's create the notification integration in a Snowflake worksheet.

use role accountadmin;

create or replace notification integration gcp_notification_integration
    type = queue
    notification_provider = gcp_pubsub
    enabled = true
    gcp_pubsub_subscription_name = '<SUBSCRIPTION NAME>'; /* (1)! */

grant usage on integration gcp_notification_integration to role sysadmin;

desc notification integration gcp_notification_integration;
select "property", "property_value" as principal from table(result_scan(last_query_id()))
where "property" = 'GCP_PUBSUB_SERVICE_ACCOUNT';

  1. Copy subscription name
use role accountadmin;

create or replace notification integration gcp_notification_integration
    type = queue
    notification_provider = gcp_pubsub
    enabled = true
    gcp_pubsub_subscription_name = 'projects/danielwilczak/subscriptions/snowpipe_subscription';

grant usage on integration gcp_notification_integration to role sysadmin;

desc notification integration gcp_notification_integration;
select "property", "property_value" as principal from table(result_scan(last_query_id()))
where "property" = 'GCP_PUBSUB_SERVICE_ACCOUNT';
property principal
GCP_PUBSUB_SERVICE_ACCOUNT geimkrazlq@prod3-f617.iam.gserviceaccount.com

Grant Access in Google Cloud

Let's go back into Google Cloud and click on our subscription. Navigate to subscription

Click show panel if not open already. Show panel

Click Add Principal. Add Principal

Add the principal (the service account we got from Snowflake in the prior step). Add login user

Click Select role and select Pub/Sub -> Pub/Sub Subscriber. Add role

Click Save. Click Save

Next we'll want to go back to IAM. Navigate IAM

Click Grant Access. Grant Access

Add the principal (the service account we got from Snowflake in the prior step). Add login user part 2

Click Select role, search for Monitoring Viewer, and click Monitoring Viewer. Add monitoring viewer role

Final Google Cloud step - Click Save. Add role

Load the data

Note

Sometimes it may take 1-2 minutes before you see data in the table. This depends on how Google Cloud is feeling today.

In this case we'll load a CSV file by automating the creation of the table and inferring the column names from the CSV in the pipe.

use role sysadmin;
use database raw;
use schema gcp;
use warehouse development;

/*
    Copy CSV data using a pipe without having
    to write out the column names.
*/
create or replace file format infer
    type = csv
    parse_header = true
    skip_blank_lines = true
    field_optionally_enclosed_by ='"'
    trim_space = true
    error_on_column_count_mismatch = false;

/*
    Create the table with the column names
    generated for us.
*/
create or replace table csv
    using template (
        select array_agg(object_construct(*))
        within group (order by order_id)
        from table(
            infer_schema(        
            LOCATION=>'@gcp/csv'
        , file_format => 'infer')
        )
    );

/*
    Load the data and assign the pipe notification
    to know when a file is added.
*/
create or replace pipe csv 
    auto_ingest = true 
    integration = 'GCP_NOTIFICATION_INTEGRATION' 
    as

    copy into
        csv
    from
        @gcp/csv

    file_format = (format_name= 'infer')
    match_by_column_name=case_insensitive;

/* 
    Refresh the state of the pipe to make
    sure it's updated with all files.
*/
alter pipe csv refresh;
File Status
/sample_1.csv SENT

Result

Note

Sometimes it may take 1-2 minutes before you see data in the table. This depends on how Google Cloud is feeling today.
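
While you wait, you can ask Snowflake what the pipe is doing. A quick sanity check, assuming the pipe was created as raw.gcp.csv in the code above:

-- Returns a JSON blob with the pipe's execution state
-- and the number of files still pending ingestion.
select system$pipe_status('raw.gcp.csv');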

Let's add more sample data into the Google Cloud Storage bucket's csv folder and see it appear in Snowflake ~30 seconds later. We can confirm this by doing a count on our table and seeing 20 records, where the original CSV only had 10 records.
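
For example, a quick check against the csv table from the pipe above:

-- Should return 20 once the second file has been ingested.
select count(*) as record_count from raw.gcp.csv;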