Users/anksing/event closed bug #34495

Merged: 5 commits, Feb 29, 2024

@@ -4,7 +4,7 @@
 import asyncio
 import logging

-from openai import AsyncAzureOpenAI
+from openai import AsyncAzureOpenAI, AzureOpenAI
 from openai.types.chat.chat_completion import ChatCompletion

 from azure.ai.generative.evaluate._user_agent import USER_AGENT
@@ -25,8 +25,8 @@ def __init__(self, openai_params):
         self._azure_deployment = openai_params.get("azure_deployment", None)\
             if openai_params.get("azure_deployment", None) else openai_params.get("deployment_id", None)

-        self._client = AsyncAzureOpenAI(
-            azure_endpoint=self._azure_endpoint,
+        self._client = AzureOpenAI(
+            azure_endpoint=self._azure_endpoint.strip("/"),
             api_version=self._api_version,
             api_key=self._api_key,
             default_headers={
@@ -35,19 +35,18 @@ def __init__(self, openai_params):
             },
         )

-    async def bounded_chat_completion(self, messages):
-        async with semaphore:
-            try:
-                result = await self._client.with_options(max_retries=5).chat.completions.create(
-                    model=self._azure_deployment,
-                    messages=messages,
-                    temperature=0,
-                    seed=0,
-                )
-                return result
-            except Exception as ex:
-                LOGGER.debug(f"Failed to call llm with exception : {str(ex)}")
-                return ex
+    def bounded_chat_completion(self, messages):
+        try:
+            result = self._client.with_options(max_retries=5).chat.completions.create(
+                model=self._azure_deployment,
+                messages=messages,
+                temperature=0,
+                seed=0,
+            )
+            return result
+        except Exception as ex:
+            LOGGER.debug(f"Failed to call llm with exception : {str(ex)}")
+            return ex

     @staticmethod
     def get_chat_completion_content_from_response(response):
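The hunks above replace the asynchronous AsyncAzureOpenAI client with the synchronous AzureOpenAI client, strip any trailing slash from the endpoint, and drop the semaphore-guarded coroutine in favor of a plain blocking method. A minimal sketch of the resulting call pattern, assuming placeholder endpoint, API version, key, and deployment values that are not taken from this PR:

    from openai import AzureOpenAI

    # Placeholder configuration; in the SDK these values come from openai_params.
    client = AzureOpenAI(
        azure_endpoint="https://example.openai.azure.com/".strip("/"),  # trailing slash removed
        api_version="2024-02-01",  # assumed API version
        api_key="<api-key>",
    )

    # Synchronous completion with client-side retries, mirroring bounded_chat_completion.
    response = client.with_options(max_retries=5).chat.completions.create(
        model="<azure-deployment-name>",
        messages=[{"role": "user", "content": "Say hello."}],
        temperature=0,
        seed=0,
    )
    print(response.choices[0].message.content)

Because the call now blocks the calling thread, the metric handler in the next file switches from asyncio tasks to a thread pool for concurrency.
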
@@ -1,20 +1,20 @@
 # ---------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # ---------------------------------------------------------
-import asyncio
 import json
 from json import JSONDecodeError

 import numpy as np
 import pandas as pd
 import logging
-import tqdm.asyncio
 from numpy import NaN

 from .._client.openai_client import AzureOpenAIClient
 from .._metric_handler import MetricHandler
 from ..metrics._custom_metric import PromptMetric
 from ..metrics._parsers import JsonParser, NumberParser
+from concurrent.futures.thread import ThreadPoolExecutor
+import tqdm

 LOGGER = logging.getLogger(__name__)

@@ -104,49 +104,100 @@ def _parser_response(self, value, metric):

         return result

-    async def _compute_metric_row(self, metric, data):
+    def _compute_metric_row(self, metric, data):
         message = self._convert_metric_to_message(metric, data)
-        response = await self._client.bounded_chat_completion(message)
+        response = self._client.bounded_chat_completion(message)
         content = self._client.get_chat_completion_content_from_response(response)
         result = self._parser_response(content if content is not None else response, metric)
         return result

-    async def _compute_metric(self, metric):
+    def _compute_metric(self, metric):
         data = self._get_data_for_metric(metric)
-        tasks = []
-        for row_data in data:
-            task = asyncio.ensure_future(
-                self._compute_metric_row(metric, row_data)
-            )
-            tasks.append(task)
-
-        responses = await asyncio.gather(*tasks, return_exceptions=True)
-        results = {"artifacts": {}, "metrics": {}}
-        for key in responses[0].keys():
-            results["artifacts"].update({
-                key: [row[key] for row in responses]
-            })
-
-        return results
+        row_metric_futures = []
+        row_metric_results = []

-    async def _compute_metrics(self, metrics):
-        tasks = []
-        metrics_dict = {"artifacts": {}, "metrics": {}}
-        for metric in self.metrics:
-            task = asyncio.ensure_future(
-                self._compute_metric(metric)
-            )
-            tasks.append(task)
+        with ThreadPoolExecutor(thread_name_prefix="code_metrics_row") as thread_pool:
+            for i in range(0, len(data)):
+                row_metric_futures.append(thread_pool.submit(
+                    self._compute_metric_row, metric, data=data[i]
+                ))
+
+        for row_metric_future in row_metric_futures:
+            row_metric_results.append(row_metric_future.result())
+
+        results = {"artifacts": {}, "metrics": {}}
+
+        if isinstance(row_metric_results[0], dict):
+            for key in row_metric_results[0].keys():
+                results["artifacts"].update({
+                    key: [row[key] for row in row_metric_results]
+                })
+        else:
+            results["artifacts"].update(
+                {metric.name: row_metric_results}
+            )

+        # tasks = []
+        # for row_data in data:
+        #     task = asyncio.ensure_future(
+        #         self._compute_metric_row(metric, row_data)
+        #     )
+        #     tasks.append(task)

         # responses = await asyncio.gather(*tasks, return_exceptions=True)
-        responses = await tqdm.asyncio.tqdm.gather(*tasks)
-        for response in responses:
-            for k, v in metrics_dict.items():
-                v.update(response[k])
+        # results = {"artifacts": {}, "metrics": {}}
+        # for key in responses[0].keys():
+        #     results["artifacts"].update({
+        #         key: [row[key] for row in responses]
+        #     })

+        return results
+
+    def _compute_metrics(self, metrics):
+
+        metrics_dict = {"artifacts": {}, "metrics": {}}
+        metric_results_futures = {}
+        with tqdm.tqdm(total=len(metrics)) as progress_bar:
+            with ThreadPoolExecutor(thread_name_prefix="prompt_metrics") as thread_pool:
+                for metric in self.metrics:
+                    metric_values = []
+                    metric_results_futures.update({metric.name: thread_pool.submit(
+                        self._compute_metric, metric,
+                    )})
+
+                for metric_name, metric_result_future in metric_results_futures.items():
+                    try:
+                        metric_result = metric_result_future.result()
+                        metrics_dict["artifacts"].update(metric_result["artifacts"])
+                        if "metrics" in metric_result.keys() and metric_result["metrics"] is not None:
+                            metrics_dict["metrics"].update(metric_result["metrics"])
+                        progress_bar.update(1)
+                    except Exception as ex:
+                        progress_bar.update(1)
+                        LOGGER.info(
+                            f"Error calculating value for {metric_name}, failed with error {str(ex)} : Stack trace : {str(ex.__traceback__)}")
+
+
         return metrics_dict
+
+        # tasks = []
+        # metrics_dict = {"artifacts": {}, "metrics": {}}
+        # for metric in self.metrics:
+        # task = asyncio.ensure_future(
+        # self._compute_metric(metric)
+        # )
+        # tasks.append(task)
+
+        # # responses = await asyncio.gather(*tasks, return_exceptions=True)
+        # responses = await tqdm.asyncio.tqdm.gather(*tasks)
+        # for response in responses:
+        # for k, v in metrics_dict.items():
+        # v.update(response[k])
+
+        # return metrics_dict

     def calculate_metrics(self):
         LOGGER.info(f"Calculating prompt metric {[metric.name for metric in self.metrics]}")
-        result = asyncio.run(self._compute_metrics(self.metrics), debug=True)
+        result = self._compute_metrics(self.metrics)
         return result
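The rewritten handler above replaces the asyncio fan-out with threads: each row becomes a ThreadPoolExecutor future, results are collected with future.result(), and a tqdm progress bar tracks completion (per metric in _compute_metrics). A self-contained sketch of that submit/collect/progress pattern with a dummy per-row function; compute_row, the sample rows, and the per-row progress updates are illustrative only, not part of the SDK:

    import tqdm
    from concurrent.futures.thread import ThreadPoolExecutor

    def compute_row(row):
        # Stand-in for _compute_metric_row: score one row and return a dict of artifacts.
        return {"length": len(row)}

    rows = ["first answer", "second answer", "third answer"]
    futures, results = [], []

    with tqdm.tqdm(total=len(rows)) as progress_bar:
        with ThreadPoolExecutor(thread_name_prefix="code_metrics_row") as thread_pool:
            for row in rows:
                futures.append(thread_pool.submit(compute_row, row))
            for future in futures:
                results.append(future.result())  # re-raises any worker exception here
                progress_bar.update(1)

    # Collate per-row dicts into column lists, as the new _compute_metric does.
    artifacts = {key: [r[key] for r in results] for key in results[0]}
    print(artifacts)  # {'length': [12, 13, 12]}

Exceptions only surface when future.result() is called, which is why the new _compute_metrics wraps that call in try/except and still advances the progress bar on failure.
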
@@ -67,6 +67,7 @@ def run_pf_flow_with_dict_list(flow_path, data: List[Dict], flow_params=None):
         data=tmp_path,
         column_mapping=column_mapping,
         environment_variables=env_vars,
+        stream=True,
         **flow_params
     )

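The final hunk passes stream=True to the flow invocation so run logs are surfaced while the batch run executes rather than only after it finishes. A hedged sketch of such a call using promptflow's PFClient; the client, flow path, data file, and column mapping below are assumptions standing in for the elided parts of run_pf_flow_with_dict_list:

    from promptflow import PFClient

    pf_client = PFClient()

    # Assumed stand-ins for the helper's real arguments (flow_path, tmp_path, env_vars, ...).
    run = pf_client.run(
        flow="path/to/flow",
        data="path/to/inputs.jsonl",
        column_mapping={"question": "${data.question}"},
        environment_variables={},
        stream=True,  # stream run logs to stdout while the flow executes
    )

The flag only changes how logs are reported during the run; the returned run object is used the same way afterwards.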