I want to create an ETL process that reads query results from Snowflake. Most examples online show how to configure the connection with a plain string password, but my company has set up authentication with a private key. Unfortunately, when I try to pass the private key as a parameter, I get the following error:
Traceback (most recent call last):
  File "/Users/rihun/PycharmProjects/snowflake_gcp_etl/loader.py", line 61, in <module>
    .option("query", query) \
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Input PEM private key is invalid'
Code example:
import findspark
findspark.init()

import os
# Pull in the Snowflake JDBC driver and Spark connector at submit time
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.6.24,net.snowflake:spark-snowflake_2.11:2.4.12-spark_2.3 pyspark-shell'

from pyspark.sql import SparkSession
from snowflake_connector import get_keeper_token, get_snowflake_credentials

# Executor settings must be set on the builder before getOrCreate();
# calling spark.builder.config(...) after the session exists has no effect
spark = SparkSession.builder \
    .master('local') \
    .appName('Snowflake Loader') \
    .config('spark.driver.memory', '5G') \
    .config('spark.executor.memory', '16G') \
    .config('spark.executor.cores', '4') \
    .getOrCreate()

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sf_creds = get_snowflake_credentials(keeper_token=get_keeper_token())

sfOptions = {
    "sfURL": sf_creds['sfURL'],
    "sfAccount": sf_creds['sfAccount'],
    "sfUser": sf_creds['sfUser'],
    "pem_private_key": sf_creds['sfPrivateKey'],
    "sfDatabase": sf_creds['sfDatabase'],
    "sfSchema": sf_creds['sfSchema'],
    "sfWarehouse": sf_creds['sfWarehouse'],
}

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df.count()
How I retrieve the credentials:
import hvac
import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def get_snowflake_credentials(keeper_token: str,
                              keeper_url='<keeper_url>',
                              keeper_namespace='cloudDB',
                              keeper_secret_path='<path_to_key>',
                              sf_account='<sf_account>',
                              sf_svc_user='<user>',
                              sf_wh='<warehouse>',
                              sf_role='<role>',
                              sf_db='<db>',
                              sf_schema='<schema>'):
    # Connect to Keeper to collect secrets
    client = hvac.Client(
        url=keeper_url,
        namespace=keeper_namespace,
        token=keeper_token
    )
    # Secrets are stored within the key entitled 'data'
    keeper_secrets = client.read(keeper_secret_path)['data']
    passphrase = keeper_secrets['SNOWSQL_PRIVATE_KEY_PASSPHRASE']
    private_key = keeper_secrets['private_key']

    # PEM key must be byte encoded
    key = bytes(private_key, 'utf-8')
    p_key = serialization.load_pem_private_key(
        key,
        password=passphrase.encode(),
        backend=default_backend()
    )
    # Re-serialize as unencrypted DER, the format the Python connector expects
    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Open a connection with the Python connector (the session is not returned)
    sf_client = snowflake.connector.connect(
        user=sf_svc_user,
        account=sf_account,
        warehouse=sf_wh,
        role=sf_role,
        database=sf_db,
        schema=sf_schema,
        private_key=pkb
    )

    return {
        "sfURL": "<url>",
        "sfAccount": sf_account,
        "sfUser": sf_svc_user,
        "sfPrivateKey": pkb,
        "sfDatabase": sf_db,
        "sfSchema": sf_schema,
        "sfWarehouse": sf_wh
    }
1 answer
Can you try with this code? The Spark connector's pem_private_key option expects the base64-encoded body of the PEM key as a string (the content between the BEGIN/END markers, with the line breaks removed), whereas your function returns raw DER bytes, which is what the Python connector's private_key parameter takes; that is why Spark rejects it as an invalid PEM key.
---------------------------------------------------------------------------------
#!/usr/bin/env python
# coding=utf-8
import os
import re
import logging
from logging import getLogger

from pyspark.sql import SparkSession
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
v_log = '<path>/spark.log'

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

# Enable query pushdown to Snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

logging.basicConfig(
    filename=v_log,
    level=logging.DEBUG)
logger = getLogger(__name__)

# Load the encrypted PKCS#8 key and decrypt it with the passphrase
with open("<path-to>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

# Re-serialize the key as unencrypted PEM ...
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

# ... then strip the BEGIN/END lines and line breaks: pem_private_key
# expects only the base64-encoded key body
pkb = pkb.decode("UTF-8")
pkb = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")
sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "sfcsupport",
    "sfUser": "",
    "sfDatabase": "",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "",
    "sfRole": "",
    "pem_private_key": pkb
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "Select * from <TableName>") \
    .load()
df.show()
----------------------------------------------------------------------------
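To tie this back to your Keeper-based setup: the minimal change is to have get_snowflake_credentials return the key as a base64 string for Spark instead of DER bytes. Below is a sketch of that adaptation; the helper name pem_key_for_spark is mine, and it assumes the same private_key and passphrase values your function already reads from Keeper. It relies on the fact that unencrypted PKCS#8 DER bytes are exactly the decoded base64 body of the corresponding PEM key, so base64-encoding them is equivalent to the header-stripping done above.
----------------------------------------------------------------------------
import base64

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def pem_key_for_spark(private_key_pem: str, passphrase: str) -> str:
    """Decrypt a PEM private key and return the base64 body that the
    Spark connector's pem_private_key option expects."""
    p_key = serialization.load_pem_private_key(
        private_key_pem.encode('utf-8'),
        password=passphrase.encode(),
        backend=default_backend()
    )
    # Unencrypted PKCS#8 DER bytes are the decoded base64 body of the PEM
    # key, so encoding them yields the header-free, newline-free string
    der = p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    return base64.b64encode(der).decode('ascii')
----------------------------------------------------------------------------
With that in place, the returned dict would use "sfPrivateKey": pem_key_for_spark(private_key, passphrase) for Spark, while the raw DER bytes can still be passed to snowflake.connector.connect as before.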