
Use Snowflake with Amazon MWAA to orchestrate data pipelines


This blog post is co-written with James Sun from Snowflake.

Customers rely on data from different sources such as mobile applications, clickstream events from websites, historical data, and more to infer meaningful patterns to optimize their products, services, and processes. With a data pipeline, which is a set of tasks used to automate the movement and transformation of data between different systems, you can reduce the time and effort needed to gain insights from the data. Apache Airflow and Snowflake have emerged as powerful technologies for data management and analysis.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed workflow orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud at scale. The Snowflake Data Cloud provides a single source of truth for all your data needs and allows your organization to store, analyze, and share large amounts of data. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines.

In this post, we provide an overview of orchestrating your data pipeline using Snowflake operators in your Amazon MWAA environment. We define the steps needed to set up the integration between Amazon MWAA and Snowflake. The solution provides an end-to-end automated workflow that includes data ingestion, transformation, analytics, and consumption.

Overview of solution

The following diagram illustrates our solution architecture.

Solution Overview

The data used for transformation and analysis is based on the publicly available New York Citi Bike dataset. The data (zipped files), which includes rider demographics and trip data, is copied from the public Citi Bike Amazon Simple Storage Service (Amazon S3) bucket into your AWS account. The data is decompressed and stored in a different S3 bucket (the transformed data could be stored in the same S3 bucket where the data was ingested, but for simplicity, we're using two separate S3 buckets). The transformed data is then made available to Snowflake for data analysis. The output of the queried data is published to Amazon Simple Notification Service (Amazon SNS) for consumption.

Amazon MWAA uses a directed acyclic graph (DAG) to run the workflows. In this post, we run three DAGs:

  • DAG1 – Creates the Snowflake connection in Apache Airflow
  • DAG2 – Creates the Snowflake objects (database, schema, storage integration, and stage)
  • DAG3 – Runs the data pipeline that transforms the data, loads it into Snowflake, and publishes the query results

The following diagram illustrates this workflow.

DAG run workflow

See the GitHub repo for the DAGs and other files related to the post.

Note that in this post, we're using a DAG to create the Snowflake connection, but you can also create the Snowflake connection using the Airflow UI or CLI.

Prerequisites

To deploy the solution, you should have a basic understanding of Snowflake and Amazon MWAA, along with the following prerequisites:

  • An AWS account in an AWS Region where Amazon MWAA is supported.
  • A Snowflake account with admin credentials. If you don't have an account, sign up for a 30-day free trial. Select the Snowflake enterprise edition for the AWS Cloud platform.
  • Access to Amazon MWAA, Secrets Manager, and Amazon SNS.
  • In this post, we're using two S3 buckets, called airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID. Amazon S3 supports global buckets, which means that each bucket name must be unique across all AWS accounts in all the Regions within a partition. If the S3 bucket name is already taken, choose a different S3 bucket name. Create the S3 buckets in your AWS account. We upload content to the S3 buckets later in the post. Replace ACCOUNT_ID with your own AWS account ID or any other unique identifier. The bucket details are as follows:
    • airflow-blog-bucket-ACCOUNT_ID – The top-level bucket for Amazon MWAA-related files.
    • airflow-blog-bucket-ACCOUNT_ID/requirements – The folder used for storing the requirements.txt file needed to deploy Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags – The folder used for storing the DAG files to run workflows in Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries – The folder used for storing the Snowflake SQL queries.
    • citibike-tripdata-destination-ACCOUNT_ID – The bucket used for storing the transformed dataset.

When implementing the solution in this post, replace references to airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID with the names of your own S3 buckets.

Set up the Amazon MWAA environment

First, you create an Amazon MWAA environment. Before deploying the environment, upload the requirements file to the airflow-blog-bucket-ACCOUNT_ID/requirements S3 folder. The requirements file is based on Amazon MWAA version 2.6.3. If you're testing on a different Amazon MWAA version, update the requirements file accordingly.
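The exact contents of the requirements file are in the GitHub repo; at a minimum, it needs the Snowflake provider package so that the Snowflake operators and hooks are available in Airflow. The following is a minimal sketch (package pins are illustrative; use the versions from the repo that match Airflow 2.6.3):

# Sketch of requirements.txt (pin versions to match the repo and Airflow 2.6.3)
apache-airflow-providers-snowflake
snowflake-connector-python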

Complete the following steps to set up the environment:

  1. On the Amazon MWAA console, choose Create environment.
  2. Provide a name of your choice for the environment.
  3. Choose Airflow version 2.6.3.
  4. For the S3 bucket, enter the path of your bucket (s3://airflow-blog-bucket-ACCOUNT_ID).
  5. For the DAGs folder, enter the DAGs folder path (s3://airflow-blog-bucket-ACCOUNT_ID/dags).
  6. For the requirements file, enter the requirements file path (s3://airflow-blog-bucket-ACCOUNT_ID/requirements/requirements.txt).
  7. Choose Next.
  8. Under Networking, choose your existing VPC or choose Create MWAA VPC.
  9. Under Web server access, choose Public network.
  10. Under Security groups, leave Create new security group selected.
  11. For the Environment class, Encryption, and Monitoring sections, leave all values as default.
  12. In the Airflow configuration options section, choose Add custom configuration value and configure two values:
    1. Set Configuration option to secrets.backend and Custom value to airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend.
    2. Set Configuration option to secrets.backend_kwargs and Custom value to {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}.
    Configuration options for Secrets Manager
  13. In the Permissions section, leave the default settings and choose Create a new role.
  14. Choose Next.
  15. When the Amazon MWAA environment is available, assign S3 bucket permissions to the AWS Identity and Access Management (IAM) execution role (created during the Amazon MWAA setup).

MWAA execution role
This will direct you to the created execution role on the IAM console.

For testing purposes, you can choose Add permissions and attach the managed AmazonS3FullAccess policy to the role instead of providing restricted access. For this post, we provide only the required access to the S3 buckets.

  1. On the drop-down menu, choose Create inline policy.
  2. For Select service, choose S3.
  3. Under Access level, specify the following:
    1. Expand List level and select ListBucket.
    2. Expand Read level and select GetObject.
    3. Expand Write level and select PutObject.
  4. Under Resources, choose Add ARN.
  5. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID (use your own bucket).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID (use your own bucket).
    3. arn:aws:s3:::tripdata (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  6. Under Resources, choose Add ARN.
  7. On the Text tab, provide the following ARNs for S3 object access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID/* (make sure to include the asterisk).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*.
    3. arn:aws:s3:::tripdata/* (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  8. Choose Next.
  9. For Policy name, enter S3ReadWrite.
  10. Choose Create policy.
  11. Finally, provide Amazon MWAA with permission to access the Secrets Manager secret keys.

This step gives the Amazon MWAA execution role for your Amazon MWAA environment read access to the secret in Secrets Manager.

The execution role should have the policies MWAA-Execution-Policy*, S3ReadWrite, and SecretsManagerReadWrite attached to it.
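For reference, the S3ReadWrite inline policy produced by the preceding steps should look roughly like the following JSON (a sketch based on this post's bucket names; substitute your own):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID",
        "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID",
        "arn:aws:s3:::tripdata"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": [
        "arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID/*",
        "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*",
        "arn:aws:s3:::tripdata/*"
      ]
    }
  ]
}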

MWAA execution role policies

When the Amazon MWAA environment is available, you can sign in to the Airflow UI from the Amazon MWAA console using the Open Airflow UI link.

Airflow UI access

Set up an SNS topic and subscription

Next, you create an SNS topic and add a subscription to the topic. Complete the following steps:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Choose Create topic.
  3. For Topic type, select Standard.
  4. For Name, enter mwaa_snowflake.
  5. Leave the rest as default.
  6. After you create the topic, navigate to the Subscriptions tab and choose Create subscription.
    SNS topic
  7. For Topic ARN, choose mwaa_snowflake.
  8. Set the protocol to Email.
  9. For Endpoint, enter your email ID (you will get a notification in your email to accept the subscription).

By default, only the topic owner can publish and subscribe to the topic, so you need to modify the Amazon MWAA execution role access policy to allow Amazon SNS access.

  1. On the IAM console, navigate to the execution role you created earlier.
  2. On the drop-down menu, choose Create inline policy.
    MWAA execution role SNS policy
  3. For Select service, choose SNS.
  4. Under Actions, expand the Write access level and select Publish.
  5. Under Resources, choose Add ARN.
  6. On the Text tab, specify the ARN arn:aws:sns:<<region>>:<<your_account_ID>>:mwaa_snowflake.
  7. Choose Next.
  8. For Policy name, enter SNSPublishOnly.
  9. Choose Create policy.
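The resulting SNSPublishOnly policy should look roughly like the following JSON (a sketch; replace the Region and account ID with your own values):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:<<region>>:<<your_account_ID>>:mwaa_snowflake"
    }
  ]
}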

Configure a Secrets Manager secret

Next, we set up Secrets Manager, which is a supported alternative backend for storing the Snowflake connection information and credentials.

To create the connection string, the Snowflake host and account name are required. Log in to your Snowflake account, and under the Worksheets menu, choose the plus sign and select SQL Worksheet. Using the worksheet, run the following SQL commands to find the host and account name.

Run the following query for the host name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'"','') AS HOST
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Run the following query for the account name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT';

Next, we configure the secret in Secrets Manager.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, select Other type of secret.
  3. Under Key/value pairs, choose the Plaintext tab.
  4. In the text field, enter the following code and modify the string to reflect your Snowflake account information:

{"host": "<<snowflake_host_name>>", "account":"<<snowflake_account_name>>","person":"<<snowflake_username>>","password":"<<snowflake_password>>","schema":"mwaa_schema","database":"mwaa_db","function":"accountadmin","warehouse":"dev_wh"}

For example:

{"host": "xxxxxx.snowflakecomputing.com", "account":"xxxxxx","user":"xxxxx","password":"*****","schema":"mwaa_schema","database":"mwaa_db","role":"accountadmin","warehouse":"dev_wh"}

The values for the database name, schema name, and role should be as mentioned earlier. The account, host, user, password, and warehouse can differ based on your setup.

Secret information

Choose Next.

  1. For Secret name, enter airflow/connections/snowflake_accountadmin.
  2. Leave all other values as default and choose Next.
  3. Choose Store.

Take note of the Region in which the secret was created (shown under Secret ARN). We later define it as a variable in the Airflow UI.
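If you prefer to script this step rather than use the console, a minimal boto3 sketch (assuming the same secret name and the placeholder values shown above) could look like the following:

# Minimal sketch: store the Snowflake connection details in Secrets Manager with boto3.
# The secret name must match the connections_prefix configured for Amazon MWAA.
import json
import boto3

client = boto3.client("secretsmanager", region_name="us-east-1")  # use your Region

client.create_secret(
    Name="airflow/connections/snowflake_accountadmin",
    SecretString=json.dumps({
        "host": "<<snowflake_host_name>>",
        "account": "<<snowflake_account_name>>",
        "user": "<<snowflake_username>>",
        "password": "<<snowflake_password>>",
        "schema": "mwaa_schema",
        "database": "mwaa_db",
        "role": "accountadmin",
        "warehouse": "dev_wh",
    }),
)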

Configure Snowflake access permissions and IAM role

Next, log in to your Snowflake account. Ensure the account you're using has account administrator access. Create a SQL worksheet. In the worksheet, create a warehouse named dev_wh.

The following is an example SQL command:

CREATE OR REPLACE WAREHOUSE dev_wh 
 WITH WAREHOUSE_SIZE = 'xsmall' 
 AUTO_SUSPEND = 60 
 INITIALLY_SUSPENDED = true
 AUTO_RESUME = true
 MIN_CLUSTER_COUNT = 1
 MAX_CLUSTER_COUNT = 5;

For Snowflake to read data from and write data to an S3 bucket referenced in an external (S3 bucket) stage, a storage integration is required. Follow the steps outlined in Option 1: Configuring a Snowflake Storage Integration to Access Amazon S3 (only perform Steps 1 and 2, as described in this section).

Configure access permissions for the S3 bucket

While creating the IAM policy, a sample policy document is required (see the following code), which provides Snowflake with the required permissions to load or unload data using a single bucket and folder path. The bucket name used in this post is citibike-tripdata-destination-ACCOUNT_ID. You should modify it to reflect your bucket name.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID"
    }
  ]
}

Create the IAM role

Next, you create the IAM role to grant privileges on the S3 bucket containing your data files. After creation, record the Role ARN value located on the role summary page.

Snowflake IAM role

Configure variables

Finally, configure the variables that will be accessed by the DAGs in Airflow. Log in to the Airflow UI and, on the Admin menu, choose Variables and then the plus sign.

Airflow variables

Add four variables with the following key/value pairs:

  • Key aws_role_arn with value <<snowflake_aws_role_arn>> (the ARN for the role mysnowflakerole noted earlier)
  • Key destination_bucket with value <<bucket_name>> (for this post, the bucket used is citibike-tripdata-destination-ACCOUNT_ID)
  • Key target_sns_arn with value <<sns_Arn>> (the SNS topic in your account)
  • Key sec_key_region with value <<region_of_secret_deployment>> (the Region where the secret in Secrets Manager was created)

The following screenshot illustrates where to find the SNS topic ARN.

SNS topic ARN

The Airflow UI will now have the variables defined, which will be referred to by the DAGs.

Airflow variables list
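Within a DAG, these variables can be read with Airflow's Variable API. The following is a minimal sketch (illustrative only, not the exact code from the repo):

# Illustrative sketch: read the variables defined above from within a DAG.
from airflow.models import Variable

aws_role_arn = Variable.get("aws_role_arn")
destination_bucket = Variable.get("destination_bucket")
target_sns_arn = Variable.get("target_sns_arn")
sec_key_region = Variable.get("sec_key_region")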

Congratulations, you have completed all the configuration steps.

Run the DAGs

Let's look at how to run the DAGs. To recap:

  • DAG1 (create_snowflake_connection_blog.py) – Creates the Snowflake connection in Apache Airflow. This connection will be used to authenticate with Snowflake. The Snowflake connection string is stored in Secrets Manager, which is referenced in the DAG file (a sketch of this approach follows this list).
  • DAG2 (create-snowflake_initial-setup_blog.py) – Creates the database, schema, storage integration, and stage in Snowflake.
  • DAG3 (run_mwaa_datapipeline_blog.py) – Runs the data pipeline, which unzips files from the source public S3 bucket and copies them to the destination S3 bucket. The next task creates a table in Snowflake to store the data. Then the data from the destination S3 bucket is copied into the table using a Snowflake stage. After the data is successfully copied, a view is created in Snowflake, on top of which the SQL queries are run.
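As a rough illustration of DAG1's approach, the following sketch registers the Snowflake connection from the Secrets Manager secret. It is written under the assumptions of this post's setup; the task ID and helper function are illustrative, not the exact code in the repo.

# Sketch of DAG1: build the Airflow Snowflake connection from the Secrets Manager secret.
import json
from datetime import datetime

import boto3
from airflow import DAG, settings
from airflow.models import Connection, Variable
from airflow.operators.python import PythonOperator


def create_snowflake_connection():
    # Read the secret stored earlier under airflow/connections/snowflake_accountadmin
    region = Variable.get("sec_key_region")
    client = boto3.client("secretsmanager", region_name=region)
    secret = json.loads(
        client.get_secret_value(
            SecretId="airflow/connections/snowflake_accountadmin"
        )["SecretString"]
    )

    # Build an Airflow connection and save it to the metadata database
    # (a production version would first check whether the conn_id already exists)
    conn = Connection(
        conn_id="snowflake_conn_accountadmin",
        conn_type="snowflake",
        host=secret["host"],
        login=secret["user"],
        password=secret["password"],
        schema=secret["schema"],
        extra=json.dumps({
            "account": secret["account"],
            "database": secret["database"],
            "role": secret["role"],
            "warehouse": secret["warehouse"],
        }),
    )
    session = settings.Session()
    session.add(conn)
    session.commit()


with DAG(
    dag_id="create_snowflake_connection_blog",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="create_snowflake_connection",
        python_callable=create_snowflake_connection,
    )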

To run the DAGs, complete the following steps:

  1. Upload the DAGs to the S3 folder airflow-blog-bucket-ACCOUNT_ID/dags.
  2. Upload the SQL query files to the S3 folder airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries.
  3. Log in to the Apache Airflow UI.
  4. Locate DAG1 (create_snowflake_connection_blog), un-pause it, and choose the play icon to run it.

You can view the run state of the DAG using the Grid or Graph view in the Airflow UI.

Dag1 run

After DAG1 runs, the Snowflake connection snowflake_conn_accountadmin is created on the Admin, Connections menu.

  1. Locate and run DAG2 (create-snowflake_initial-setup_blog).

Dag2 run

After DAG2 runs, the following objects are created in Snowflake:

  • The database mwaa_db
  • The schema mwaa_schema
  • The storage integration mwaa_citibike_storage_int
  • The stage mwaa_citibike_stg
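For reference, the storage integration and stage created by DAG2 are roughly equivalent to the following SQL (a sketch; the actual statements in the DAG may differ, and the role ARN and bucket come from the aws_role_arn and destination_bucket Airflow variables):

CREATE STORAGE INTEGRATION IF NOT EXISTS mwaa_citibike_storage_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<<snowflake_aws_role_arn>>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://citibike-tripdata-destination-ACCOUNT_ID/');

CREATE STAGE IF NOT EXISTS mwaa_db.mwaa_schema.mwaa_citibike_stg
  STORAGE_INTEGRATION = mwaa_citibike_storage_int
  URL = 's3://citibike-tripdata-destination-ACCOUNT_ID/';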

Before running the final DAG, the trust relationship of the IAM role needs to be updated.

  1. Log in to your Snowflake account using your admin account credentials.
  2. Open the SQL worksheet created earlier and run the following command:
DESC INTEGRATION mwaa_citibike_storage_int;

mwaa_citibike_storage_int is the name of the integration created by DAG2 in the previous step.

From the output, record the property values of the following two properties:

  • STORAGE_AWS_IAM_USER_ARN – The IAM user created for your Snowflake account.
  • STORAGE_AWS_EXTERNAL_ID – The external ID that is needed to establish a trust relationship.

Now we grant the Snowflake IAM user permissions to access the bucket objects.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose the role mysnowflakerole.
  3. On the Trust relationships tab, choose Edit trust relationship.
  4. Modify the policy document with the DESC STORAGE INTEGRATION output values you recorded. For example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::5xxxxxxxx:user/mgm4-s-ssca0079"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "AWSPARTNER_SFCRole=4_vsarJrupIjjJh77J9Nxxxx/j98="
        }
      }
    }
  ]
}

The AWS role ARN and ExternalId will be different for your environment, based on the output of the DESC STORAGE INTEGRATION query.

Trust relationship

  1. Locate and run the final DAG (run_mwaa_datapipeline_blog).

At the end of the DAG run, the data is ready for querying. In this example, the query (finding the top start and destination stations) is run as part of the DAG, and the output can be seen in the Airflow XComs UI.

Xcoms

In the DAG run, the output is also published to Amazon SNS and, based on the subscription, an email notification is sent out with the query output.

Email

Another method to visualize the results is directly from the Snowflake console using the Snowflake worksheet. The following is an example query:

SELECT START_STATION_NAME,
       COUNT(START_STATION_NAME) C
FROM MWAA_DB.MWAA_SCHEMA.CITIBIKE_VW
GROUP BY START_STATION_NAME
ORDER BY C DESC
LIMIT 10;

Snowflake visual

There are different ways to visualize the output based on your use case.

As we saw, DAG1 and DAG2 need to be run only one time to set up the Amazon MWAA connection and Snowflake objects. DAG3 can be scheduled to run every week or month. With this solution, the user analyzing the data doesn't need to log in to either Amazon MWAA or Snowflake. You can have an automated workflow triggered on a schedule that will ingest the latest data from the Citi Bike dataset and provide the top start and destination stations for the given dataset.
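For example, scheduling DAG3 to run weekly comes down to the schedule argument in its DAG definition. The following is a sketch showing only the scheduling-related arguments; the rest of the DAG is omitted:

# Sketch: schedule DAG3 to run weekly; pipeline tasks omitted for brevity.
from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="run_mwaa_datapipeline_blog",
    start_date=datetime(2023, 1, 1),
    schedule="@weekly",   # or "@monthly"
    catchup=False,
) as dag:
    ...  # pipeline tasks go here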

Clean up

To avoid incurring future charges, delete the AWS resources (IAM users and roles, Secrets Manager secrets, Amazon MWAA environment, SNS topics and subscription, S3 buckets) and Snowflake resources (database, stage, storage integration, view, tables) created as part of this post.

Conclusion

In this post, we demonstrated how to set up an Amazon MWAA connection for authenticating to Snowflake as well as to AWS using AWS user credentials. We used a DAG to automate creating the Snowflake objects such as the database, tables, and stage using SQL queries. We also orchestrated the data pipeline using Amazon MWAA, which ran tasks related to data transformation as well as Snowflake queries. We used Secrets Manager to store the Snowflake connection information and credentials, and Amazon SNS to publish the data output for end consumption.

With this solution, you have an automated end-to-end orchestration of your data pipeline encompassing ingestion, transformation, analysis, and data consumption.

To learn more, refer to the following resources:


About the authors

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.

James Sun is a Senior Partner Solutions Architect at Snowflake. He actively collaborates with strategic cloud partners like AWS, supporting product and service integrations, as well as the development of joint solutions. He has held senior technical positions at tech companies such as EMC, AWS, and MapR Technologies. With over 20 years of experience in storage and data analytics, he also holds a PhD from Stanford University.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Manuj Arora is a Sr. Solutions Architect for Strategic Accounts in AWS. He focuses on Migration and Modernization capabilities and offerings in AWS. Manuj has worked as a Partner Success Solutions Architect in AWS over the last 3 years and worked with partners like Snowflake to build solution blueprints that are leveraged by customers. Outside of work, he enjoys traveling, playing tennis, and exploring new places with family and friends.

