Chcę utworzyć proces ETL, który odczytuje zapytania z Snowflake. Większość przykładów online pokazuje, jak skonfigurować połączenie przy użyciu zwykłego hasła ciągowego, ale moja firma skonfigurowała hasło za pomocą klucza prywatnego. Niestety, gdy próbuję przekazać klucz prywatny jako parametr, zwraca następujący błąd:

Traceback (most recent call last):
 File "/Users/rihun/PycharmProjects/snowflake_gcp_etl/loader.py", line 61, in <module>
  .option("query", query) \
 File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.py", line 172, in load
  return self._df(self._jreader.load())
 File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
 File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py", line 79, in deco
  raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Input PEM private key is invalid'

Przykład kodu:

import findspark
findspark.init()

import pyspark
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.6.24,net.snowflake:spark-snowflake_2.11:2.4.12-spark_2.3 pyspark-shell'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from snowflake_connector import get_keeper_token, get_snowflake_credentials

spark = SparkSession.builder.master('local').appName('Snowflake Loader').config('spark.driver.memory', '5G').getOrCreate()

spark.builder.config('spark.executor.memory', '16G')
spark.builder.config('spark.executor.cores', '4')

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sf_creds = get_snowflake_credentials(keeper_token=get_keeper_token())

sfOptions = {
  "sfURL": sf_creds['sfURL'],
  "sfAccount": sf_creds['sfAccount'],
  "sfUser": sf_creds['sfUser'],
  "pem_private_key": sf_creds['sfPrivateKey'],
  "sfDatabase": sf_creds['sfDatabase'],
  "sfSchema": sf_creds['sfSchema'],
  "sfWarehouse": sf_creds['sfWarehouse'],
}

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", query) \
  .load()

df.count()

Jak otrzymuję dane logowania

def get_snowflake_credentials(keeper_token: str,
             keeper_url='<keeper_url>',
             keeper_namespace='cloudDB',
             keeper_secret_path='<path_to_key>',
             sf_account='<sf_account>',
             sf_svc_user='<user>',
             sf_wh='<warehouse>',
             sf_role='<role>',
             sf_db='<db>',
             sf_schema='<schema>'):
  # Connect to Keeper to collect secrets
  client = hvac.Client(
    url=keeper_url,
    namespace=keeper_namespace,
    token=keeper_token
  )

  # Secrets are stored within the key entitled 'data'
  keeper_secrets = client.read(keeper_secret_path)['data']
  passphrase = keeper_secrets['SNOWSQL_PRIVATE_KEY_PASSPHRASE']
  private_key = keeper_secrets['private_key']

  # PEM key must be byte encoded
  key = bytes(private_key, 'utf-8')

  p_key = serialization.load_pem_private_key(
    key
    , password=passphrase.encode()
    , backend=default_backend()
  )

  pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER
    , format=serialization.PrivateFormat.PKCS8
    , encryption_algorithm=serialization.NoEncryption())

  sf_client = snowflake.connector.connect(
    user=sf_svc_user
    , account=sf_account
    , warehouse=sf_wh
    , role=sf_role
    , database=sf_db
    , schema=sf_schema
    , private_key=pkb)

  return {
    "sfURL": "<url>",
    "sfAccount": sf_account,
    "sfUser": sf_svc_user,
    "sfPrivateKey": pkb,
    "sfDatabase": sf_db,
    "sfSchema": sf_schema,
    "sfWarehouse": sf_wh
  }
2
Riley Hun 17 grudzień 2019, 06:04
Spróbuj zaktualizować złącze JDBC i sprawdź, czy to pomoże. Jakiś czas temu widziałem ten problem ze starszym złączem i aktualizacja pomogła w tym przypadku (net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4). Możesz również spróbować przetestować za pomocą Pythona, aby sprawdzić, czy problem dotyczy tylko platformy Spark.
 – 
