The following code solves the problem posed in the question, i.e., it retrieves the schedules that have at least one job (any component of the pipeline that runs on the schedule) that finished successfully.
The issue, however, is that when a schedule has multiple runs, only the first run is considered.
A more interesting problem is to retrieve the schedules that have at least one run completed, whose last run completed, or whose last run failed. I will address this question in a separate post (please answer the question if you have a better solution).
# -------------------------------------------------
# Connect to AML and set tracking URI in mlflow
# -------------------------------------------------
import mlflow
import pandas as pd
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
# Connect to AML: authenticate with an InteractiveBrowserCredential and bind
# the client to a single subscription / resource group / workspace.
browser_credential = InteractiveBrowserCredential()
client = MLClient(
    credential=browser_credential,
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    workspace_name="my-workspace",
)
# Set the tracking URI (needed if run locally): read it from the workspace the
# client is bound to and hand it to mlflow so later mlflow calls target it.
workspace = client.workspaces.get(client.workspace_name)
mlflow_tracking_uri = workspace.mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
# -------------------------------------------------
# Retrieve and filter schedules
# -------------------------------------------------
# List the schedules available through the workspace client.
schedules = client.schedules.list()
# Optional: keep only the schedules whose name contains this substring.
name_substring = "inference_pipelin"
selected_schedules = list(
    filter(lambda candidate: name_substring in candidate.name, schedules)
)
# -------------------------------------------------
# Get schedules that have *at least* one job (not one run) completed
# -------------------------------------------------
# Experiment name of the job created by each selected schedule
# (same order and length as selected_schedules).
experiment_names = [schedule.create_job.experiment_name for schedule in selected_schedules]

# Resolve every distinct experiment name individually. This avoids building an
# "or"-joined filter string with unquoted values, and makes it explicit which
# names do not (yet) correspond to an existing experiment.
unique_experiment_names = [name for name in dict.fromkeys(experiment_names) if name]
looked_up = (mlflow.get_experiment_by_name(name) for name in unique_experiment_names)
experiments = [exp for exp in looked_up if exp is not None]
experiment_id_by_name = {exp.name: exp.experiment_id for exp in experiments}

# One row per schedule, joined to its experiment *by name*. Pairing the lookup
# result with selected_schedules by position would mismatch rows (or raise on
# unequal lengths) whenever schedules share an experiment or one is missing.
experiments_df = pd.DataFrame(
    [
        {
            "experiment_id": experiment_id_by_name[name],
            "experiment_name": name,
            "schedule": schedule,
        }
        for schedule, name in zip(selected_schedules, experiment_names)
        if name in experiment_id_by_name
    ],
    columns=["experiment_id", "experiment_name", "schedule"],
)

experiment_ids = list(experiment_id_by_name.values())
if experiment_ids:
    # Only runs tagged with this user are considered.
    all_runs = mlflow.search_runs(
        experiment_ids=experiment_ids,
        filter_string="tags.mlflow.user='Jaume Amores'",
    )
else:
    # Nothing to search: an empty id list would make mlflow fall back to the
    # currently active experiment, so use an empty frame instead.
    all_runs = pd.DataFrame(columns=["experiment_id", "status"])

# Boolean Series indexed by experiment_id: True when any run is FINISHED.
selected_experiments = (
    all_runs["status"].eq("FINISHED").groupby(all_runs["experiment_id"]).any()
)
finished_experiment_ids = selected_experiments[selected_experiments].index
selected_schedules = experiments_df.loc[
    experiments_df["experiment_id"].isin(finished_experiment_ids), "schedule"
].tolist()