You're absolutely right - the issue is that volume_mounts isn't included in the template_fields of KubernetesPodOperator, so Airflow doesn't apply templating to it at all.
I've run into this exact same problem before. Here are a few approaches that actually work:
Option 1: Monkey patch template_fields (quick and dirty)
```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Add volume_mounts to template_fields (do this at module import time,
# before any DAG using the operator is parsed)
KubernetesPodOperator.template_fields = KubernetesPodOperator.template_fields + ("volume_mounts",)
```
```python
@dag(
    dag_id=PIPELINE_NAME,
    schedule=None,
    params={
        "command": "",
        "image": "python:3.13-slim",
        "shared_data_mount_path": "/mnt/data/",
    },
)
def run_arbitrary_command_pipeline():
    # ... your existing code ...
    run_command = KubernetesPodOperator(
        task_id="run_arbitrary_command",
        cmds=["sh", "-c", "{{ params.command }}"],
        image="{{ params.image }}",
        volumes=[
            k8s.V1Volume(
                name=pvc_name,
                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                    claim_name=pvc_name
                ),
            )
        ],
        # Use dicts instead of V1VolumeMount objects so templating works
        volume_mounts=[
            {
                "name": pvc_name,
                "mount_path": "{{ params.shared_data_mount_path }}",
            }
        ],
    )
```
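The monkey patch works because `template_fields` is a plain class attribute, so reassigning it on the class affects every instance in the process. A stand-in sketch (`FakeOperator` is hypothetical, not Airflow code) shows the mechanics:

```python
# template_fields is a class attribute, so patching the class is
# visible to all instances, whether created before or after the patch.
class FakeOperator:
    template_fields = ("image", "cmds")

before = FakeOperator()  # created before the patch
FakeOperator.template_fields = FakeOperator.template_fields + ("volume_mounts",)
after = FakeOperator()   # created after the patch

# Neither instance shadows the class attribute, so both see the patched tuple
print(before.template_fields)  # ('image', 'cmds', 'volume_mounts')
print(after.template_fields)   # ('image', 'cmds', 'volume_mounts')
```

That global reach is exactly why the approach is "quick and dirty": every task using KubernetesPodOperator in the same scheduler process picks up the patched field list.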
Option 2: Custom operator (cleaner approach)
```python
class TemplatedKubernetesPodOperator(KubernetesPodOperator):
    template_fields = KubernetesPodOperator.template_fields + ("volume_mounts",)

# Then use TemplatedKubernetesPodOperator instead of KubernetesPodOperator
```
The key insight here is that you need to pass volume_mounts as dicts when templating is involved, not as k8s.V1VolumeMount objects: the template renderer recurses into dicts and lists and renders the strings inside them, but passes opaque objects through untouched. After rendering, Airflow converts the dicts into proper Kubernetes objects.
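To see why the dict form renders while a V1VolumeMount object would not, here's a simplified, hypothetical sketch of the recursion the renderer performs (real Airflow uses Jinja2; the string replacement below is only a stand-in):

```python
# Simplified sketch of template rendering over nested containers:
# strings get rendered, dicts/lists are walked recursively, and
# arbitrary objects (like k8s.V1VolumeMount) pass through untouched.
def render(value, context):
    if isinstance(value, str):
        # Stand-in for Jinja2 rendering of "{{ params.* }}" expressions
        return value.replace("{{ params.shared_data_mount_path }}",
                             context["shared_data_mount_path"])
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, context) for v in value]
    return value  # opaque objects are returned as-is, templates unrendered

mounts = [{"name": "my-pvc", "mount_path": "{{ params.shared_data_mount_path }}"}]
rendered = render(mounts, {"shared_data_mount_path": "/mnt/data/"})
print(rendered)  # [{'name': 'my-pvc', 'mount_path': '/mnt/data/'}]
```

A V1VolumeMount object would hit the final `return value` branch, which is why its `mount_path` string never gets rendered.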
I personally prefer Option 1 for one-off cases since it's simpler, but if you're going to reuse this pattern across multiple DAGs, definitely go with the custom operator approach.
Also, make sure you're defining your params with the params argument of the @dag decorator, not as plain function parameters - that's another common gotcha.
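As a sketch of that gotcha (the dag_id and param names here are illustrative, not from your pipeline):

```python
from airflow.decorators import dag

# Declare runtime-configurable values via the decorator's params argument;
# templated fields can then reference them as {{ params.command }} etc.
@dag(
    dag_id="example_pipeline",  # illustrative name
    schedule=None,
    params={"command": "", "image": "python:3.13-slim"},
)
def example_pipeline():
    ...  # tasks go here

example_dag = example_pipeline()
```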