The goal of this tutorial is to set up an integration between Fivetran, GCP, and Snowflake that lets Fivetran load Iceberg data into a GCP data lake, with Snowflake as the query engine on that data.
Video
Video still in development.
Requirements
A Snowflake account; a free trial works. We also assume no complex security requirements.
A Google Cloud account; you can set up a free account to get started.
Your GCP bucket and Snowflake account have to be in the same region to be able to manage Iceberg tables (see the query below to check your Snowflake region).
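Not sure which region your Snowflake account is in? A quick way to check is Snowflake's built-in CURRENT_REGION function; the region named in the comment is only an example.

-- Show the cloud and region of the current Snowflake account,
-- e.g. GCP_US_CENTRAL1. Create the GCS bucket in the matching region.
select current_region();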
Let's start by setting up our Google Cloud Storage bucket. After that we'll connect Fivetran to it and finish with the Snowflake integration to the Fivetran-hosted Polaris catalog.
Google Cloud
Sign in to your Google account.
Create project
If you don't already have a project, start by selecting or creating one.
Click create project.
In our case we'll call the project danielwilczak and select the default "No organization" for the location.
Create storage bucket
We'll select our new project and click Cloud Storage to create a storage bucket.
Click create or create bucket.
Warning
Your GCP bucket and Snowflake account have to be in the same region to be able to manage Iceberg tables.
I'm going to name the bucket danielwilczak as well. Copy this name; we will use it later. We'll also want to make sure the region matches our Snowflake account.
Let's now select our new bucket and add a folder.
Click create folder.
Give it a name. I've named mine fivetran because that is who is loading the data.
You can now see we have our folder. Let's move to setting up Fivetran to load data into our folder.
Fivetran
Let's now connect Fivetran to our bucket. To start, let's add our bucket as a destination. Click Destinations in Fivetran.
Select GCP Cloud Storage as the location.
Give the destination a name.
We'll first want to copy our service account URL to give Fivetran permission to use the bucket.
Authorize Fivetran in GCP
Let's head back to our bucket and click the check box followed by the permissions button.
Click Add principal.
Paste the service account URL given by Fivetran, then click Select a role.
We'll want to search for "Storage Object Admin" and then click on it in the dropdown.
Once both are added click "Save".
Now we can add our bucket name and folder, then click save and test.
Once tested, your destination is ready to use. Let's view the destination.
Snowflake
Warning
Currently you must grab the catalog code when the destination is created; otherwise the key will be hidden ('*'). Fivetran is working on fixing this issue.
Now that Fivetran can load data into our GCP bucket, let's connect Snowflake to it using Fivetran's hosted Polaris catalog. Let's start by going back into our GCP destination in Fivetran to copy some Snowflake code.
Once in the destination, select "Catalog Integration" on the top navigation bar, select Snowflake, and then copy the SQL code. We will use this in Snowflake next.
Let's head into Snowflake and run the code from Fivetran together with the lines below. The Fivetran code lets Snowflake see the tables managed by the catalog; the added lines return the GCP service account that Snowflake will use to access the bucket, which we'll authorize in GCP next.
use role accountadmin;

-- Get our principal url to be used in GCP so we can connect to the bucket.
describe external volume <VOLUME NAME>;

select
    "property",
    REPLACE(GET_PATH(PARSE_JSON("property_value"), 'STORAGE_GCP_SERVICE_ACCOUNT')::STRING, '"', '') AS url
from table(result_scan(last_query_id()))
where "property" = 'STORAGE_LOCATION_1';
use role accountadmin;

-- Get our principal url to be used in GCP so we can connect to the bucket.
describe external volume fivetran_volume_throwback_refinery;

select
    "property",
    REPLACE(GET_PATH(PARSE_JSON("property_value"), 'STORAGE_GCP_SERVICE_ACCOUNT')::STRING, '"', '') AS url
from table(result_scan(last_query_id()))
where "property" = 'STORAGE_LOCATION_1';
Let's navigate to IAM so that we can give Snowflake access to our storage bucket.
Create a new role.
Fill in the role information. We will call it snowflake. After that click Add Permissions.
The permissions to select can be found in Snowflake's documentation. In this tutorial I have chosen the permissions for data loading and unloading. I have also provided a GIF to show how to select the permissions, because the user interface is terrible.
Navigate back to our bucket. Click Permissions, followed by Add principal.
In the new principals section, add the URL given by Snowflake earlier.
Now add your role by clicking Select a role -> Custom -> snowflake. The last item will be whatever you named your role.
If you get a 'Domain restricted sharing' error when you click 'Save'.
If you run into this error, it's because Google Cloud updated its policies in March 2024, and we'll have to change one of them. First select your organization (not your project), then search for IAM, followed by clicking "Grant access".
Next we'll add our user email into the new principals area, then search for and click on "Organization Policy Administrator".
Click save.
Next we'll want to update the policy: search for IAM, select Organization Policies, search for "domain", and click on "Domain restricted sharing".
We'll want to override the parent policy with a new rule. Select "Replace the policy" and then select "Allow All". Click Done and "Set Policy", and you're good to go.
The policy has been updated and you can retry adding the role to the new principal.
Click Save and you're finished with the GCP setup.
Load source data
This section will vary depending on what source you plan to use. In my example I load some data from Google Analytics by adding it as a connector.
Now if we check our bucket, you can see Fivetran has loaded the Google Analytics data into our data lake.
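Optionally, you can already confirm from Snowflake that the Fivetran-hosted catalog sees these tables. This quick check uses the same SYSTEM$LIST_ICEBERG_TABLES_FROM_CATALOG call that the automation script later in this tutorial relies on, and it assumes your catalog integration is named fivetran_catalog_throwback_refinery as in the examples that follow; substitute your own name.

-- List the Iceberg tables exposed by the Fivetran-hosted catalog
-- (same call used by the automation script later in this tutorial).
select SYSTEM$LIST_ICEBERG_TABLES_FROM_CATALOG('fivetran_catalog_throwback_refinery', '', 0);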
Create - Table
Let's create a database, schema and our first table in Snowflake using everything we have created.
-- Create the database and schema to store the tables.
create or replace database <Database name>;
create or replace schema <Schema>;

-- Create the table from the data and the catalog.
create or replace iceberg table <Table name>
    external_volume = '<volume>'
    catalog = '<catalog>'
    catalog_namespace = '<source name>'
    catalog_table_name = '<table name>'
    auto_refresh = true;
create or replace database fivetran;
create or replace schema google_analytics_4;

-- Create the table from the data and the catalog.
create or replace iceberg table accounts
    external_volume = 'fivetran_volume_throwback_refinery'
    catalog = 'fivetran_catalog_throwback_refinery'
    catalog_namespace = 'google_analytics_4'
    catalog_table_name = 'accounts'
    auto_refresh = true;
Now that we have our first table created, let's query it, as shown below.
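For example, a simple query against the new Iceberg table, using the database, schema, and table names from the example above (adjust them if you chose different names):

-- Iceberg tables are queried like any other Snowflake table.
select *
from fivetran.google_analytics_4.accounts
limit 10;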
Now let's automate the creation of all the tables using a Python script. We'll add this Python code into a Python Worksheet. We'll want to fill in our catalog integration name and external volume name that we got from Fivetran.
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, lit
import json
import datetime
import pandas as pd
from functools import reduce

CATALOG_INTEGRATION_NAME = '<Catalog name>'
EXTERNAL_VOLUME_NAME = '<External volume name>'


def main(session: snowpark.Session):
    # Fetch a list of tables from the external catalog
    external_catalog_tables = [
        row
        for rs in session.sql(f"select SYSTEM$LIST_ICEBERG_TABLES_FROM_CATALOG('{CATALOG_INTEGRATION_NAME}', '', 0) as TABLE_LIST").collect()
        for row in json.loads(rs.as_dict()["TABLE_LIST"])
    ]

    # Convert the tables to the appropriate CREATE SCHEMA and CREATE ICEBERG TABLE statements
    statements = [
        sql
        for table in external_catalog_tables
        for sql in [create_schema(table), create_table(table)]
    ]

    # Execute each of the statements and merge the resulting dataframes into one combined dataframe to show the user
    results = reduce(
        lambda left, right: left.union_all(right),
        [exec(session, statement) for statement in statements]
    )

    # Identify any tables that exist in CURRENT_DATABASE() that are not tracked in the external catalog and optionally drop them
    sync_dropped_tables(session, external_catalog_tables, drop=False)

    return results.sort(col('statement_timestamp'))


def create_schema(table):
    # Build the CREATE SCHEMA statement for a table's namespace
    return f"""CREATE SCHEMA if not exists {table['namespace']}
        EXTERNAL_VOLUME = '{EXTERNAL_VOLUME_NAME}'
        CATALOG = '{CATALOG_INTEGRATION_NAME}'"""


def create_table(table):
    # Build the CREATE ICEBERG TABLE statement for a catalog table
    return f"""CREATE OR REPLACE ICEBERG TABLE {table['namespace']}.{table['name']}
        EXTERNAL_VOLUME = '{EXTERNAL_VOLUME_NAME}'
        CATALOG = '{CATALOG_INTEGRATION_NAME}'
        CATALOG_NAMESPACE = '{table['namespace']}'
        CATALOG_TABLE_NAME = '{table['name']}'
        AUTO_REFRESH = TRUE;"""


def exec_and_aggregate_results(session: snowpark.Session, dataframe: snowpark.DataFrame, sql: str) -> snowpark.DataFrame:
    results = session.sql(sql)
    results = results.with_column('statement_timestamp', lit(datetime.datetime.now()))
    results = results.with_column('statement', lit(sql))
    return dataframe.union_all(results) if dataframe else results


def exec(session: snowpark.Session, sql: str) -> snowpark.DataFrame:
    # Run a statement and attach the statement text and timestamp to its result
    results = session.sql(sql)
    results = results.with_column('statement_timestamp', lit(datetime.datetime.now()))
    results = results.with_column('statement', lit(sql))
    return results


def sync_dropped_tables(session: snowpark.Session, external_catalog_tables: list, drop=False):
    # Find tables in the current database that are no longer present in the external catalog
    all_tables_df = session.sql(
        """SELECT CONCAT(table_schema, '.', table_name) AS FQTN, table_schema, table_name
           FROM INFORMATION_SCHEMA.TABLES
           WHERE table_catalog = CURRENT_DATABASE()
             AND table_schema NOT IN ('INFORMATION_SCHEMA', 'PUBLIC')"""
    ).to_pandas()

    external_catalog_tables_df = pd.DataFrame.from_dict(external_catalog_tables)
    external_catalog_tables_df['FQTN'] = external_catalog_tables_df["namespace"].str.upper() + "." + external_catalog_tables_df["name"].str.upper()

    tables_to_drop = all_tables_df.merge(external_catalog_tables_df, on='FQTN', how='left', indicator=True)
    tables_to_drop = tables_to_drop[tables_to_drop['_merge'] == 'left_only'].drop(columns=['_merge'])

    drop_statements = tables_to_drop["FQTN"].map(lambda fqtn: f"""DROP TABLE {fqtn}""")

    for sql in drop_statements:
        if drop:
            session.sql(sql).collect()  # .collect() is needed to actually execute the DROP
            print(f"Dropped orphan table: {sql}")
        else:
            print("Orphan table detected. Run the following to drop it:")
            print(sql)
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, lit
import json
import datetime
import pandas as pd
from functools import reduce

CATALOG_INTEGRATION_NAME = 'fivetran_catalog_throwback_refinery'
EXTERNAL_VOLUME_NAME = 'fivetran_volume_throwback_refinery'


def main(session: snowpark.Session):
    # Fetch a list of tables from the external catalog
    external_catalog_tables = [
        row
        for rs in session.sql(f"select SYSTEM$LIST_ICEBERG_TABLES_FROM_CATALOG('{CATALOG_INTEGRATION_NAME}', '', 0) as TABLE_LIST").collect()
        for row in json.loads(rs.as_dict()["TABLE_LIST"])
    ]

    # Convert the tables to the appropriate CREATE SCHEMA and CREATE ICEBERG TABLE statements
    statements = [
        sql
        for table in external_catalog_tables
        for sql in [create_schema(table), create_table(table)]
    ]

    # Execute each of the statements and merge the resulting dataframes into one combined dataframe to show the user
    results = reduce(
        lambda left, right: left.union_all(right),
        [exec(session, statement) for statement in statements]
    )

    # Identify any tables that exist in CURRENT_DATABASE() that are not tracked in the external catalog and optionally drop them
    sync_dropped_tables(session, external_catalog_tables, drop=False)

    return results.sort(col('statement_timestamp'))


def create_schema(table):
    # Build the CREATE SCHEMA statement for a table's namespace
    return f"""CREATE SCHEMA if not exists {table['namespace']}
        EXTERNAL_VOLUME = '{EXTERNAL_VOLUME_NAME}'
        CATALOG = '{CATALOG_INTEGRATION_NAME}'"""


def create_table(table):
    # Build the CREATE ICEBERG TABLE statement for a catalog table
    return f"""CREATE OR REPLACE ICEBERG TABLE {table['namespace']}.{table['name']}
        EXTERNAL_VOLUME = '{EXTERNAL_VOLUME_NAME}'
        CATALOG = '{CATALOG_INTEGRATION_NAME}'
        CATALOG_NAMESPACE = '{table['namespace']}'
        CATALOG_TABLE_NAME = '{table['name']}'
        AUTO_REFRESH = TRUE;"""


def exec_and_aggregate_results(session: snowpark.Session, dataframe: snowpark.DataFrame, sql: str) -> snowpark.DataFrame:
    results = session.sql(sql)
    results = results.with_column('statement_timestamp', lit(datetime.datetime.now()))
    results = results.with_column('statement', lit(sql))
    return dataframe.union_all(results) if dataframe else results


def exec(session: snowpark.Session, sql: str) -> snowpark.DataFrame:
    # Run a statement and attach the statement text and timestamp to its result
    results = session.sql(sql)
    results = results.with_column('statement_timestamp', lit(datetime.datetime.now()))
    results = results.with_column('statement', lit(sql))
    return results


def sync_dropped_tables(session: snowpark.Session, external_catalog_tables: list, drop=False):
    # Find tables in the current database that are no longer present in the external catalog
    all_tables_df = session.sql(
        """SELECT CONCAT(table_schema, '.', table_name) AS FQTN, table_schema, table_name
           FROM INFORMATION_SCHEMA.TABLES
           WHERE table_catalog = CURRENT_DATABASE()
             AND table_schema NOT IN ('INFORMATION_SCHEMA', 'PUBLIC')"""
    ).to_pandas()

    external_catalog_tables_df = pd.DataFrame.from_dict(external_catalog_tables)
    external_catalog_tables_df['FQTN'] = external_catalog_tables_df["namespace"].str.upper() + "." + external_catalog_tables_df["name"].str.upper()

    tables_to_drop = all_tables_df.merge(external_catalog_tables_df, on='FQTN', how='left', indicator=True)
    tables_to_drop = tables_to_drop[tables_to_drop['_merge'] == 'left_only'].drop(columns=['_merge'])

    drop_statements = tables_to_drop["FQTN"].map(lambda fqtn: f"""DROP TABLE {fqtn}""")

    for sql in drop_statements:
        if drop:
            session.sql(sql).collect()  # .collect() is needed to actually execute the DROP
            print(f"Dropped orphan table: {sql}")
        else:
            print("Orphan table detected. Run the following to drop it:")
            print(sql)
Running the worksheet returns one row for each statement it executed, for example:

status | STATEMENT_TIMESTAMP | STATEMENT
Schema GOOGLE_ANALYTICS successfully created. | 2025-05-23 14:28:14.514 | CREATE SCHEMA if not exists google_analytics EXTERNAL_VOLUME = 'fivetran_volume_buffer_concierge' CATALOG='fivetran_catalog_buffer_concierge'
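As a final check, you can list every Iceberg table that now exists in the database. SHOW ICEBERG TABLES is standard Snowflake syntax; the database name here comes from the earlier example, so swap in your own if it differs.

-- List all Iceberg tables created in the fivetran database.
show iceberg tables in database fivetran;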