79625426

Date: 2025-05-16 14:53:26
Score: 0.5
Natty:
Report link
# dags/mongo_to_gcs_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# import your helper from dag_utils
from dag_utils.mongo_to_gcs import stream_mongo_to_gcs

# Arguments handed to the DAG as its `default_args`: owner label, first
# schedulable date, and a single retry attempted five minutes after a failure.
default_args = dict(
    owner="data-eng",
    start_date=datetime(year=2025, month=5, day=20),
    retries=1,
    retry_delay=timedelta(seconds=5 * 60),
)

# DAG "mongo_to_gcs_stream": scheduled "@daily" with one PythonOperator task
# that calls the imported `stream_mongo_to_gcs` helper. `catchup=False` is
# passed so that past intervals between `start_date` and the moment the DAG is
# enabled are not backfilled.
with DAG(
    dag_id="mongo_to_gcs_stream",
    default_args=default_args,
    # NOTE(review): `schedule_interval` is deprecated in favour of `schedule`
    # from Airflow 2.4 onward -- confirm the deployed Airflow version before
    # renaming, since older versions only accept `schedule_interval`.
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # `op_kwargs` is forwarded to the callable as keyword arguments. The
    # "{{ var.value.<name> }}" entries are Jinja templates that resolve to
    # Airflow Variables when the task is rendered, so the Mongo URI and the
    # bucket name are not hard-coded in this file. The remaining values are
    # plain literals; how the helper uses them is defined in
    # dag_utils.mongo_to_gcs, not here.
    stream_task = PythonOperator(
        task_id="stream_mongo_to_gcs",
        python_callable=stream_mongo_to_gcs,
        op_kwargs={
            "mongo_uri":        "{{ var.value.mongo_uri }}",
            "mongo_db":         "pricing",
            "mongo_collection": "raw_prices",
            "gcs_bucket":       "{{ var.value.gcs_bucket }}",
            "gcs_path":         "ingestion/mongo",
        },
    )
    # Single task, so there are no dependencies to declare. Creating the
    # operator inside the `with DAG(...)` block is what attaches it to the DAG.
Reasons:
  • Long answer (-0.5):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: Gaurav Tripathi