Używam Docker-Compose, aby skonfigurować skalowalny klaster przepływu powietrza. Oparłem moje podejście z tego DockerFile https://hub.docker.com/r/ Puckel / Docker-Airflow /

Moim problemem jest ustawienie dzienników, aby pisać / odczytywać z S3. Kiedy ukończył DAG, otrzymuję błąd w ten sposób

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

Ustawiłem nową sekcję w pliku airflow.cfg w ten sposób

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

A następnie określono ścieżkę S3 w sekcji Remote Logów w airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

Czy to zrobiłem prawidłowo i jest błąd? Czy jest tutaj receptura dla sukcesu, którego brakuje?

-- Aktualizacja

Próbowałem eksportować w formatach URI i JSON, a nie wydawał się pracować. Następnie eksportowałem aws_access_key_id i aws_secret_access_key, a następnie przepływ powietrza zaczął go zbierać. Teraz otrzymuję błąd w dziennikach roboczych

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- Aktualizacja

Znalazłem również ten link https://www.mail-archive.com/dev @ airflow.incubator.apache.org / msg00462.html

Potem porzuciłem się do jednego z moich maszyn robotniczych (oddzielonych od serwerów WebServer i Scheduler) i prowadził ten bit kodu w Pythonie

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

Otrzymuję ten błąd.

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

Próbowałem eksportować kilka różnych typów AIRFLOW_CONN_ Envs, jak wyjaśniono tutaj w sekcji połączeń HTTPS: / /airflow.incubator.apache.org/concepts.html i przez inne odpowiedzi na to pytanie.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

Eksportowałem również aws_access_key_id i aws_secret_access_key bez sukcesu.

Te poświadczenia są przechowywane w bazie danych, więc po dodaniu ich w interfejsie użytkownika powinny być odebrane przez pracowników, ale nie są w stanie pisać / czytać dzienników z jakiegoś powodu.

44
JackStat 27 czerwiec 2017, 15:49

7 odpowiedzi

Najlepsza odpowiedź

Musisz skonfigurować połączenie S3 przez interfejs UI przepływu powietrza. W tym celu musisz przejść do admin - & gt; Połączenia zakładka na interfejsie interfejsu interfejsu użytkownika i utwórz nowy wiersz do połączenia S3.

Przykładowa konfiguracja byłaby:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
28
jps 17 grudzień 2020, 12:03

Few! Motywacja do noszenia błędów przepływów powietrza w pączku jest konfrontacji o kilka wiązek plików Pythona XD Oto moje doświadczenie na tym z apache-airflow==1.9.0.

Przede wszystkim nie ma po prostu potrzeby airflow connections .......... --conn_extra itp.

Wystarczy ustaw swoje airflow.cfg jako:

remote_logging = True
remote_base_log_folder = s3://dev-s3-main-ew2-dmg-immutable-potns/logs/airflow-logs/
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = s3://<ACCESS-KEY>:<SECRET-ID>@<MY-S3-BUCKET>/<MY>/<SUB>/<FOLDER>/

Utrzymuj plik $AIRFLOW_HOME/config/__ init __.py i $AIRFLOW_HOME/config/log_config.py jak powyżej.

Problem ze mną jako brakujący pakiet "boto3", który mógłbym dotrzeć do:

vi /usr/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py
then >> import traceback

Iw linii zawierającej:

Nie mógł utworzyć S3Hook z identyfikatorem połączenia "% s". "" Upewnij się, że przepływ powietrza [S3] jest zainstalowany i "połączenie S3 istnieje.

Robić traceback.print_exc() i zaczęło się do zaginionego boto3!

Zainstalowano go i życie było piękne!

1
Andrzej Sydor 17 grudzień 2020, 12:57

Czy działa z przepływem powietrza 1.10 w Kube. Mam następujące zestawy env var :

AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3
1
Andrzej Sydor 17 grudzień 2020, 12:56

Uwaga z boku dla każdego po bardzo przydatnych instrukcji w Powyższa odpowiedź: Jeśli natkniesz się na ten problem: "ModulenotfoundError: No Module o nazwie "airflow.utils.log.logging_mixin.Redirectstdhandler" Jak przywołuje się tutaj ( Który dzieje się przy użyciu powietrza 1.9), Fix jest prosty - użyj raczej ten podstawowy szablon: https://github.com/apache/incubator-airflow/blob/v1-9-9-stable/airflow/config_Templates/airflow_local_settings.py (i postępuj zgodnie ze wszystkimi instrukcjami W Powyższa odpowiedź)

Obecny szablon Inkubator-przepływ / AirFlow / Config_Templates / Airflow_Local_settings .py obecny w Master Oddział zawiera odniesienie do klasy "Airflow.utils.log.s3_task_handler.s3taskHandler", który nie jest obecny w pakiecie Apache-Airflow == 1.9.0 Python. Mam nadzieję że to pomoże!

1
diogoa 22 czerwiec 2018, 17:09

Aby zakończyć odpowiedź ARNE z ostatnimi aktualizacjami przepływów powietrza, nie musisz ustawić task_log_reader do innej wartości niż domyślna: task

Jak w przypadku podążania za domyślnym szablonem rejestrującym AirFlow / Config_Templates / airflow_local_settings.py Możesz zobaczyć Od tego zatwierdzenia (zanotuj nazwę przewodnika zmienioną na 's3': {'task'... zamiast s3.task), która jest wartością w folderze zdalnego ({x2}}) zastąpi obsługę prawą jeden:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

Więcej szczegółów na temat logowania do / Read z S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3 .

2
Paul Leclercq 31 maj 2018, 19:06

