I encountered this issue yesterday when trying to invoke a Lambda function synchronously from an Airflow DAG: I needed to wait for the Lambda to finish before continuing with the rest of the tasks in the workflow. Your solution looks good and helped me fix it; however, it didn't work right away when I tested your code.
Just in case it helps someone else, here is what I did in my DAG to get it working the way you intended.
from botocore.config import Config
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator as BaseLambdaInvokeFunctionOperator,
)


class LambdaInvokeFunctionOperator(BaseLambdaInvokeFunctionOperator):
    """
    Override the default botocore configuration the operator uses for its
    boto3 connection to AWS: extend the connect timeout and the read
    timeout, and keep the TCP connection alive, so a long-running
    synchronous invocation does not time out on the client side.
    """

    def __init__(self, *args, **kwargs):
        config_dict = {
            "connect_timeout": 900,
            "read_timeout": 900,
            "tcp_keepalive": True,
        }
        self.config = Config(**config_dict)
        super().__init__(*args, **kwargs)

    def execute(self, context):
        # Build a hook with our custom botocore Config and pre-assign it,
        # so the base execute() uses it instead of creating a default hook.
        hook = LambdaHook(aws_conn_id=self.aws_conn_id, config=self.config)
        self.hook = hook
        return super().execute(context)
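In case anyone wonders why the plain `self.hook = hook` assignment in `execute()` is enough: the base operator exposes `hook` as a cached property, and assigning to the attribute pre-populates the cache, so every later read inside the base `execute()` sees the configured hook instead of building a default one. Here is a minimal stdlib-only sketch of that pattern; `Hook`, `BaseOperator`, and `CustomOperator` are hypothetical stand-ins, not the real Airflow classes.

```python
from functools import cached_property


class Hook:
    """Stand-in for an AWS hook that carries an optional botocore config."""

    def __init__(self, config=None):
        self.config = config


class BaseOperator:
    """Stand-in for the base operator with a lazily created hook."""

    @cached_property
    def hook(self):
        # Default hook, created with no custom config on first access.
        return Hook()

    def execute(self):
        # The base logic only ever goes through self.hook.
        return self.hook.config


class CustomOperator(BaseOperator):
    def execute(self):
        # Assigning pre-populates the cached_property's cache, so the
        # base execute() below picks up our configured hook.
        self.hook = Hook(config={"read_timeout": 900})
        return super().execute()


print(BaseOperator().execute())    # default hook, no config
print(CustomOperator().execute())  # our configured hook wins
```

This works because `cached_property` is a non-data descriptor: an instance attribute with the same name shadows it, which is exactly what the assignment in the custom `execute()` relies on.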
I'm quite surprised there isn't an easier way to do this directly through the operator or hook, without these workarounds. Thanks ;)