KB: Airflow DAG

An Airflow DAG (Directed Acyclic Graph) is the central component in Apache Airflow that defines a workflow or pipeline. It represents the sequence and dependencies of tasks to be executed. Each DAG is a Python script that contains the structure of the workflow, detailing the tasks (operators) and their dependencies.

Here's a breakdown of the main elements of an Airflow DAG:

1. Directed Acyclic Graph (DAG)

  • Directed: Each dependency has a direction, pointing from an upstream task to the downstream task that runs after it.
  • Acyclic: It cannot contain loops, so no task can depend on itself, directly or through a chain of other tasks.
  • Graph: Tasks are nodes and dependencies are edges in a graph structure.
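The "acyclic" requirement is what makes a DAG schedulable at all: a valid execution order exists only when there are no cycles. A minimal sketch using Python's standard-library graphlib (an illustration of the idea, not Airflow's own scheduler code):

```python
from graphlib import TopologicalSorter

# Tasks mapped to the upstream tasks they depend on (the edges of the DAG).
deps = {
    'transform': {'extract'},   # transform depends on extract
    'load': {'transform'},      # load depends on transform
}

# A topological sort yields a run order that respects every dependency.
order = list(TopologicalSorter(deps).static_order())
print(order)   # ['extract', 'transform', 'load']

# If the graph had a cycle, e.g. {'a': {'b'}, 'b': {'a'}},
# static_order() would raise graphlib.CycleError — no valid order exists.
```

This is the same guarantee Airflow relies on when it decides which tasks are ready to run.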

2. Tasks in a DAG

  • Tasks are individual steps in a DAG and are represented by operators (like PythonOperator, BashOperator, etc.).
  • Each task can perform specific actions, such as running scripts, transferring data, or checking conditions.
  • Tasks are defined in the DAG and assigned dependencies to set the order in which they run.

3. Dependencies

  • Tasks are connected with dependencies, indicating the order of execution.
  • Airflow uses >> and << operators to set dependencies between tasks (e.g., task1 >> task2 means task1 must run before task2).
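Under the hood, `>>` and `<<` work through Python operator overloading (`__rshift__` and `__lshift__`). A minimal stand-in class (not Airflow's actual BaseOperator, just a sketch of the mechanism):

```python
# Illustrative stand-in for how Airflow tasks overload >> and <<.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):      # task1 >> task2: task2 runs after task1
        self.downstream.append(other)
        return other                  # returning `other` allows a >> b >> c

    def __lshift__(self, other):      # task1 << task2: task2 runs before task1
        other.downstream.append(self)
        return other

extract = Task('extract')
transform = Task('transform')
load = Task('load')
cleanup = Task('cleanup')

extract >> transform >> load          # extract -> transform -> load
cleanup << load                       # same as load >> cleanup

print([t.task_id for t in extract.downstream])   # ['transform']
```

In real Airflow code, `task1 >> task2` is shorthand for `task1.set_downstream(task2)`.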

4. DAG Parameters

  • dag_id: A unique identifier for the DAG.
  • schedule_interval: Defines how often the DAG should run; accepts a cron expression or a timedelta (e.g., hourly, daily, weekly).
  • start_date: The date from which scheduling begins; the first run is triggered once the first schedule interval after start_date has elapsed.
  • catchup: If True, Airflow "catches up" by creating a run for every missed interval between start_date and now.
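The interplay of start_date, schedule_interval, and catchup can be sketched in plain Python. This is an illustration of the catchup idea, not Airflow's internal scheduling logic:

```python
from datetime import datetime, timedelta

# Sketch: with catchup=True, the scheduler creates a DAG Run for every
# schedule interval between start_date and now that has not yet run.
def missed_intervals(start, interval, now):
    runs = []
    t = start
    while t + interval <= now:   # an interval is runnable once it has fully elapsed
        runs.append(t)           # each run is stamped with its interval's start
        t += interval
    return runs

runs = missed_intervals(datetime(2024, 1, 1), timedelta(days=1), datetime(2024, 1, 5))
print(len(runs))   # 4 backfill runs (Jan 1–4); catchup=False would keep only the latest
```

This also shows why the first run fires only after the first interval ends: the Jan 1 run is created once Jan 2 arrives.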

5. DAG Execution

  • Airflow schedules and manages the execution of DAGs based on their schedule intervals.
  • Each scheduled or manually triggered execution creates a DAG Run, a specific instance of the DAG tied to a particular execution date.

Example of a Simple DAG

```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Define default args for DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    'example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    def print_hello():
        print("Hello from Airflow!")

    # Define tasks
    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )

    # Set dependencies if needed
```

In this example:

  • example_dag is a simple Airflow DAG that runs daily.
  • hello_task is a task that prints "Hello from Airflow!".


Ref: https://airflow.apache.org/docs/apache-airflow/1.10.9/concepts.html#:~:text=In%20Airflow%2C%20a%20DAG%20%E2%80%93%20or,reflects%20their%20relationships%20and%20dependencies.

https://airflow.apache.org/docs/apache-airflow/stable/ui.html
