Integrating Spark with Snowflake

March 2, 2018
Configuring Spark to use Snowflake using PySpark

The following PySpark script configures access to the AWS S3 bucket/directory that Spark and Snowflake use to exchange data.

The script uses the standard AWS method of providing a pair of awsAccessKeyId and awsSecretAccessKey values. These values should also be used to configure the Spark/Hadoop environment to access S3.

Replace the following variables in the script:

• SF_ACCOUNT: Account name, e.g. yourcompany.
• SF_USER: User login name.
• SF_PWD: User password to log in with.
• SF_DB: The default database to use for the session after connecting.
• SF_SCHEMA: The default schema to use for the session after connecting.
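• SF_WAREHOUSE: The default virtual warehouse to use for the session after connecting.
• AWS_ACCESS_KEY: The AWS access key ID.
• AWS_SECRET_KEY: The AWS secret access key.
• AWS_TEMP_S3: The S3 bucket/directory used to exchange data between Spark and Snowflake.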
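As an alternative to editing the placeholders by hand, the same values could be read from environment variables so that credentials stay out of the script. The following is only a sketch: the function name snowflake_options_from_env and the use of environment variables named after the placeholders are assumptions for illustration, not part of the original script, and the sfURL value assumes Snowflake's usual account.snowflakecomputing.com hostname form.

```python
import os

# Hypothetical convention: one environment variable per placeholder above.
REQUIRED_VARS = ["SF_ACCOUNT", "SF_USER", "SF_PWD",
                 "SF_DB", "SF_SCHEMA", "SF_WAREHOUSE"]


def snowflake_options_from_env(env=None):
    """Build the Snowflake connection options from a mapping of variables."""
    env = os.environ if env is None else env
    # Fail early with a clear message if any value is missing or empty.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("missing variables: " + ", ".join(missing))
    return {
        # Assumed hostname form: <account>.snowflakecomputing.com
        "sfURL": env["SF_ACCOUNT"] + ".snowflakecomputing.com",
        "sfAccount": env["SF_ACCOUNT"],
        "sfUser": env["SF_USER"],
        "sfPassword": env["SF_PWD"],
        "sfDatabase": env["SF_DB"],
        "sfSchema": env["SF_SCHEMA"],
        "sfWarehouse": env["SF_WAREHOUSE"],
    }
```

The returned dictionary has the same keys as the Snowflake entries of defaultOptions, so the AWS keys and tempdir can be added to it afterwards.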

Configuration Script:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "[AWS_ACCESS_KEY]")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "[AWS_SECRET_KEY]")

# Connection options shared by every query.
defaultOptions = {
  # Snowflake account hostname, in the form <account>.snowflakecomputing.com
  "sfURL": "[SF_ACCOUNT].snowflakecomputing.com",
  "sfAccount": "[SF_ACCOUNT]",
  "sfUser": "[SF_USER]",
  "sfPassword": "[SF_PWD]",
  "sfDatabase": "[SF_DB]",
  "sfSchema": "[SF_SCHEMA]",
  "sfWarehouse": "[SF_WAREHOUSE]",
  "awsAccessKey": sc._jsc.hadoopConfiguration().get("fs.s3n.awsAccessKeyId"),
  "awsSecretKey": sc._jsc.hadoopConfiguration().get("fs.s3n.awsSecretAccessKey"),
  "tempdir": "s3n://[AWS_TEMP_S3]"
}

def sqldf(sql):
  # Run a query in Snowflake and return the result as a Spark DataFrame.
  # Assumes the sqlContext predefined by the PySpark shell or notebook.
  return (sqlContext.read.format("net.snowflake.spark.snowflake")
    .option("sfURL", defaultOptions["sfURL"])
    .option("sfAccount", defaultOptions["sfAccount"])
    .option("sfUser", defaultOptions["sfUser"])
    .option("sfPassword", defaultOptions["sfPassword"])
    .option("sfDatabase", defaultOptions["sfDatabase"])
    .option("sfSchema", defaultOptions["sfSchema"])
    .option("sfWarehouse", defaultOptions["sfWarehouse"])
    .option("awsAccessKey", sc._jsc.hadoopConfiguration().get("fs.s3n.awsAccessKeyId"))
    .option("awsSecretKey", sc._jsc.hadoopConfiguration().get("fs.s3n.awsSecretAccessKey"))
    .option("tempdir", defaultOptions["tempdir"])
    .option("query", sql)
    .load())

def sql(sql):
  # Run a query in Snowflake and print the result.
  sqldf(sql).show()

sql("""
select current_date()
""")

df = sqldf("""
select current_date()
""")
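
The chain of .option(...) calls in sqldf repeats each key from defaultOptions. PySpark's DataFrameReader also accepts options(**kwargs), so the same read can be expressed more compactly. The sketch below is an illustration rather than the original author's code: the name sqldf_compact and its reader parameter are hypothetical, and it assumes the caller passes a DataFrameReader such as sqlContext.read.

```python
# Data source name of the Snowflake Connector for Spark.
SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"


def sqldf_compact(reader, options, query):
    """Return a DataFrame for `query`, passing all connector options at once.

    `reader` is expected to behave like a PySpark DataFrameReader
    (for example sqlContext.read); `options` is a dict such as defaultOptions.
    """
    return (reader.format(SNOWFLAKE_SOURCE)
            .options(**options)       # all connection options in one call
            .option("query", query)   # the SQL to run in Snowflake
            .load())
```

With the configuration script loaded, a call would look like df = sqldf_compact(sqlContext.read, defaultOptions, "select current_date()"). Taking the reader as a parameter keeps the function free of global state, which also makes it easy to exercise with a stub object.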

