Airflow | XCom

```python
import json

@staticmethod
def serialize(value):
    # 1. Convert the value to a JSON string, then encode it as UTF-8 bytes
    data = json.dumps(value).encode('utf-8')
```
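The fragment above stops after its first step. The sketch below completes the round trip under one assumption: the matching deserializer simply reverses the encoding (the module-level `serialize`/`deserialize` functions here are hypothetical stand-ins, not Airflow's API). It shows two limitations of building a serializer on plain `json.dumps`: only JSON-native types are accepted, and some types change shape on the way back.

```python
import json
from datetime import datetime, timezone


def serialize(value):
    # Step 1 from the fragment: JSON text encoded as UTF-8 bytes
    return json.dumps(value).encode("utf-8")


def deserialize(data):
    # Hypothetical inverse step; the original fragment stops after step 1
    return json.loads(data.decode("utf-8"))


payload = {"row_count": 42, "path": "s3://bucket/key.csv", "ok": True}
restored = deserialize(serialize(payload))  # JSON-native values round-trip unchanged

# Tuples come back as lists, because JSON has only one array type
tuple_restored = deserialize(serialize((1, 2)))

# Plain json.dumps rejects non-JSON types such as datetime
try:
    serialize({"run_at": datetime(2024, 1, 1, tzinfo=timezone.utc)})
    datetime_supported = True
except TypeError:
    datetime_supported = False
```

A custom serializer written this way should therefore either restrict XCom values to JSON-native types or convert other types explicitly before encoding.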


```python
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid


class S3XComBackend(BaseXCom):
    BUCKET = "my-airflow-xcom-bucket"
    # URI prefix that marks values stored in the bucket
    PREFIX = "s3://my-airflow-xcom-bucket/"

    @staticmethod
    def serialize_value(value, **kwargs):
        # Convert any JSON-serializable Python object to a JSON string
        string_data = json.dumps(value)
        # Build a unique object key so concurrent tasks never overwrite each other
        unique_id = str(uuid.uuid4())
        s3_key = f"payloads/xcom_{unique_id}.json"
        # Upload the payload to remote object storage
        s3_hook = S3Hook(aws_conn_id="aws_default")
        s3_hook.load_string(
            string_data=string_data,
            key=s3_key,
            bucket_name=S3XComBackend.BUCKET,
            replace=True,
        )
        # Store only the URI string inside the Airflow metadata database
        return BaseXCom.serialize_value(f"{S3XComBackend.PREFIX}{s3_key}")

    @staticmethod
    def deserialize_value(result):
        # Retrieve the saved URI reference string from the database
        uri = BaseXCom.deserialize_value(result)
        if isinstance(uri, str) and uri.startswith(S3XComBackend.PREFIX):
            # Strip the prefix to recover the exact key used at upload time
            s3_key = uri[len(S3XComBackend.PREFIX):]
            s3_hook = S3Hook(aws_conn_id="aws_default")
            # Download the payload from the bucket and decode the JSON
            file_content = s3_hook.read_key(key=s3_key, bucket_name=S3XComBackend.BUCKET)
            return json.loads(file_content)
        # Values that are not bucket references are returned unchanged
        return uri
```

Apache Airflow is a prominent workflow orchestration platform that models workflows as Directed Acyclic Graphs (DAGs) of tasks. By design, tasks are intended to be isolated and independent. However, real-world data pipelines often require tasks to exchange small amounts of metadata or state. This paper examines XCom (short for "cross-communication"), Airflow's native mechanism for inter-task data exchange. We explore its architecture, usage patterns, limitations, and best practices for reliable workflow design.
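Architecturally, each XCom is a row in Airflow's metadata database, addressed by the DAG run, the task, the map index (`-1` for unmapped tasks) and a key, with `return_value` as the default key. The in-memory sketch below models only that addressing scheme; `ToyXComStore` is a hypothetical class, not Airflow's implementation, and its choices (JSON text in a dict, `None` for missing entries) are simplifying assumptions.

```python
import json


class ToyXComStore:
    """Hypothetical in-memory stand-in for the XCom table."""

    def __init__(self):
        # (dag_id, run_id, task_id, map_index, key) -> JSON text
        self._rows = {}

    def push(self, dag_id, run_id, task_id, key, value, map_index=-1):
        # Values are serialized on write, as a database-backed store must do
        self._rows[(dag_id, run_id, task_id, map_index, key)] = json.dumps(value)

    def pull(self, dag_id, run_id, task_ids, key="return_value", map_index=-1):
        # A single task_id yields one value; a list yields one value per task_id
        if isinstance(task_ids, str):
            row = self._rows.get((dag_id, run_id, task_ids, map_index, key))
            return None if row is None else json.loads(row)
        return [self.pull(dag_id, run_id, t, key, map_index) for t in task_ids]


store = ToyXComStore()
store.push("etl", "run_1", "task_a", "return_value", 10)
store.push("etl", "run_1", "task_b", "return_value", 32)
store.push("etl", "run_1", "task_a", "file_path", "/tmp/out.csv")

single = store.pull("etl", "run_1", "task_a")                  # default key
both = store.pull("etl", "run_1", ["task_a", "task_b"])        # list of task_ids
custom = store.pull("etl", "run_1", "task_a", key="file_path") # explicit key
other_run = store.pull("etl", "run_2", "task_a")               # different DAG run
```

Because the run identifier is part of the address, a value pushed in one DAG run is not found when pulling for another run in this sketch.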

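In a classic `PythonOperator` callable, the task instance is read from the context and used to pull values:

```python
def pull_function(**kwargs):
    # The running TaskInstance is passed in the task context under "ti"
    ti = kwargs['ti']
    # "push_task" is a hypothetical upstream task_id; the original snippet is truncated here
    value = ti.xcom_pull(task_ids='push_task')
    return value
```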

XCom is meant for metadata (IDs, file paths, row counts, status flags), not for the data itself.
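In practice, "metadata, not data" usually means passing a reference: the upstream task writes its output to shared storage and pushes only a location plus a few summary fields, and the downstream task loads the data itself. The stdlib-only sketch below assumes a local temporary directory as the shared storage (in a real deployment tasks may run on different machines, so this would be object storage or a shared filesystem); the function names and CSV layout are hypothetical.

```python
import csv
import json
import tempfile
from pathlib import Path


def produce_orders(output_dir: Path) -> dict:
    # Write the bulky data to shared storage (a local directory in this sketch)
    rows = [[1, 9.5], [2, 20.0], [3, 4.25]]
    path = output_dir / "orders.csv"
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["order_id", "amount"])
        writer.writerows(rows)
    # Return only small metadata; this dict is what would be pushed to XCom
    return {"path": str(path), "row_count": len(rows)}


def sum_orders(meta: dict) -> float:
    # The downstream task receives the reference and loads the data itself
    with open(meta["path"], newline="") as f:
        records = list(csv.DictReader(f))
    if len(records) != meta["row_count"]:
        raise ValueError("row count mismatch between metadata and file")
    return sum(float(r["amount"]) for r in records)


with tempfile.TemporaryDirectory() as tmp:
    meta = produce_orders(Path(tmp))
    xcom_payload = json.dumps(meta)  # a short string: a path and a count
    total = sum_orders(json.loads(xcom_payload))
```

The XCom payload stays the same size whether the file holds three rows or three million.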

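One of the most useful features of XCom is injecting values into SQL queries through Jinja templating. Templated operator fields, such as the `sql` argument of SQL operators, are rendered just before the task runs, so an expression like `{{ ti.xcom_pull(task_ids='<upstream_task_id>') }}` is replaced by the pulled value. You do not need Python code to pull XComs into SQL operators.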
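To make the rendering step visible without Airflow or Jinja installed, the sketch below imitates only the single substitution described above with a regular expression. It is not Jinja: real templating evaluates arbitrary expressions, while this handles one exact shape. `FakeTI`, the `extract_batch` task id and the `orders` table are hypothetical.

```python
import re


class FakeTI:
    """Hypothetical stand-in for a task instance holding pushed return values."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # task_id -> return value

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


# The kind of string passed to a SQL operator's templated `sql` field
SQL_TEMPLATE = (
    "SELECT * FROM orders "
    "WHERE batch_id = '{{ ti.xcom_pull(task_ids='extract_batch') }}'"
)

# Matches only {{ ti.xcom_pull(task_ids='...') }}; Jinja itself is far more general
PATTERN = re.compile(r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}")


def render(template, ti):
    # Replace each matched expression with the string form of the pulled value
    return PATTERN.sub(lambda m: str(ti.xcom_pull(task_ids=m.group(1))), template)


rendered = render(SQL_TEMPLATE, FakeTI({"extract_batch": "2024-06-01_a"}))
```

Because the pulled value is spliced directly into the SQL text, this approach suits trusted values such as IDs generated by upstream tasks; for untrusted input, bind parameters are the safer choice.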

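To avoid overloading the metadata database with large payloads (e.g., DataFrames), you can configure a custom XCom backend such as the `S3XComBackend` above, which uploads each payload to object storage and stores only a reference URI in the database. Airflow uses the backend class named, by full import path, in the `xcom_backend` option of the `[core]` configuration section (or the `AIRFLOW__CORE__XCOM_BACKEND` environment variable).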
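The reference pattern behind such a backend can be exercised without AWS or Airflow. In the sketch below, a local directory plays the bucket and a made-up `localxcom://` scheme marks offloaded values; `LocalFileXComBackend` is a hypothetical class that mirrors the two methods of the S3 example but does not subclass Airflow's `BaseXCom`.

```python
import json
import tempfile
import uuid
from pathlib import Path


class LocalFileXComBackend:
    """Hypothetical stand-in for S3XComBackend: a local directory plays the bucket."""

    PREFIX = "localxcom://"  # made-up URI scheme marking offloaded payloads

    def __init__(self, root: Path):
        self.root = root

    def serialize_value(self, value) -> str:
        # Write the full payload to storage under a unique name...
        name = f"xcom_{uuid.uuid4()}.json"
        (self.root / name).write_text(json.dumps(value), encoding="utf-8")
        # ...and return only the short reference the database would store
        return self.PREFIX + name

    def deserialize_value(self, stored):
        # Follow references back to storage; pass other values through unchanged
        if isinstance(stored, str) and stored.startswith(self.PREFIX):
            name = stored[len(self.PREFIX):]
            return json.loads((self.root / name).read_text(encoding="utf-8"))
        return stored


with tempfile.TemporaryDirectory() as tmp:
    backend = LocalFileXComBackend(Path(tmp))
    big = {"values": list(range(1000))}
    ref = backend.serialize_value(big)
    restored = backend.deserialize_value(ref)
    files_written = len(list(Path(tmp).iterdir()))
    passthrough = backend.deserialize_value("plain string")
```

The stored reference is a few dozen characters regardless of payload size. Note that the sketch never deletes payload files; a production backend also needs a cleanup strategy for objects whose XCom rows are removed.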

The TaskFlow API simplifies XComs by abstracting the push and pull syntax entirely. When you decorate a Python function with `@task`, its return value is automatically pushed as an XCom under the default key `return_value`. Passing that function's output directly to another task automatically creates the downstream dependency and pulls the value.
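The two effects described above can be mimicked in plain Python. In this hypothetical sketch, `task`, `XComRef`, `XCOM` and `UPSTREAM` are illustrative names, not Airflow's API, and calling a decorated function runs it immediately, whereas Airflow builds the DAG first and runs tasks later through the scheduler. What it does show is the bookkeeping: the return value is stored under `return_value`, and passing one task's output to another both records a dependency and resolves the value.

```python
import functools
from typing import Any, Callable

XCOM: dict = {}      # (task_id, key) -> value
UPSTREAM: dict = {}  # task_id -> set of upstream task_ids


class XComRef:
    """Placeholder for a task's output, loosely modeled on Airflow's XComArg."""

    def __init__(self, task_id: str, key: str = "return_value"):
        self.task_id = task_id
        self.key = key

    def resolve(self) -> Any:
        return XCOM[(self.task_id, self.key)]


def task(fn: Callable) -> Callable:
    @functools.wraps(fn)
    def wrapper(*args):
        task_id = fn.__name__
        # Passing another task's output creates the dependency...
        UPSTREAM[task_id] = {a.task_id for a in args if isinstance(a, XComRef)}
        # ...and the value is pulled before the function body runs
        resolved = [a.resolve() if isinstance(a, XComRef) else a for a in args]
        # The return value is pushed under the default key
        XCOM[(task_id, "return_value")] = fn(*resolved)
        return XComRef(task_id)
    return wrapper


@task
def extract():
    return {"rows": 3}


@task
def report(summary):
    return f"loaded {summary['rows']} rows"


result = report(extract())
```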

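```python
from airflow.sdk import task  # Airflow 3; on Airflow 2 use: from airflow.decorators import task


@task
def aggregate(**context):
    # Pulling with a list of task_ids returns a list of values, one per task
    values = context['task_instance'].xcom_pull(
        task_ids=['task_a', 'task_b'], key='return_value'
    )
    return sum(values)
```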