Légáramlás: kevésbé ismert tippek, trükkök és bevált gyakorlatok

A használt eszközökkel vannak bizonyos dolgok, amelyekről még hosszú ideje történő használat után sem fog tudni. És amint megtudja, olyan vagy, mint „bárcsak tudtam volna ezt korábban”, mivel már mondtad az ügyfelednek, hogy ezt nem lehet jobb módon megtenni . A légáramlás, mint a többi eszköz sem különbözik, vannak olyan rejtett drágakövek, amelyek megkönnyíthetik az életét, és szórakoztatóvá teszik a DAG fejlesztését.

Lehet, hogy máris ismersz néhányat, és ha ismeri őket - nos, akkor profi vagy!

(1) DAG a kontextuskezelővel

Idegesítette magát, amikor elfelejtette hozzáadni dag = dag feladatát, és az Airflow hiba történt? Igen, könnyű elfelejteni, hogy minden feladathoz hozzáadja. Felesleges a következő példában megadott paraméter hozzáadása (példa_dag.py fájl):

A fenti példa (példa_dag.py fájl) csak 2 feladatot tartalmaz, de ha 10 vagy annál több, akkor a redundancia nyilvánvalóbbá válik. Ennek elkerülése érdekében az Airflow DAG-kat használhatja kontextuskezelőként, hogy automatikusan hozzárendeljen új operátorokat ehhez a DAG-hoz, ahogy a fenti példában látható (példa_dag_with_context.py) az utasítás használatával.

(2) A Lista használatával állíthatja be a Feladat függőségeket

Ha az alábbi képen láthatóhoz hasonló DAG-t akar létrehozni, akkor a feladatfüggőségek beállításakor meg kell ismételnie a feladatok neveit.

Amint az a fenti kódrészletből kiderül, a feladatfüggőségek szokásos módjának használata azt jelentené, hogy a task_two és a vége háromszor megismétlődnek. Ezt a python listákkal helyettesíthetjük, hogy ugyanazt az eredményt elegánsabb módon érjük el.

(3) Használjon alapértelmezett argumentumokat az argumentumok megismétlésének elkerülésére

Légáramlás, amely lehetővé teszi a paraméterek olyan szótárának átadását, amely az adott DAG összes feladatához rendelkezésre áll.

Például a DataReply-nél a BigQuery-t használjuk az összes DataWareshouse-hoz kapcsolódó DAG-hoz, és ahelyett, hogy paramétereket, például címkéket, a bigquery_conn_id-t továbbadnánk minden egyes feladathoz, egyszerűen átadjuk az indefault_args szótárnak, amint az az alábbi DAG-ban látható.

Ez akkor is hasznos, ha figyelmeztetéseket szeretne kapni az egyes feladatok kudarcairól, nem pedig csak a DAG hibákról, amelyeket már említettem az utolsó blokkban, a Slack Alerts integrálása az Airflow-ban.

(4) A „params” érv

A „params” a DAG szintű paraméterek szótára, amely sablonokban elérhetővé válik. Ezek a paraméterek a feladat szintjén felülbírálhatók.

Ez egy rendkívül hasznos érv, és sokat használtam személyesen, mivel sablon mezőben érhető el a jinja sablonnal a params.param_name használatával. Példa a következő felhasználásra:

Megkönnyíti a paraméterezett DAG írását a keményen kódolt értékek helyett. Amint a fenti példák is mutatják, a params szótár három helyen definiálható: (1) DAG objektumban (2) alapértelmezett_args szótárban (3) Minden feladat.

(5) Érzékeny adatok tárolása a kapcsolatokban

A legtöbb felhasználó tisztában van ezzel, de még mindig láttam a DAG-ban egyszerű szövegben tárolt jelszavakat. Isten kedvéért - ne csináld ezt. A DAG-kat úgy kell írnia, hogy magabiztos legyen ahhoz, hogy a DAG-jait nyilvános tárolóban tárolja.

