Chcę utworzyć proces ETL, który odczytuje zapytania z Snowflake. Większość przykładów online pokazuje, jak skonfigurować połączenie przy użyciu zwykłego hasła ciągowego, ale moja firma skonfigurowała hasło za pomocą klucza prywatnego. Niestety, gdy próbuję przekazać klucz prywatny jako parametr, zwraca następujący błąd:

Traceback (most recent call last):
  File "/Users/rihun/PycharmProjects/snowflake_gcp_etl/loader.py", line 61, in <module>
    .option("query", query) \
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Input PEM private key is invalid'

Przykład kodu:

import findspark
findspark.init()

import pyspark
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.6.24,net.snowflake:spark-snowflake_2.11:2.4.12-spark_2.3 pyspark-shell'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from snowflake_connector import get_keeper_token, get_snowflake_credentials

spark = SparkSession.builder.master('local').appName('Snowflake Loader').config('spark.driver.memory', '5G').getOrCreate()

spark.builder.config('spark.executor.memory', '16G')
spark.builder.config('spark.executor.cores', '4')

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sf_creds = get_snowflake_credentials(keeper_token=get_keeper_token())

sfOptions = {
    "sfURL": sf_creds['sfURL'],
    "sfAccount": sf_creds['sfAccount'],
    "sfUser": sf_creds['sfUser'],
    "pem_private_key": sf_creds['sfPrivateKey'],
    "sfDatabase": sf_creds['sfDatabase'],
    "sfSchema": sf_creds['sfSchema'],
    "sfWarehouse": sf_creds['sfWarehouse'],
}

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df.count()

Jak otrzymuję dane logowania

def get_snowflake_credentials(keeper_token: str,
                         keeper_url='<keeper_url>',
                         keeper_namespace='cloudDB',
                         keeper_secret_path='<path_to_key>',
                         sf_account='<sf_account>',
                         sf_svc_user='<user>',
                         sf_wh='<warehouse>',
                         sf_role='<role>',
                         sf_db='<db>',
                         sf_schema='<schema>'):
    # Connect to Keeper to collect secrets
    client = hvac.Client(
        url=keeper_url,
        namespace=keeper_namespace,
        token=keeper_token
    )

    # Secrets are stored within the key entitled 'data'
    keeper_secrets = client.read(keeper_secret_path)['data']
    passphrase = keeper_secrets['SNOWSQL_PRIVATE_KEY_PASSPHRASE']
    private_key = keeper_secrets['private_key']

    # PEM key must be byte encoded
    key = bytes(private_key, 'utf-8')

    p_key = serialization.load_pem_private_key(
        key
        , password=passphrase.encode()
        , backend=default_backend()
    )

    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER
        , format=serialization.PrivateFormat.PKCS8
        , encryption_algorithm=serialization.NoEncryption())

    sf_client = snowflake.connector.connect(
        user=sf_svc_user
        , account=sf_account
        , warehouse=sf_wh
        , role=sf_role
        , database=sf_db
        , schema=sf_schema
        , private_key=pkb)

    return {
        "sfURL": "<url>",
        "sfAccount": sf_account,
        "sfUser": sf_svc_user,
        "sfPrivateKey": pkb,
        "sfDatabase": sf_db,
        "sfSchema": sf_schema,
        "sfWarehouse": sf_wh
    }
2
Riley Hun 17 grudzień 2019, 06:04
Spróbuj zaktualizować złącze JDBC i sprawdź, czy to pomoże. Jakiś czas temu widziałem ten problem ze starszym złączem i aktualizacja pomogła w tym przypadku (net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4). Możesz również spróbować przetestować za pomocą Pythona, aby sprawdzić, czy problem dotyczy tylko platformy Spark.
 – 
Suzy Lockwood
17 grudzień 2019, 23:37
Dzięki Suzy - próbowałem połączyć się z Pythonem i wydawało się, że działa dobrze. Myślę więc, że ten problem jest specyficznym problemem iskry.
 – 
Riley Hun
18 grudzień 2019, 01:11
Niestety aktualizacja złącza JDBC nie rozwiązała problemu
 – 
Riley Hun
18 grudzień 2019, 01:39
To ciekawe, że działa z Pythonem. Czy możesz w ogóle ulepszyć Sparka? Może być wart biletu do pomocy technicznej, jeśli nie możesz go uruchomić.
 – 
Suzy Lockwood
19 grudzień 2019, 02:38
Utworzyłem problem dotyczący ich problemów z GitHub.
 – 
Riley Hun
19 grudzień 2019, 03:53

1 odpowiedź

Can you try with this code.

---------------------------------------------------------------------------------

    #!/usr/bin/env python
    # coding=utf-8
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark import SparkConf, SparkContext
    import subprocess
    from pyspark.sql import SparkSession
    import os
    import logging
    from logging import getLogger
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    import re
    from cryptography.hazmat.primitives.serialization import load_pem_private_key

    v_log = '<path>/spark.log'

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())   

    logging.basicConfig(
            filename=v_log,
            level=logging.DEBUG)
    logger = getLogger(__name__)



    with open("<path-to>/rsa_key.p8", "rb") as key_file:
        p_key = serialization.load_pem_private_key(
            key_file.read(),
            password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
            backend=default_backend()
        )

    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    pkb = pkb.decode("UTF-8")
    pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


    sfOptions = {
        "sfURL": "<URL>",
        "sfAccount": "sfcsupport",
        "sfUser": "",
        "sfDatabase": "",
        "sfSchema": "PUBLIC",
        "sfWarehouse": "",
        "sfRole": "",
        "pem_private_key":pkb
    }




    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "Select * from <TableName>") \
        .load()


    df.show()

----------------------------------------------------------------------------
1
Ankur Srivastava 20 grudzień 2019, 20:46
Dzięki Ankurowi! Czy uruchamiam kod tak, jak jest? Otrzymuję błąd — java.lang.ClassNotFoundException: Nie można znaleźć źródła danych: net.snowflake.spark.snowflake. Proszę znaleźć pakiety na
 – 
Riley Hun
20 grudzień 2019, 13:50
Zaktualizowano go. Spróbuj z tym i wprowadź odpowiednie zmiany.
 – 
Ankur Srivastava
20 grudzień 2019, 20:47
Czy mogę również zapytać, jakiej wersji Spark i wersji Pythona używasz? Używam Sparka 2.3.3 i Pythona 3.7
 – 
Riley Hun
21 grudzień 2019, 01:45
Używam Pythona 3.7 i Spark 2.4.3. W tym kodzie ładuję w słoikach pamięci i poniżej 2 wiersze są kluczem: pkb = pkb.decode("UTF-8") pkb = re.sub("-*( BEGIN|END) KLUCZ PRYWATNY-*\n", "", pkb).replace("\n", "")
 – 
Ankur Srivastava
23 grudzień 2019, 22:01
Właśnie udało mi się to wypróbować i otrzymuję następujący błąd: Sterownik JDBC nie może połączyć się ze Snowflake. Kod błędu: 390100, Wiadomość: Podano nieprawidłową nazwę użytkownika lub hasło. Mogę również potwierdzić, że używam poprawnych poświadczeń. Może używane przeze mnie pliki jars są niepoprawne?
 – 
Riley Hun
24 luty 2020, 08:41