Integrating Spark with Snowflake


Configuring Spark to use Snowflake using PySpark

The following PySpark script configures access to the AWS S3 bucket (or directory) used to stage data exchanged between Spark and Snowflake.

The script uses the standard AWS method of providing a pair of awsAccessKeyId and awsSecretAccessKey values. The same values are also used to configure the Spark/Hadoop environment to access S3.

Replace the following variables in the script:

• AWS_ACCESS_KEY: AWS access key ID for the S3 staging bucket.
• AWS_SECRET_KEY: AWS secret access key paired with the access key ID.
• SF_ACCOUNT: Account name, e.g. yourcompany.
• SF_USER: User login name.
• SF_PWD: User password to log in with.
• SF_DB: The default database to use for the session after connecting.
• SF_SCHEMA: The default schema to use for the session after connecting.
• SF_WAREHOUSE: The default virtual warehouse to use for the session after connecting.

Configuration Script:

# Make the S3 staging bucket accessible to the Spark/Hadoop environment.
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "[AWS_ACCESS_KEY]")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "[AWS_SECRET_KEY]")

# Connection options shared by every Snowflake read.
defaultOptions = {
  "sfURL": "[SF_ACCOUNT].snowflakecomputing.com",
  "sfAccount": "[SF_ACCOUNT]",
  "sfUser": "[SF_USER]",
  "sfPassword": "[SF_PWD]",
  "sfDatabase": "[SF_DB]",
  "sfSchema": "[SF_SCHEMA]",
  "sfWarehouse": "[SF_WAREHOUSE]",
  "awsAccessKey": sc._jsc.hadoopConfiguration().get("fs.s3n.awsAccessKeyId"),
  "awsSecretKey": sc._jsc.hadoopConfiguration().get("fs.s3n.awsSecretAccessKey"),
  "tempdir": "s3n://[AWS_TEMP_S3]"
}

# Run a query against Snowflake and return the result as a Spark DataFrame.
def sqldf(sql):
  return (sqlContext.read
    .format("com.snowflakedb.spark.snowflakedb")
    .option("sfURL", defaultOptions["sfURL"])
    .option("sfAccount", defaultOptions["sfAccount"])
    .option("sfUser", defaultOptions["sfUser"])
    .option("sfPassword", defaultOptions["sfPassword"])
    .option("sfDatabase", defaultOptions["sfDatabase"])
    .option("sfSchema", defaultOptions["sfSchema"])
    .option("sfWarehouse", defaultOptions["sfWarehouse"])
    .option("awsAccessKey", defaultOptions["awsAccessKey"])
    .option("awsSecretKey", defaultOptions["awsSecretKey"])
    .option("tempdir", defaultOptions["tempdir"])
    .option("query", sql)
    .load())

# Run a query and render the result (display() is available in notebook
# environments such as Databricks).
def sql(sql):
  return display(sqldf(sql))

sql("""
select current_date()
""")
df = sqldf("""
select current_date()
""")
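The same connector can also write Spark DataFrames back to Snowflake. The helper below is a small sketch, not part of the original script: it merges the shared connection options with per-call overrides (for example, a target table name) so the same defaults can serve both reads and writes.

```python
# Hypothetical helper (an assumption for illustration, not from the post):
# merge the shared connector defaults with per-call overrides.
def sf_options(defaults, **overrides):
    opts = dict(defaults)
    opts.update(overrides)
    return opts

# Sketch of a write back to Snowflake inside a Spark session, assuming
# defaultOptions is defined as in the script above:
# (df.write
#    .format("com.snowflakedb.spark.snowflakedb")
#    .options(**sf_options(defaultOptions, dbtable="MY_TABLE"))
#    .mode("overwrite")
#    .save())
```

Keeping one shared options dict and overriding only what changes per call avoids drift between the read and write configurations.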

