79574482

Date: 2025-04-15 06:24:51
Score: 1

I posted this question and have since found the answer, so I am sharing the solution here.

To pass a value dynamically to the only-dir flag, we can use the parameters of the DAG (params), which are supplied when the DAG is triggered.

When triggering the DAG, I included the required directory path in the run config and referenced it through Airflow's Jinja templating.
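As a rough illustration of the trigger step, here is a minimal sketch that builds an `airflow dags trigger` command carrying the directory in the run config. The DAG id and the directory value are hypothetical placeholders, not taken from my setup, and I am assuming the Airflow deployment merges the run config into `params` (I believe this is governed by the core setting `dag_run_conf_overrides_params`).

```python
import json
import shlex

# Hypothetical placeholders: replace with your own DAG id and directory.
dag_id = "gcs_fuse_mount_dag"
conf = {"gcs_path": "landing/2025-04-15"}

# `airflow dags trigger --conf` expects the run configuration as a JSON string.
command = ["airflow", "dags", "trigger", "--conf", json.dumps(conf), dag_id]

# shlex.join quotes the JSON so the command can be pasted into a shell.
print(shlex.join(command))
```

The key in the config (`gcs_path` here) has to match the name used in the template, i.e. `{{ params.gcs_path }}`.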

Here is a sample code snippet that does this.

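import uuid

from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartJobOperator
from kubernetes.client import models as k8s
from kubernetes.client.models import V1Volume, V1VolumeMount

# Note: cluster, gcs_project_name, datalake_bucket and adtf_image are assumed
# to be defined elsewhere in the DAG file.

# Generate a unique identifier
unique_id = str(uuid.uuid4())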

# Add the unique identifier to the labels
labels = {
    "composer-env": cluster,
    "project-name": gcs_project_name,
    "version": "1.0",
    "stream": "e2e",
    "unique-id": unique_id  # Unique identifier
}


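class CustomGKEStartJobOperator1(GKEStartJobOperator):
    # No custom __init__ is needed: the inherited constructor accepts the usual arguments.
    # Make sure "volumes" is treated as a templated field.
    template_fields = tuple(GKEStartJobOperator.template_fields) + ("volumes",)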

    def execute(self, context):
        # Re-render volume_attributes before pod creation
        for vol in self.volumes or []:
            if hasattr(vol, "csi") and vol.csi and vol.csi.volume_attributes:
                for key, val in vol.csi.volume_attributes.items():
                    if isinstance(val, str) and "{{" in val:
                        # Render the whole option string; Jinja only replaces the {{ ... }} part
                        vol.csi.volume_attributes[key] = self.render_template(val, context)
        return super().execute(context)



gcs_volume_mount = V1VolumeMount(
    name="gcs-fuse-csi",
    mount_path="/data",
)

gcs_volume = V1Volume(
    name="gcs-fuse-csi",
    csi=k8s.V1CSIVolumeSource(
        driver="gcsfuse.csi.storage.gke.io",
        volume_attributes={
            "type": "gcs",
            "bucketName": datalake_bucket,
            "mountOptions": "implicit-dirs,file-mode=0777,dir-mode=0777,only-dir={{ params.gcs_path }}",
        }
    )
)

check_mnt = CustomGKEStartJobOperator1(
    task_id='check_mnt_task',
    name='check_mnt-pod',
    image=adtf_image,
    volumes=[gcs_volume],
    volume_mounts=[gcs_volume_mount],
    cmds=["bash", "-c"],
    arguments=["cp /data/ab.json /data/abc.json && sleep 300"],
)
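
To see what the mountOptions string should look like once the template is rendered, here is a minimal sketch that uses plain jinja2 outside of Airflow. The `gcs_path` value is a hypothetical placeholder, and a bare `jinja2.Template` only approximates the environment Airflow uses for rendering, so treat it as a quick sanity check rather than the exact runtime behaviour.

```python
from jinja2 import Template

# Same option string as in the volume definition above.
mount_options = "implicit-dirs,file-mode=0777,dir-mode=0777,only-dir={{ params.gcs_path }}"

# Hypothetical params, standing in for what Airflow puts in the template context.
context = {"params": {"gcs_path": "landing/2025-04-15"}}

# Only the {{ ... }} expression is replaced; the other options are left untouched.
rendered = Template(mount_options).render(**context)
print(rendered)
```

If the rendered value still contains `{{`, the field was not templated and the mount would receive the literal placeholder as its directory.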
Reasons:
  • Blacklisted phrase (1): enter image description here
  • Long answer (-1):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: Raghu Chinna