Alapértelmezés szerint az Airflow a kapcsolat jelszavát egyszerű szövegben menti a metaadat-adatbázisban. A rejtjelező csomag nagyon ajánlott az Airflow telepítése során, és egyszerűen megtehető az apache-air flow pip installálásával [crypto].

Ezután könnyen elérheti a következőképpen:

az airflow.hooks.base_hook webhelyről importálja a BaseHook programot
slack_token = BaseHook.get_connection ('laza') jelszó

(6) Korlátozza a légáramlás-változók számát a DAG-ban

A légáramlás-változók a metaadat-adatbázisban vannak tárolva, így a változókra történő felhívás kapcsolatot jelentene a metaadat-DB-vel. A DAG fájljait X másodpercenként értelmezi. Ha nagyszámú változót használ a DAG-ban (és ami még rosszabb az alapértelmezett_args-ban), akkor jelentheti, hogy az adatbázishoz engedélyezett kapcsolatok számának telítettségét eredményezheti.

A helyzet elkerülése érdekében vagy csak egyetlen JSON értékű Airflow változót használhat. Mivel az Airflow változó tartalmazhat JSON-értéket, az összes DAG-konfigurációt egyetlen változóban tárolhatja, az alábbi képen látható módon:

Ahogyan ez a képernyőképen látható, az értékeket külön Airflow változókban vagy egyetlen Airflow változó alatt JSON mezőként tárolhatja

Ezután az Ajánlott módon alább látható módon érheti el őket:

(7) A „kontextus” szótár

A felhasználók gyakran elfelejtik a kontextus szótár tartalmát, amikor a PythonOperator-t hívható funkcióval használják.

A kontextus hivatkozásokat tartalmaz a kapcsolódó objektumokra a feladatpéldányra, és az API makrók szakaszában van dokumentálva, mivel ezek a sablonmezőhöz is elérhetők.

{
      „dag”: task.dag,
      „ds”: ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      „ts”: ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'tegnap_ds': tegnap_ds,
      'tegnap_ds_nodash': tegnap_ds_nodash,
      'holnap_ds': holnap_ds,
      'morning_ds_nodash': morning_ds_nodash,
      'END_DATE': ds,
      'end_date': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'végrehajtási_nap': self.execution_date,
      'prev_execution_date': prev_execution_date,
      'next_execution_date': next_execution_date,
      'legújabb_nap': ds,
      „makrók”: makrók,
      „params”: params,
      „táblák”: táblák,
      „feladat”: feladat,
      'task_instance': én,
      „ti”: önálló,
      'task_instance_key_str': ti_key_str,
      'conf': konfiguráció,
      'test_mode': self.test_mode,
      „var”: {
          'érték': VariableAccessor (),
          'json': VariableJsonAccessor ()
      },
      „bemeneti nyílások”: task.inlets,
      „outlets”: task.outlets,
}

(8) Dinamikus légáramlási feladatok generálása

Sok kérdésre válaszoltam a StackOverflow kapcsán, amely a dinamikus feladatok létrehozásáról szól. A válasz egyszerű, minden feladathoz el kell generálnia egyedi task_id azonosítót. Az alábbiakban bemutatjuk 2 példát, hogyan lehet ezt elérni:

(9) Futtassa az „airflow upgradeb” parancsot az „airflow initdb” helyett.

Köszönet Ash Berlinnek a tippekért az első Apache Airflow London Meetup beszélgetésében.

Az airflow initdb létrehoz minden alapértelmezett kapcsolatot, diagramot stb., amelyeket esetleg nem használunk és nem akarunk a termelési adatbázisunkban. A airflow upgradeb ehelyett a hiányzó áttelepítéseket csak az adatbázis-táblára alkalmazza. (beleértve a hiányzó táblázatok létrehozását stb.) Biztonságos minden alkalommal futtatni, nyomon követi, hogy mely migrációkat már alkalmazták (az Alembic modul használatával).

Tudassa velem az alábbi megjegyzés szakaszban, ha tud valamit, amit érdemes felvenni ebbe a blogbejegyzésbe. Boldog légáramlás :-)