I posted this question and have now found the answer, so I am sharing the solution here.
To pass the value dynamically to the only-dir flag, we can make use of the params of the DAG (supplied while triggering the DAG).
So while triggering the DAG, I included the needed directory path in the config and used it via Airflow's Jinja templating.
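For example (the DAG id and paths below are placeholders, not from my actual DAG), the param can be declared on the DAG so that the trigger-time config can override it:

import pendulum
from airflow import DAG

# Minimal sketch: declaring "gcs_path" in params makes
# "{{ params.gcs_path }}" resolvable in templated fields and lets a
# value passed in the trigger config override the default.
with DAG(
    dag_id="gcs_mount_dag",                    # placeholder DAG id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    params={"gcs_path": "default/input-dir"},  # placeholder default path
) as dag:
    ...  # the tasks from the snippet below go here

The DAG can then be triggered with the directory path in the config, e.g.:

airflow dags trigger gcs_mount_dag --conf '{"gcs_path": "runs/2024-06-01"}'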
Below is a sample code snippet for doing the same.
import uuid

# Generate a unique identifier for this run
unique_id = str(uuid.uuid4())

# Add the unique identifier to the job labels
labels = {
    "composer-env": cluster,            # defined elsewhere in the DAG
    "project-name": gcs_project_name,   # defined elsewhere in the DAG
    "version": "1.0",
    "stream": "e2e",
    "unique-id": unique_id,             # unique identifier
}
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartJobOperator

class CustomGKEStartJobOperator1(GKEStartJobOperator):
    # Template the "volumes" argument in addition to the parent's fields.
    template_fields = GKEStartJobOperator.template_fields + ("volumes",)

    def execute(self, context):
        # Re-render volume_attributes before pod creation
        for vol in self.volumes or []:
            if hasattr(vol, "csi") and vol.csi and vol.csi.volume_attributes:
                for key, val in vol.csi.volume_attributes.items():
                    if isinstance(val, str) and "{{" in val:
                        # Render the Jinja expression after "dir="
                        # (the only-dir value inside mountOptions).
                        path_name = self.render_template(val.split("dir=")[1], context)
                        vol.csi.volume_attributes[key] = val.split("dir=")[0] + "dir=" + path_name
        return super().execute(context)
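The subclass is needed because adding "volumes" to template_fields is not enough on its own: as far as I can tell, Airflow's templating does not recurse into the attributes of the Kubernetes client objects, so the overridden execute re-renders the Jinja expression inside volume_attributes just before the job is created.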
from kubernetes.client import models as k8s

gcs_volume_mount = k8s.V1VolumeMount(
    name="gcs-fuse-csi",
    mount_path="/data",
)

gcs_volume = k8s.V1Volume(
    name="gcs-fuse-csi",
    csi=k8s.V1CSIVolumeSource(
        driver="gcsfuse.csi.storage.gke.io",
        volume_attributes={
            "type": "gcs",
            "bucketName": datalake_bucket,  # bucket variable defined elsewhere
            "mountOptions": "implicit-dirs,file-mode=0777,dir-mode=0777,only-dir={{ params.gcs_path }}",
        },
    ),
)
check_mnt = CustomGKEStartJobOperator1(
    task_id='check_mnt_task',
    name='check_mnt-pod',
    image=adtf_image,  # container image variable defined elsewhere
    volumes=[gcs_volume],
    volume_mounts=[gcs_volume_mount],
    cmds=["bash", "-c"],
    arguments=["cp /data/ab.json /data/abc.json && sleep 300"])
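With a trigger config such as {"gcs_path": "runs/2024-06-01"} (a hypothetical path), the mountOptions string above renders to implicit-dirs,file-mode=0777,dir-mode=0777,only-dir=runs/2024-06-01, so the pod mounts only that sub-directory of the bucket at /data.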