(zaktualizowany od przepływu powietrza 1.10.2)

Oto rozwiązanie, jeśli nie używasz administratora UI.

Mój przepływ powietrza nie działa na trwałym serwerze ... (zostaje uruchomiony na nowo cały dzień w pojemniku docker, na Heroku.) Wiem, że brakuje mi wielu wspaniałych funkcji, ale w moim minimalnym Setup, nigdy nie dotykam administratora UI ani pliku CFG. , muszę ustawić zmienne środowiskowe specyficzne dla powietrza w skrypcie bash, który zastępuje plik .cfg.

apache-airflow [s3]

Przede wszystkim potrzebujesz zainstalowanego podpakowania s3, aby napisać dzienniki przepływu powietrza do S3. (boto3 działa dobrze dla zadań Pythona w dagach, ale S3Hook zależy od podpakowania S3.)

Jeszcze jedna strona Uwaga: Koncepcja instalacji Nie obsługuje tego jeszcze, więc muszę zrobić pip install apache-airflow[s3].

zmienne środowiskowe

W skrypcie bash ustawić te zmienne core. Począwszy od Te instrukcje ale za pomocą konwencji nazewnictwa AIRFLOW__{SECTION}__{KEY} dla zmiennych środowiskowych , Ja robię:

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

identyfikator połączenia S3

Powyżej s3_uri to identyfikator połączenia, który wymyśliłem. W przepływie powietrza odpowiada innej zmiennej środowiskowej, AIRFLOW_CONN_S3_URI. Wartość to jest ścieżka S3, która musi być w formie URI. Jest to

s3://access_key:secret_key@bucket/key

Przechowuj to, jednak pracujesz z innymi wrażliwymi zmiennymi środowiskowymi.

Dzięki tej konfiguracji przepływ powietrza będzie mógł napisać swoje dzienniki do S3. Postępują na ścieżce s3://bucket/key/dag/task_id/timestamp/1.log.


Dodatek na modernizację z przepływu powietrza 1.8 do powietrza przepływu 1.10

Ostatnio ulepszałem mój rurociąg produkcyjny z przepływu powietrza od 1,8 do 1,9, a następnie 1.10. Dobrą wiadomością jest to, że zmiany są ładne; Reszta pracy po prostu wyznaczyła niuanse z instalacjami pakietów (niepowiązanymi do pierwotnego pytania o dzienniki S3).

(1) Przede wszystkim musiałem uaktualnić do Pythona 3.6 z przepływem powietrza 1.9.

(2) Nazwa pakietu zmieniona z airflow do apache-airflow z 1.9. Możesz także wpaść na to w swoim pip install.

(3) Pakiet psutil musi być w określonej zakresie wersji do przepływu powietrza. Możesz to spotkać, gdy robisz pip install apache-airflow.

(4) Nagłówki Python3-dev są potrzebne z przepływem powietrza 1.9+.

(5) Oto istotne zmiany: export AIRFLOW__CORE__REMOTE_LOGGING=True jest teraz wymagana. I

(6) Dzienniki mają nieco inną ścieżkę w S3, które na bieżąco aktualizowane w odpowiedzi: s3://bucket/key/dag/task_id/timestamp/1.log.

Ale to jest to! Dzienniki nie działały w 1.9, więc polecam tylko iść prosto do 1.10, teraz jest dostępna.

13
Niels Joaquin 25 luty 2019, 20:36

Aktualizacja przepływu powietrza 1.10 sprawia, że rejestrowanie dużo łatwiejszy .

Dla rejestrowania S3 skonfigurować hak połączenia zgodnie z Powyższa odpowiedź

A następnie po prostu dodaj następujące elementy do Airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

Do logowania GCS,

  1. Zainstaluj najpierw pakiet GCP_API, jak: PIP zainstalować Apache-Airflow [GCP_API].

  2. Skonfiguruj hak połączenia zgodnie z Powyższa odpowiedź

  3. Dodaj następujące elementy do AirFlow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
    

Uwaga: AS Of Airflow 1.9 Zdalne rejestrowanie zostało znacznie zmienione. Jeśli używasz 1.9, czytaj dalej.

Odniesienie Oto

Kompletne instrukcje:

  1. Utwórz katalog, aby przechowywać konfigurację i umieścić to, aby można było znaleźć w Pythonpath. Jednym z przykładów to $ Airflow_Home / Config

  2. Utwórz puste pliki o nazwie $ Airflow_Home / Config / Log_Config.py i $ Airflow_home / config / __ init__.py

  3. Skopiuj zawartość Airflow / Config_Templates / Airflow_Local_settings.py do pliku Log_config.py, który został utworzony w powyższym etapie.

  4. Dostosuj następujące części szablonu:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. Upewnij się, że hak połączenia S3 został zdefiniowany w przepływie powietrza, zgodnie z Powyższa odpowiedź. Hak powinien mieć dostęp do odczytu i zapisu do wiadra S3 zdefiniowanego powyżej w S3_Log_folder.

  6. Zaktualizuj $ AIRFLOW_HOME / AIRFLOW.CFG, aby zawierać:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
  7. Uruchom ponownie WebServer Airflow i Harmonogram i wyzwalaj (lub czekaj na) nową wykonanie zadania.

  8. Sprawdź, czy dzienniki pojawiają się na nowo wykonane zadania w zdefiniowanym wiadrze.

  9. Sprawdź, czy przeglądarka S3 działa w UI. Podciągnij nowo wykonane zadanie i sprawdź, czy widzisz coś takiego:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    
38
Arne Huang 10 październik 2018, 18:06