Airflow
Can you provide an example of using the BranchPythonOperator in a DAG?
BranchPythonOperator allows conditional branching in a DAG: its callable returns the task ID of the downstream task to execute, and tasks on the non-selected branches are marked as skipped.
The callable must return a valid task ID string (or a list of task IDs) naming tasks directly downstream of the branch operator.
Use trigger_rule='none_failed_min_one_success' on downstream join tasks so they still run when some upstream branches were skipped.
Describe a situation where you had to implement a complex workflow in Airflow. How did you approach it?
A common complex workflow involves multi-source ingestion, transformations, and downstream reporting. The approach is to decompose the workflow into discrete, testable tasks and rely on Airflow's dependency management.
Data ingestion: used custom operators or PythonOperator to pull from APIs, databases, and file systems in parallel.
Transformation layer: chained PythonOperator tasks for cleaning, deduplication, and enrichment, with XCom passing metadata between stages.
Loading and reporting
How do you handle task failures in Airflow to ensure minimal disruption?
Airflow provides several mechanisms to handle task failures gracefully and minimize the impact on the overall pipeline.
Retries: set retries and retry_delay in default_args to automatically retry transient failures.
Failure callbacks: use on_failure_callback to trigger custom notifications (Slack, PagerDuty) with context about the failure.
Sensors with timeout: for tasks that depend on external conditions, use sensors with timeout and poke_interval so they fail cleanly if the condition is never met.
Trigge
Imagine you have a task that depends on the successful completion of tasks in another DAG. How would you set this up?
Use the ExternalTaskSensor to create cross-DAG dependencies. It waits until a specified task in another DAG reaches a successful state.
external_dag_id and external_task_id identify the upstream task to wait for.
Use mode='reschedule' to free the worker slot while waiting instead of blocking it.
By default the sensor looks for the upstream DAG run with the same logical (execution) date, so both DAGs should share the same schedule; otherwise use execution_delta or execution_date_fn to align different schedules.
Set a reasonable timeout to avoid indefinite waiting.
Can you provide an example of how you've used custom operators in Airflow?
Custom operators encapsulate reusable logic by subclassing BaseOperator and implementing the execute() method. This promotes consistency and reduces code duplication across DAGs.
Override __init__ for task-specific parameters and execute() for runtime logic.
Use self.log for logging within the operator.
Place custom operators in a shared plugins/ or dags/operators/ directory for reuse across DAGs.
How do you manage sensitive information, like API keys or database passwords, within Airflow?
Airflow provides multiple layers for managing secrets securely, so credentials are never hardcoded in DAG files.
Connections: store database credentials, API keys, and service accounts in Airflow Connections (via the UI or CLI), and reference them in DAGs through hooks or BaseHook.get_connection().
Variables: store non-connection config values and read them with Variable.get('key'), optionally with deserialize_json=True.
Secrets backends: integrate with HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager to fetch credential
Write a PythonOperator task that reads data from an API and pushes the result to XCom.
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
This example fetches data from a REST API and pushes the result into XCom so downstream tasks can consume it.
xcom_push(key, value) stores data in Airflow's metadata database, where downstream tasks can retrieve it with xcom_pull.
Alternatively, returning a value from the callable automatically pushes it to XCom under the return_value key.
Keep XCom payloads small (metadata, file paths, record counts); for large data, write it to external storage and pass the reference.
How do you test and debug DAGs in Airflow before production?
Testing and debugging DAGs before production deployment is critical to avoiding pipeline failures and data issues.
DAG validation: load the DAG files in a Python script (for example, into a DagBag) to catch syntax and import errors and verify that they load correctly.
Unit testing: test individual task callables with pytest, mocking external dependencies.
CLI testing: use airflow tasks test <dag_id> <task_id> <execution_date> to run a single task locally without affecting the scheduler.
Dry runs: use airflow dags test <dag_id> <execution_date> (A