Suzy Lockwood
17 grudzień 2019, 23:37
Dzięki Suzy - próbowałem połączyć się z Pythonem i wydawało się, że działa dobrze. Myślę więc, że ten problem jest specyficznym problemem iskry.
 – 
Riley Hun
18 grudzień 2019, 01:11
Niestety aktualizacja złącza JDBC nie rozwiązała problemu
 – 
Riley Hun
18 grudzień 2019, 01:39
To ciekawe, że działa z Pythonem. Czy możesz w ogóle ulepszyć Sparka? Może być wart biletu do pomocy technicznej, jeśli nie możesz go uruchomić.
 – 
Suzy Lockwood
19 grudzień 2019, 02:38
Utworzyłem problem dotyczący ich problemów z GitHub.
 – 
Riley Hun
19 grudzień 2019, 03:53

1 odpowiedź

Can you try with this code.

---------------------------------------------------------------------------------

  #!/usr/bin/env python
  # coding=utf-8
  from pyspark import SparkConf, SparkContext
  from pyspark.sql import SQLContext
  from pyspark.sql.types import *
  from pyspark import SparkConf, SparkContext
  from pyspark.sql import SQLContext
  from pyspark.sql.types import *
  from pyspark import SparkConf, SparkContext
  import subprocess
  from pyspark.sql import SparkSession
  import os
  import logging
  from logging import getLogger
  from cryptography.hazmat.primitives import serialization
  from cryptography.hazmat.backends import default_backend
  import re
  from cryptography.hazmat.primitives.serialization import load_pem_private_key

  v_log = '<path>/spark.log'

spark = SparkSession \
  .builder \
  .config("spark.jars", "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
  .config("spark.repl.local.jars",
      "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
  .config("spark.sql.catalogImplementation", "in-memory") \
  .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
  spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())  

  logging.basicConfig(
      filename=v_log,
      level=logging.DEBUG)
  logger = getLogger(__name__)  with open("<path-to>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
      key_file.read(),
      password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
      backend=default_backend()
    )

  pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
  )
  pkb = pkb.decode("UTF-8")
  pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


  sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "sfcsupport",
    "sfUser": "",
    "sfDatabase": "",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "",
    "sfRole": "",
    "pem_private_key":pkb
  }
  SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

  df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "Select * from <TableName>") \
    .load()


  df.show()

----------------------------------------------------------------------------
1
Ankur Srivastava 20 grudzień 2019, 20:46
Dzięki Ankurowi! Czy uruchamiam kod tak, jak jest? Otrzymuję błąd — java.lang.ClassNotFoundException: Nie można znaleźć źródła danych: net.snowflake.spark.snowflake. Proszę znaleźć pakiety na
 – 
Riley Hun
20 grudzień 2019, 13:50
Zaktualizowano go. Spróbuj z tym i wprowadź odpowiednie zmiany.
 – 
Ankur Srivastava
20 grudzień 2019, 20:47
Czy mogę również zapytać, jakiej wersji Spark i wersji Pythona używasz? Używam Sparka 2.3.3 i Pythona 3.7
 – 
Riley Hun
21 grudzień 2019, 01:45
Używam Pythona 3.7 i Spark 2.4.3. W tym kodzie ładuję w słoikach pamięci i poniżej 2 wiersze są kluczem: pkb = pkb.decode("UTF-8") pkb = re.sub("-*( BEGIN|END) KLUCZ PRYWATNY-*\n", "", pkb).replace("\n", "")
 – 
Ankur Srivastava
23 grudzień 2019, 22:01
Właśnie udało mi się to wypróbować i otrzymuję następujący błąd: Sterownik JDBC nie może połączyć się ze Snowflake. Kod błędu: 390100, Wiadomość: Podano nieprawidłową nazwę użytkownika lub hasło. Mogę również potwierdzić, że używam poprawnych poświadczeń. Może używane przeze mnie pliki jars są niepoprawne?
 – 
Riley Hun
24 luty 2020, 08:41