# dags/mongo_to_gcs_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Project-local helper from the dag_utils package; used as the task's python_callable below.
from dag_utils.mongo_to_gcs import stream_mongo_to_gcs
# Defaults handed to the DAG via `default_args=`; Airflow applies them to each
# task created inside that DAG.
default_args = dict(
    owner="data-eng",
    # NOTE(review): naive datetime — Airflow interprets it in the configured
    # default timezone (UTC unless changed); consider an explicit tz.
    start_date=datetime(2025, 5, 20),
    # One automatic retry, five minutes after a failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
)
with DAG(
    dag_id="mongo_to_gcs_stream",
    default_args=default_args,
    # NOTE(review): `schedule_interval` is deprecated since Airflow 2.4 in
    # favour of `schedule=`; switch once the deployed Airflow version is
    # confirmed to be >= 2.4.
    schedule_interval="@daily",
    # Do not backfill runs for intervals between start_date and now.
    catchup=False,
) as dag:
    # Single task: call the project helper with the Mongo source
    # (db "pricing", collection "raw_prices") and the GCS destination.
    # The "{{ ... }}" strings are Jinja templates that Airflow renders at run
    # time (op_kwargs is a templated field of PythonOperator); they read the
    # `mongo_uri` and `gcs_bucket` Airflow Variables.
    #
    # NOTE(review): mongo_uri presumably embeds credentials. Its rendered value
    # is shown in the task's "Rendered Template" view and is not masked unless
    # the Variable name matches Airflow's sensitive-name list; consider an
    # Airflow Connection resolved inside the helper instead.
    # NOTE(review): gcs_path is a fixed prefix, not date-partitioned (e.g. via
    # {{ ds }}); confirm stream_mongo_to_gcs keeps daily runs from
    # overwriting each other's output.
    stream_task = PythonOperator(
        task_id="stream_mongo_to_gcs",
        python_callable=stream_mongo_to_gcs,
        op_kwargs={
            "mongo_uri": "{{ var.value.mongo_uri }}",
            "mongo_db": "pricing",
            "mongo_collection": "raw_prices",
            "gcs_bucket": "{{ var.value.gcs_bucket }}",
            "gcs_path": "ingestion/mongo",
        },
    )