From 60cadc96070e97df67c9611b23ab08bbb1b3e453 Mon Sep 17 00:00:00 2001 From: Ankit Singhal Date: Thu, 15 Feb 2024 10:39:18 -0800 Subject: [PATCH 1/3] Adding tests to capture groundedness with expected values --- .../evaluate/e2etests/test_evaluate_e2e.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdk/ai/azure-ai-generative/tests/evaluate/e2etests/test_evaluate_e2e.py b/sdk/ai/azure-ai-generative/tests/evaluate/e2etests/test_evaluate_e2e.py index 7a8c06ee5771..46eeef058b53 100644 --- a/sdk/ai/azure-ai-generative/tests/evaluate/e2etests/test_evaluate_e2e.py +++ b/sdk/ai/azure-ai-generative/tests/evaluate/e2etests/test_evaluate_e2e.py @@ -22,10 +22,14 @@ class TestEvaluate(AzureRecordedTestCase): def test_evaluate_built_in_metrics(self, e2e_openai_api_base, e2e_openai_api_key, e2e_openai_completion_deployment_name, tmpdir): test_data = [ - {"question": "How do you create a run?", "context": "AML API only", - "answer": "To create a run using the Azure Machine Learning API, you first need to create an Experiment. Once you have an experiment, you can create a Run object that is associated with that experiment. Here is some Python code that demonstrates this process:\n\n```\nfrom azureml.core import Experiment, Run\nfrom azureml.core.workspace import Workspace\n\n# Define workspace and experiment\nws = Workspace.from_config()\nexp = Experiment(workspace=ws, name='my_experiment')\n\n# Create a new run\nrun = exp.start_logging()\n```\n\nIn this code, the `from_config()` method reads the configuration file that you created when you set up your Azure Machine Learning workspace. The `Experiment` constructor creates an Experiment object that is associated with your workspace, and the `start_logging()` method creates a new Run object that is associated with the Experiment. Now you can use the `run` object to log metrics, upload files, and track other information related to your machine learning experiment."}, - {"question": "How do you log a model?", "context": "Logging can be done using any OSS Sdk", - "answer": "There are a few ways to log models in Azure Machine Learning. \n\nOne way is to use the `register_model()` method of the `Run` object. The `register_model()` method logs a model file in the Azure Machine Learning service workspace and makes it available for deployment. Here's an example:\n\n```python\nfrom azureml.core import Model\n\nmodel_path = '.\/outputs\/my_model.pkl'\nmodel = Model.register(workspace=ws, model_path=model_path, model_name='my_model')\n```\n\nThis code registers the model file located at `model_path` to the Azure Machine Learning service workspace with the name `my_model`. \n\nAnother way to log a model is to save it as an output of a `Run`. If your model generation code is part of a script or Jupyter notebook that runs as an Azure Machine Learning experiment, you can save the model file as an output of the `Run` object. 
Here's an example:\n\n```python\nfrom sklearn.linear_model import LogisticRegression\nfrom azureml.core.run import Run\n\n# Initialize a run object\nrun = Run.get_context()\n\n# Train your model\nX_train, y_train = ...\nclf = LogisticRegression().fit(X_train, y_train)\n\n# Save the model to the Run object's outputs directory\nmodel_path = 'outputs\/model.pkl'\njoblib.dump(value=clf, filename=model_path)\n\n# Log the model as a run artifact\nrun.upload_file(name=model_path, path_or_stream=model_path)\n```\n\nIn this code, `Run.get_context()` retrieves the current run context object, which you can use to track metadata and metrics for the run. After training your model, you can use `joblib.dump()` to save the model to a file, and then log the file as an artifact of the run using `run.upload_file()`."}, + {"context": "Some are reported as not having been wanted at all.", + "question": "", + "answer": "All are reported as being completely and fully wanted." + }, + {"question": "How do you log a model?", + "context": "There are a few ways to log models in Azure Machine Learning. \n\nOne way is to use the `register_model()` method of the `Run` object. The `register_model()` method logs a model file in the Azure Machine Learning service workspace and makes it available for deployment. Here's an example:\n\n```python\nfrom azureml.core import Model\n\nmodel_path = '.\/outputs\/my_model.pkl'\nmodel = Model.register(workspace=ws, model_path=model_path, model_name='my_model')\n```\n\nThis code registers the model file located at `model_path` to the Azure Machine Learning service workspace with the name `my_model`. \n\nAnother way to log a model is to save it as an output of a `Run`. If your model generation code is part of a script or Jupyter notebook that runs as an Azure Machine Learning experiment, you can save the model file as an output of the `Run` object. Here's an example:\n\n```python\nfrom sklearn.linear_model import LogisticRegression\nfrom azureml.core.run import Run\n\n# Initialize a run object\nrun = Run.get_context()\n\n# Train your model\nX_train, y_train = ...\nclf = LogisticRegression().fit(X_train, y_train)\n\n# Save the model to the Run object's outputs directory\nmodel_path = 'outputs\/model.pkl'\njoblib.dump(value=clf, filename=model_path)\n\n# Log the model as a run artifact\nrun.upload_file(name=model_path, path_or_stream=model_path)\n```\n\nIn this code, `Run.get_context()` retrieves the current run context object, which you can use to track metadata and metrics for the run. After training your model, you can use `joblib.dump()` to save the model to a file, and then log the file as an artifact of the run using `run.upload_file()`.", + "answer": "There are a few ways to log models in Azure Machine Learning. \n\nOne way is to use the `register_model()` method of the `Run` object. The `register_model()` method logs a model file in the Azure Machine Learning service workspace and makes it available for deployment. Here's an example:\n\n```python\nfrom azureml.core import Model\n\nmodel_path = '.\/outputs\/my_model.pkl'\nmodel = Model.register(workspace=ws, model_path=model_path, model_name='my_model')\n```\n\nThis code registers the model file located at `model_path` to the Azure Machine Learning service workspace with the name `my_model`. \n\nAnother way to log a model is to save it as an output of a `Run`. 
If your model generation code is part of a script or Jupyter notebook that runs as an Azure Machine Learning experiment, you can save the model file as an output of the `Run` object. Here's an example:\n\n```python\nfrom sklearn.linear_model import LogisticRegression\nfrom azureml.core.run import Run\n\n# Initialize a run object\nrun = Run.get_context()\n\n# Train your model\nX_train, y_train = ...\nclf = LogisticRegression().fit(X_train, y_train)\n\n# Save the model to the Run object's outputs directory\nmodel_path = 'outputs\/model.pkl'\njoblib.dump(value=clf, filename=model_path)\n\n# Log the model as a run artifact\nrun.upload_file(name=model_path, path_or_stream=model_path)\n```\n\nIn this code, `Run.get_context()` retrieves the current run context object, which you can use to track metadata and metrics for the run. After training your model, you can use `joblib.dump()` to save the model to a file, and then log the file as an artifact of the run using `run.upload_file()`." + }, ] with tmpdir.as_cwd(): @@ -44,8 +48,8 @@ def test_evaluate_built_in_metrics(self, e2e_openai_api_base, e2e_openai_api_key "deployment_id": e2e_openai_completion_deployment_name, }, data_mapping={ - "questions": "question", - "contexts": "context", + "question": "question", + "context": "context", "y_pred": "answer", "y_test": "truth", }, @@ -57,6 +61,8 @@ def test_evaluate_built_in_metrics(self, e2e_openai_api_base, e2e_openai_api_key assert "gpt_groundedness" in metrics_summary.keys() assert metrics_summary.get("gpt_groundedness") == np.nanmean(tabular_result["gpt_groundedness"]) + assert tabular_result["gpt_groundedness"][0] in [1,2] + assert tabular_result["gpt_groundedness"][1] in [5, 4] def test_duplicate_metrics_name(self, e2e_openai_api_base, e2e_openai_api_key, e2e_openai_completion_deployment_name, tmpdir): From 8f8d770ddc984c302447e3fba258e85b8a9628f1 Mon Sep 17 00:00:00 2001 From: Ankit Singhal Date: Tue, 27 Feb 2024 23:14:21 -0800 Subject: [PATCH 2/3] Using threadpool to calculate prompt metrics instead of ayncio --- .../evaluate/_client/openai_client.py | 31 +++-- .../_prompt_metric_handler.py | 115 +++++++++++++----- 2 files changed, 98 insertions(+), 48 deletions(-) diff --git a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_client/openai_client.py b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_client/openai_client.py index 2e6cbd95fb15..58c1bc38b8ea 100644 --- a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_client/openai_client.py +++ b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_client/openai_client.py @@ -4,7 +4,7 @@ import asyncio import logging -from openai import AsyncAzureOpenAI +from openai import AsyncAzureOpenAI, AzureOpenAI from openai.types.chat.chat_completion import ChatCompletion from azure.ai.generative.evaluate._user_agent import USER_AGENT @@ -25,8 +25,8 @@ def __init__(self, openai_params): self._azure_deployment = openai_params.get("azure_deployment", None)\ if openai_params.get("azure_deployment", None) else openai_params.get("deployment_id", None) - self._client = AsyncAzureOpenAI( - azure_endpoint=self._azure_endpoint, + self._client = AzureOpenAI( + azure_endpoint=self._azure_endpoint.strip("/"), api_version=self._api_version, api_key=self._api_key, default_headers={ @@ -35,19 +35,18 @@ def __init__(self, openai_params): }, ) - async def bounded_chat_completion(self, messages): - async with semaphore: - try: - result = await self._client.with_options(max_retries=5).chat.completions.create( - model=self._azure_deployment, - 
messages=messages, - temperature=0, - seed=0, - ) - return result - except Exception as ex: - LOGGER.debug(f"Failed to call llm with exception : {str(ex)}") - return ex + def bounded_chat_completion(self, messages): + try: + result = self._client.with_options(max_retries=5).chat.completions.create( + model=self._azure_deployment, + messages=messages, + temperature=0, + seed=0, + ) + return result + except Exception as ex: + LOGGER.debug(f"Failed to call llm with exception : {str(ex)}") + return ex @staticmethod def get_chat_completion_content_from_response(response): diff --git a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_metrics_handler/_prompt_metric_handler.py b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_metrics_handler/_prompt_metric_handler.py index 2d297b72d272..b98bc806d3ea 100644 --- a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_metrics_handler/_prompt_metric_handler.py +++ b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_metrics_handler/_prompt_metric_handler.py @@ -1,20 +1,20 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -import asyncio import json from json import JSONDecodeError import numpy as np import pandas as pd import logging -import tqdm.asyncio from numpy import NaN from .._client.openai_client import AzureOpenAIClient from .._metric_handler import MetricHandler from ..metrics._custom_metric import PromptMetric from ..metrics._parsers import JsonParser, NumberParser +from concurrent.futures.thread import ThreadPoolExecutor +import tqdm LOGGER = logging.getLogger(__name__) @@ -104,49 +104,100 @@ def _parser_response(self, value, metric): return result - async def _compute_metric_row(self, metric, data): + def _compute_metric_row(self, metric, data): message = self._convert_metric_to_message(metric, data) - response = await self._client.bounded_chat_completion(message) + response = self._client.bounded_chat_completion(message) content = self._client.get_chat_completion_content_from_response(response) result = self._parser_response(content if content is not None else response, metric) return result - async def _compute_metric(self, metric): + def _compute_metric(self, metric): data = self._get_data_for_metric(metric) - tasks = [] - for row_data in data: - task = asyncio.ensure_future( - self._compute_metric_row(metric, row_data) - ) - tasks.append(task) - - responses = await asyncio.gather(*tasks, return_exceptions=True) - results = {"artifacts": {}, "metrics": {}} - for key in responses[0].keys(): - results["artifacts"].update({ - key: [row[key] for row in responses] - }) - return results + row_metric_futures = [] + row_metric_results = [] - async def _compute_metrics(self, metrics): - tasks = [] - metrics_dict = {"artifacts": {}, "metrics": {}} - for metric in self.metrics: - task = asyncio.ensure_future( - self._compute_metric(metric) - ) - tasks.append(task) + with ThreadPoolExecutor(thread_name_prefix="code_metrics_row") as thread_pool: + for i in range(0, len(data)): + row_metric_futures.append(thread_pool.submit( + self._compute_metric_row, metric, data=data[i] + )) + + for row_metric_future in row_metric_futures: + row_metric_results.append(row_metric_future.result()) + + results = {"artifacts": {}, "metrics": {}} + + if isinstance(row_metric_results[0], dict): + for key in row_metric_results[0].keys(): + results["artifacts"].update({ + key: [row[key] for row in 
row_metric_results] + }) + else: + results["artifacts"].update( + {metric.name: row_metric_results} + ) + + # tasks = [] + # for row_data in data: + # task = asyncio.ensure_future( + # self._compute_metric_row(metric, row_data) + # ) + # tasks.append(task) # responses = await asyncio.gather(*tasks, return_exceptions=True) - responses = await tqdm.asyncio.tqdm.gather(*tasks) - for response in responses: - for k, v in metrics_dict.items(): - v.update(response[k]) + # results = {"artifacts": {}, "metrics": {}} + # for key in responses[0].keys(): + # results["artifacts"].update({ + # key: [row[key] for row in responses] + # }) + return results + + def _compute_metrics(self, metrics): + + metrics_dict = {"artifacts": {}, "metrics": {}} + metric_results_futures = {} + with tqdm.tqdm(total=len(metrics)) as progress_bar: + with ThreadPoolExecutor(thread_name_prefix="prompt_metrics") as thread_pool: + for metric in self.metrics: + metric_values = [] + metric_results_futures.update({metric.name: thread_pool.submit( + self._compute_metric, metric, + )}) + + for metric_name, metric_result_future in metric_results_futures.items(): + try: + metric_result = metric_result_future.result() + metrics_dict["artifacts"].update(metric_result["artifacts"]) + if "metrics" in metric_result.keys() and metric_result["metrics"] is not None: + metrics_dict["metrics"].update(metric_result["metrics"]) + progress_bar.update(1) + except Exception as ex: + progress_bar.update(1) + LOGGER.info( + f"Error calculating value for {metric_name}, failed with error {str(ex)} : Stack trace : {str(ex.__traceback__)}") + + return metrics_dict + # tasks = [] + # metrics_dict = {"artifacts": {}, "metrics": {}} + # for metric in self.metrics: + # task = asyncio.ensure_future( + # self._compute_metric(metric) + # ) + # tasks.append(task) + + # # responses = await asyncio.gather(*tasks, return_exceptions=True) + # responses = await tqdm.asyncio.tqdm.gather(*tasks) + # for response in responses: + # for k, v in metrics_dict.items(): + # v.update(response[k]) + + # return metrics_dict + def calculate_metrics(self): LOGGER.info(f"Calculating prompt metric {[metric.name for metric in self.metrics]}") - result = asyncio.run(self._compute_metrics(self.metrics), debug=True) + result = self._compute_metrics(self.metrics) return result From 5f846452d44ad2fdfc4ed37c6dd7e9c4cc4d78f3 Mon Sep 17 00:00:00 2001 From: Ankit Singhal Date: Thu, 29 Feb 2024 12:07:24 -0800 Subject: [PATCH 3/3] Streaming logs for PF run so to show them on console --- .../azure-ai-generative/azure/ai/generative/evaluate/_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_utils.py b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_utils.py index 4529401c22d4..6a100cedac34 100644 --- a/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_utils.py +++ b/sdk/ai/azure-ai-generative/azure/ai/generative/evaluate/_utils.py @@ -67,6 +67,7 @@ def run_pf_flow_with_dict_list(flow_path, data: List[Dict], flow_params=None): data=tmp_path, column_mapping=column_mapping, environment_variables=env_vars, + stream=True, **flow_params )
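
Note on the concurrency change in PATCH 2/3: the prompt-metric handler now fans each row out to a `ThreadPoolExecutor` and collects futures in submission order, instead of gathering asyncio tasks. The sketch below is a minimal, self-contained illustration of that fan-out pattern using only the standard library; `compute_row`, `compute_metric`, `rows`, and the fake score are hypothetical stand-ins for the SDK's internal helpers, not its actual API.

```python
# Minimal sketch of the per-row thread-pool fan-out adopted in PATCH 2/3.
# compute_row / compute_metric are hypothetical stand-ins for the handler's
# _compute_metric_row / _compute_metric methods.
from concurrent.futures import ThreadPoolExecutor


def compute_row(metric_name: str, row: dict) -> dict:
    # Placeholder for the per-row LLM call; returns a fake 1-5 score.
    return {metric_name: len(row.get("answer", "")) % 5 + 1}


def compute_metric(metric_name: str, rows: list) -> dict:
    # Submit one future per row, then collect results in submission order,
    # which keeps the artifact values aligned with the input rows.
    with ThreadPoolExecutor(thread_name_prefix="code_metrics_row") as pool:
        futures = [pool.submit(compute_row, metric_name, row) for row in rows]
        scores = [f.result() for f in futures]
    return {"artifacts": {metric_name: [s[metric_name] for s in scores]},
            "metrics": {}}


if __name__ == "__main__":
    sample = [{"answer": "short"}, {"answer": "a somewhat longer answer"}]
    print(compute_metric("gpt_groundedness", sample))
```

One property of this pattern, visible in the patch's `_compute_metrics`: any exception raised inside a worker is re-raised from `future.result()`, which is why the per-metric result collection is wrapped in a try/except that logs the failure and still advances the progress bar.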
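
Note on PATCH 3/3: adding `stream=True` to the promptflow run call makes the run's logs print to the console while the flow executes, instead of only being available after completion. A hedged usage sketch follows, assuming the standard `promptflow` `PFClient`; the flow directory, data path, and column mapping below are illustrative assumptions, not values from this repository.

```python
# Hedged sketch: stream promptflow run logs to the console while the run executes.
# The flow/data paths and column mapping are hypothetical examples.
from promptflow import PFClient

pf = PFClient()
run = pf.run(
    flow="./my_flow",             # hypothetical flow directory
    data="./my_inputs.jsonl",     # hypothetical input data file
    column_mapping={"question": "${data.question}"},
    stream=True,                  # same flag the patch adds: logs go to stdout
)
```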