Airflow XCom Exclusive
When a task returns a value, the custom backend intercepts it, serializes it to an external bucket, and writes only the URI string (the reference pointer) to the Airflow metadata database. When a downstream task calls xcom_pull, the backend intercepts the URI, fetches the object from cloud storage, deserializes it, and injects it back into the task.

Step-by-Step Implementation: Building an S3 XCom Backend

Step 1: Write the Custom Backend Class
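The flow described above can be sketched without Airflow at all. The following is a minimal, storage-agnostic illustration, not the backend class itself: two dictionaries stand in for the S3 bucket and the xcom table, and the names push, pull, FAKE_BUCKET, FAKE_METADATA_DB and the bucket name are all assumptions made for the example.

```python
import json
import uuid

# Hypothetical stand-ins: one dict plays the S3 bucket, the other plays
# the xcom table in the Airflow metadata database.
FAKE_BUCKET = {}
FAKE_METADATA_DB = {}
URI_PREFIX = "s3://example-xcom-bucket/"  # assumed bucket name


def push(task_id, value):
    """Serialize the value to 'object storage' and record only its URI."""
    uri = f"{URI_PREFIX}{task_id}/{uuid.uuid4()}.json"
    FAKE_BUCKET[uri] = json.dumps(value).encode("utf-8")
    FAKE_METADATA_DB[task_id] = uri  # the database row holds the pointer only
    return uri


def pull(task_id):
    """Resolve the stored URI back into the original object."""
    uri = FAKE_METADATA_DB[task_id]
    return json.loads(FAKE_BUCKET[uri].decode("utf-8"))


payload = {"rows": list(range(5)), "status": "ok"}
stored_uri = push("extract", payload)
restored = pull("extract")
```

In a real backend the same two steps would live in the serialize and deserialize hooks of a BaseXCom subclass, with an S3 client replacing the dictionary.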
xcom_objectstorage_threshold: The size threshold, in bytes, above which a value is written to object storage instead of the metadata database.

5. Troubleshooting XComs in the UI
    from airflow.decorators import task

    @task
    def consume_id(ref: str) -> None:
        # ref is automatically pulled as an exclusive XCom
        # spark is assumed to be an existing SparkSession
        spark.read.parquet(ref).show()
Push only once and never overwrite a key. Use execution_date + task_id as part of the key. Treat every key as exclusive (write-once) to prevent accidental re-pushes.
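A write-once rule like this is easy to prototype. The sketch below is illustrative only and does not use Airflow: WriteOnceStore and make_key are hypothetical helpers showing how a key built from the execution date and task ID can be guarded against a second push.

```python
from datetime import datetime, timezone


class WriteOnceStore:
    """Hypothetical in-memory store that rejects a second push to the same key."""

    def __init__(self):
        self._data = {}

    def push(self, key, value):
        if key in self._data:
            raise KeyError(f"XCom key already written: {key}")
        self._data[key] = value

    def pull(self, key):
        return self._data[key]


def make_key(execution_date, task_id, name):
    """Build a key that embeds the run's execution date and the task ID."""
    return f"{execution_date.isoformat()}__{task_id}__{name}"


store = WriteOnceStore()
run_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
key = make_key(run_date, "extract", "row_count")
store.push(key, 42)

try:
    store.push(key, 43)  # a second push to the same key is refused
    overwritten = True
except KeyError:
    overwritten = False
```

Because the key changes with every run and every task, a retry that pushes again surfaces as an error instead of silently replacing the earlier value.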
Airflow XCom exclusive is not a separate feature per se, but a design pattern and configuration discipline that restricts XCom usage to specific, well-defined channels. It combines several Airflow capabilities:
    def pull_task(**context):
        # Pull the value returned by push_task (stored under the default return_value key)
        pulled = context["ti"].xcom_pull(task_ids="push_task")
        print(pulled["data"])
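To see how the pull side pairs with a push, here is a small sketch that runs outside Airflow. FakeTaskInstance is a hypothetical stand-in that mimics only the xcom_push and xcom_pull calls used here, and the push_task body and its sample data are assumptions; a real task instance does far more.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance, backed by a shared dict."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # Filter by the producing task and by key
        return self._store.get((task_ids, key))


def push_task(**context):
    # A returned value is stored under the "return_value" key
    return {"data": [1, 2, 3]}


def pull_task(**context):
    pulled = context["ti"].xcom_pull(task_ids="push_task")
    return pulled["data"]


store = {}
push_ti = FakeTaskInstance("push_task", store)
# Mimic what the scheduler does with a task's return value
push_ti.xcom_push("return_value", push_task(ti=push_ti))
result = pull_task(ti=FakeTaskInstance("pull_task", store))
```

The stand-in keys each value by (task ID, key), which mirrors the two filters a downstream task most often passes to xcom_pull.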
Problem: You adopt the exclusive pattern but still store heavy objects in the default database. Solution: Use a custom XCom backend that serializes large objects to external storage (GCS, S3, Redis) and stores only a URI in the xcom table.
To overcome database storage constraints, Airflow provides an escape hatch: custom XCom backends. This feature lets you move XCom storage from the metadata database to cloud object storage (Amazon S3, Google Cloud Storage, Azure Blob Storage) without changing task code.
Airflow lets you swap the XCom backend through the xcom_backend setting (added in 1.10.12 and carried through 2.x). This changes the game regarding the "Size Limit" constraint mentioned above.
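    from airflow.models.xcom import BaseXCom

    class S3XCom(BaseXCom):
        @staticmethod
        def serialize_value(value, **kwargs):
            # size_of and upload_to_s3 are placeholder helpers, not Airflow APIs
            if size_of(value) > 1_000_000:
                s3_key = upload_to_s3(value)
                # Store only a small marker dict that points at the object
                value = {"__s3_uri": s3_key}
            return BaseXCom.serialize_value(value, **kwargs)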
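The snippet above covers only the write side. A rough sketch of the matching read side, again outside Airflow, might look like the following: FAKE_S3 is a dictionary standing in for the bucket, the key layout is made up, and JSON byte length is used as an assumed size measure in place of size_of.

```python
import json

SIZE_THRESHOLD_BYTES = 1_000_000  # same cutoff as the snippet above
FAKE_S3 = {}  # hypothetical stand-in for a bucket


def serialize(value, key):
    """Offload values above the threshold and return a marker dict instead."""
    raw = json.dumps(value).encode("utf-8")
    if len(raw) > SIZE_THRESHOLD_BYTES:
        s3_key = f"xcom/{key}.json"  # assumed key layout
        FAKE_S3[s3_key] = raw
        return {"__s3_uri": s3_key}
    return value


def deserialize(stored):
    """Follow the marker back to storage; pass small values through unchanged."""
    if isinstance(stored, dict) and "__s3_uri" in stored:
        return json.loads(FAKE_S3[stored["__s3_uri"]].decode("utf-8"))
    return stored


small = {"status": "ok"}
large = {"blob": "x" * 2_000_000}
small_stored = serialize(small, "small")
large_stored = serialize(large, "large")
```

One caveat of the marker-dict approach: a task that legitimately returns a dict containing the key __s3_uri would be misread as a pointer, so a less collision-prone marker may be worth choosing.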
Tasks use xcom_pull to retrieve values from previous tasks. You can filter these requests by:
Task IDs: Specify which task the data came from.
Keys: Filter for specific identifiers.
DAG IDs: Pull from different DAGs if necessary.

Best Practices and Limitations