Skip to content

Commit

Permalink
Merge pull request Azure#3 from KieranBrantnerMagee/kibrantn/python-p…
Browse files Browse the repository at this point in the history
…erf-framework-v1

Python Perf Framework v1
  • Loading branch information
mikeharder authored Nov 7, 2019
2 parents c2da97b + 61b1c06 commit 6d991d0
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
################################################################################
# This .gitignore file was automatically created by Microsoft(R) Visual Studio.
################################################################################

/**/__pycache__
/python/env
8 changes: 8 additions & 0 deletions python/PerfStressProgram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import asyncio
import logging

from core.PerfStressRunner import PerfStressRunner

if __name__ == '__main__':
    # Entry point: construct the runner (which discovers tests and parses CLI
    # arguments in its constructor) and drive its async main loop to completion.
    runner = PerfStressRunner()
    asyncio.run(runner.RunAsync())
24 changes: 24 additions & 0 deletions python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
= Execution =

To run (and display the help string):
python PerfStressProgram.py -h


To run a specific test (and display its test-specific help string):
python PerfStressProgram.py <testname> -h


= Requirements =

No packages/requirements are needed for the core scaffold.

Per-Test requirements:
- Storage: (e.g. GetBlobsTest)
azure-storage-blob


= Advanced Use =

To consume as a library, follow the usage shown in PerfStressProgram.py.

To specify a local/custom test location, simply provide the new location to the PerfStressRunner constructor.
Empty file added python/__init__.py
Empty file.
39 changes: 39 additions & 0 deletions python/core/Helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import random
import string
from threading import Timer


def NewGuid(length=10):
    """Return a pseudo-random identifier of `length` lowercase letters and digits.

    Note: despite the name, this is not an RFC 4122 GUID — just a short random
    string suitable for unique-ish resource names.
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choices(alphabet, k=length))


# Credit to https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
class RepeatedTimer(object):
    """Invokes `function(*args, **kwargs)` every `interval` seconds on a
    background threading.Timer until stop() is called.

    Starts immediately on construction.  Each tick re-arms the timer before
    invoking the callback, so a slow or raising callback does not halt the
    schedule.
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None          # currently scheduled threading.Timer, if any
        self.interval = interval    # seconds between invocations
        self.function = function    # callback invoked on each tick
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        # Timer fired: clear the running flag, re-arm, then invoke the callback.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Schedule the next invocation; no-op if one is already pending."""
        if not self.is_running:
            # NOTE: If there is a concern about perf impact of this Timer, we'd
            # need to convert to multiprocess and use IPC.
            self._timer = Timer(self.interval, self._run)
            # Daemonize the timer thread so a RepeatedTimer that is never
            # stop()ed cannot keep the interpreter alive at process exit.
            self._timer.daemon = True
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Cancel the pending invocation (if any) and halt the repeat cycle."""
        self._timer.cancel()
        self.is_running = False
199 changes: 199 additions & 0 deletions python/core/PerfStressRunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import argparse
import asyncio
import time
import inspect
import json
import logging
import os
import pkgutil
import sys
import threading

from .PerfStressTest import PerfStressTest
from .Helpers import RepeatedTimer

_DEFAULT_TEST_LOCATION = os.path.join(os.path.dirname(__file__), '../tests')


class PerfStressRunner:
    """Discovers PerfStressTest subclasses, parses CLI arguments, and drives the
    selected test through setup / warmup / timed iterations / cleanup."""

    def __init__(self, test_folder_path=_DEFAULT_TEST_LOCATION):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(level=logging.INFO)
        handler = logging.StreamHandler()
        handler.setLevel(level=logging.INFO)
        self.logger.addHandler(handler)

        # NOTE: If you need to support registering multiple test locations, move
        # this into Initialize, call lazily on Run, expose RegisterTestLocation
        # function.
        self._DiscoverTests(test_folder_path)
        self._ParseArgs()

        self._operation_count_lock = threading.Lock()
        self._operation_count = 0            # total completed operations across all workers
        self._last_completed = -1            # snapshot for _PrintStatus; -1 means "header not yet printed"
        self._latest_operation_durations = []  # per-worker wall-clock runtimes for the latest run
        # These (durations and status) are different despite tracking the runners:
        # _technically_ it's not wise to use structures like this in a
        # non-thread-safe manner, so I'm only abusing this for ease of status
        # updates where error doesn't really matter, and durations is still
        # aggregated in a safe way.
        self._status = {}

    def _ParseArgs(self):
        """Two-phase CLI parse: first the test name alone, then the global and
        per-test options, so test-specific flags don't confuse the first parser."""
        # First, detect which test we're running.
        arg_parser = argparse.ArgumentParser(
            description='Python Perf Test Runner',
            usage='{} <TEST> [<args>]'.format(__file__))

        # NOTE: remove this and add another help string to query for available tests
        # if/when # of classes become enough that this isn't practical.
        arg_parser.add_argument('test', help='Which test to run. Supported tests: {}'.format(" ".join(sorted(self._test_classes.keys()))))

        # Only argv[1:2] so later test-specific flags are invisible here.
        args = arg_parser.parse_args(sys.argv[1:2])
        try:
            self._test_class_to_run = self._test_classes[args.test]
        except KeyError:
            self.logger.error("Invalid test: {}\n Test must be one of: {}\n".format(args.test, " ".join(sorted(self._test_classes.keys()))))
            raise

        # Next, parse args for that test. We also do global args here too so as
        # not to confuse the initial test parse.
        per_test_arg_parser = argparse.ArgumentParser(
            description=self._test_class_to_run.__doc__ or args.test,
            usage='{} {} [<args>]'.format(__file__, args.test))

        # Global args
        per_test_arg_parser.add_argument('-p', '--parallel', nargs='?', type=int, help='Degree of parallelism to run with. Default is 1.', default=1)
        per_test_arg_parser.add_argument('-d', '--duration', nargs='?', type=int, help='Duration of the test in seconds. Default is 10.', default=10)
        per_test_arg_parser.add_argument('-i', '--iterations', nargs='?', type=int, help='Number of iterations in the main test loop. Default is 1.', default=1)
        per_test_arg_parser.add_argument('-w', '--warmup', nargs='?', type=int, help='Duration of warmup in seconds. Default is 5.', default=5)
        per_test_arg_parser.add_argument('--no-cleanup', action='store_true', help='Do not run cleanup logic. Default is false.', default=False)
        per_test_arg_parser.add_argument('--sync', action='store_true', help='Run tests in sync mode. Default is False.', default=False)

        # Per-test args
        self._test_class_to_run.AddArguments(per_test_arg_parser)
        per_test_args = per_test_arg_parser.parse_args(sys.argv[2:])

        # Stash the parsed namespace on the test class so instances can read it.
        self._test_class_to_run.Arguments = per_test_args

        self.logger.info("=== Options ===")
        self.logger.info(args)
        self.logger.info(per_test_args)

    def _DiscoverTests(self, test_folder_path=_DEFAULT_TEST_LOCATION):
        """Populate self._test_classes with every PerfStressTest subclass found
        under test_folder_path; modules that fail to import are logged and skipped."""
        self._test_classes = {}

        # Dynamically enumerate all python modules under the tests path for
        # classes that implement PerfStressTest.
        for loader, module_name, _ in pkgutil.walk_packages([test_folder_path]):
            try:
                # NOTE(review): find_module/load_module are deprecated in favor of
                # importlib APIs; kept as-is pending a targeted cleanup.
                module = loader.find_module(module_name).load_module(module_name)
            except Exception as e:
                # logging.Logger.warning (warn is a deprecated alias).
                self.logger.warning("Unable to load module {}: {}".format(module_name, e))
                continue
            for name, value in inspect.getmembers(module):
                if name.startswith('_'):
                    continue
                if inspect.isclass(value) and issubclass(value, PerfStressTest) and value != PerfStressTest:
                    self.logger.info("Loaded test class: {}".format(name))
                    self._test_classes[name] = value

    async def RunAsync(self):
        """Full lifecycle: global setup, per-instance setup, optional warmup,
        timed iterations, then cleanup (skipped with --no-cleanup)."""
        self.logger.info("=== Setup ===")

        # One test instance per degree of parallelism.
        tests = []
        for _ in range(0, self._test_class_to_run.Arguments.parallel):
            tests.append(self._test_class_to_run())

        try:
            # Global hooks run once, on a single (arbitrary) instance.
            await tests[0].GlobalSetupAsync()
            try:
                await asyncio.gather(*[test.SetupAsync() for test in tests])

                if self._test_class_to_run.Arguments.warmup > 0:
                    await self._RunTestsAsync(tests, self._test_class_to_run.Arguments.warmup, "Warmup")

                self.logger.info("=== Running ===")

                for i in range(0, self._test_class_to_run.Arguments.iterations):
                    await self._RunTestsAsync(tests, self._test_class_to_run.Arguments.duration, "Test {}".format(i))

            finally:
                if not self._test_class_to_run.Arguments.no_cleanup:
                    self.logger.info("=== Cleanup ===")
                    await asyncio.gather(*[test.CleanupAsync() for test in tests])
        finally:
            if not self._test_class_to_run.Arguments.no_cleanup:
                await tests[0].GlobalCleanupAsync()

    async def _RunTestsAsync(self, tests, duration, title):
        """Run every test instance for `duration` seconds — OS threads in --sync
        mode, asyncio.gather otherwise — with a once-per-second status line, then
        log aggregate results."""
        # Reset aggregation state for this run.
        self._operation_count = 0
        self._last_completed = -1
        self._latest_operation_durations = []
        self._status = {}

        status_thread = RepeatedTimer(1, self._PrintStatus, title)

        if self._test_class_to_run.Arguments.sync:
            threads = []
            for worker_id, test in enumerate(tests):
                # Pass arguments explicitly instead of closing over the loop
                # variables: the previous `lambda: self.RunLoop(test, duration, id)`
                # bound them late, so every worker could end up running the last
                # test instance with the last id.
                thread = threading.Thread(target=self.RunLoop, args=(test, duration, worker_id))
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
        else:
            await asyncio.gather(*[self.RunLoopAsync(test, duration, worker_id) for worker_id, test in enumerate(tests)])

        status_thread.stop()

        self.logger.info("=== Results ===")
        try:
            # ops/sec = total ops / mean per-worker runtime (runtime ≈ `duration`).
            count_per_second = (self._operation_count / (sum(self._latest_operation_durations) / len(self._latest_operation_durations)))
            count_per_second_per_thread = count_per_second / self._test_class_to_run.Arguments.parallel
            seconds_per_operation = 1 / count_per_second_per_thread
        except ZeroDivisionError as e:
            # Nothing completed (or zero runtime); report zeros rather than crash.
            self.logger.warning("Attempted to divide by zero: {}".format(e))
            count_per_second = 0
            count_per_second_per_thread = 0
            seconds_per_operation = 'N/A'
        self.logger.info("\tCompleted {} operations\n\tAverage {} operations per thread per second\n\tAverage {} seconds per operation".format(self._operation_count, count_per_second_per_thread, seconds_per_operation))

    def RunLoop(self, test, duration, id):
        """Synchronously invoke test.Run() until `duration` seconds elapse,
        publishing per-worker progress under key `id`."""
        start = time.time()
        runtime = 0
        count = 0
        while runtime < duration:
            test.Run()
            runtime = time.time() - start
            count += 1
            self._status[id] = count
        # Aggregate once at the end to avoid thrashing the lock per iteration.
        self._IncrementOperationCountAndTime(count, runtime)

    async def RunLoopAsync(self, test, duration, id):
        """Async counterpart of RunLoop: awaits test.RunAsync() until `duration`
        seconds elapse, publishing per-worker progress under key `id`."""
        start = time.time()
        runtime = 0
        count = 0
        while runtime < duration:
            await test.RunAsync()
            runtime = time.time() - start
            count += 1
            self._status[id] = count
        # Aggregate once at the end to avoid thrashing the lock per iteration.
        self._IncrementOperationCountAndTime(count, runtime)

    def _IncrementOperationCountAndTime(self, count, runtime):
        """Thread-safely fold one worker's totals into the shared aggregates."""
        # Be aware that while this can be used to update more often than "once at
        # the end" it'll thrash the lock in the parallel case and ruin perf.
        with self._operation_count_lock:
            self._operation_count += count
            self._latest_operation_durations.append(runtime)

    def _PrintStatus(self, title):
        """Once-per-second status callback: logs operations completed since the
        previous tick plus the running total."""
        if self._last_completed == -1:
            # First tick for this run: print the header and start the delta at 0.
            self._last_completed = 0
            self.logger.info("=== {} ===\nCurrent\t\tTotal".format(title))
        operation_count = sum(self._status.values())
        self.logger.info("{}\t\t{}".format(operation_count - self._last_completed, operation_count))
        self._last_completed = operation_count
37 changes: 37 additions & 0 deletions python/core/PerfStressTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
class PerfStressTest:
    '''Base class for implementing a python perf test.
    - Run and RunAsync must be implemented.
    - GlobalSetup and GlobalCleanup are optional and run once, ever, regardless of parallelism.
    - Setup and Cleanup are run once per test instance (where each instance runs in its own thread/process), regardless of #iterations.
    - Run/RunAsync are run once per iteration.'''

    # Populated by the runner with the parsed argparse namespace; tests read
    # their options from this class attribute.
    Arguments = {}

    async def GlobalSetupAsync(self):
        # One-time, cross-instance setup hook; no-op by default.
        return

    async def GlobalCleanupAsync(self):
        # One-time, cross-instance teardown hook; no-op by default.
        return

    async def SetupAsync(self):
        # Per-instance setup hook; no-op by default.
        return

    async def CleanupAsync(self):
        # Per-instance teardown hook; no-op by default.
        return

    def __enter__(self):
        return

    def __exit__(self, exc_type, exc_value, traceback):
        return

    def Run(self):
        # Subclasses must override with the synchronous timed operation.
        raise Exception('Run must be implemented for {}'.format(self.__class__.__name__))

    async def RunAsync(self):
        # Subclasses must override with the asynchronous timed operation.
        raise Exception('RunAsync must be implemented for {}'.format(self.__class__.__name__))

    # Override this method to add test-specific argparser args to the class.
    # These are accessible in the Arguments class property.
    @staticmethod
    def AddArguments(parser):
        return
Empty file added python/core/__init__.py
Empty file.
53 changes: 53 additions & 0 deletions python/tests/GetBlobsTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os

from core.PerfStressTest import PerfStressTest
from core.Helpers import NewGuid

from azure.storage.blob import ContainerClient as SyncContainerClient
from azure.storage.blob.aio import ContainerClient as AsyncContainerClient

class GetBlobsTest(PerfStressTest):
    '''This test evaluates the perf of enumerating blobs'''

    # Class attribute: one shared random container name, so every parallel
    # instance targets the same container created in GlobalSetupAsync.
    container_name = NewGuid()

    def __init__(self):
        # Connection info comes from the environment; fail fast when absent.
        connection_string = os.environ.get("STORAGE_CONNECTION_STRING")
        if not connection_string:
            raise Exception("Undefined environment variable STORAGE_CONNECTION_STRING")

        # Sync client used by GlobalSetup/GlobalCleanup and Run.
        self.container_client = SyncContainerClient.from_connection_string(conn_str=connection_string, container_name=self.container_name)

        #TODO: I really hate this.
        # Separate async client because the sync and async SDK clients are
        # distinct types; used by RunAsync.
        self.async_container_client = AsyncContainerClient.from_connection_string(conn_str=connection_string, container_name=self.container_name)

    async def SetupAsync(self):
        # Manually enter the async client's context manager so its transport
        # stays open across all iterations of RunAsync.
        await self.async_container_client.__aenter__()

    async def CleanupAsync(self):
        # NOTE(review): __aexit__ is invoked with no arguments — presumably the
        # Azure client declares `async def __aexit__(self, *args)`; confirm
        # against the SDK's signature.
        await self.async_container_client.__aexit__()

    async def GlobalSetupAsync(self):
        # Create the shared container and populate it with `count` empty blobs
        # (runs once, regardless of parallelism).
        self.container_client.create_container()
        for _ in range(0, self.Arguments.count): #pylint: disable=no-member
            self.container_client.upload_blob(NewGuid(), '')

    async def GlobalCleanupAsync(self):
        # Drop the shared container (and its blobs) once all instances finish.
        self.container_client.delete_container()

    def Run(self):
        # Timed operation (sync): fully enumerate the container's blobs.
        for _ in self.container_client.list_blobs():
            pass

    async def RunAsync(self):
        # Timed operation (async): fully enumerate the container's blobs.
        async for _ in self.async_container_client.list_blobs(): #pylint: disable=not-an-iterable
            pass

    @staticmethod
    def AddArguments(parser):
        # Test-specific option consumed by GlobalSetupAsync via self.Arguments.
        parser.add_argument('-c', '--count', nargs='?', type=int, help='Number of blobs to populate. Default is 1.', default=1)
9 changes: 9 additions & 0 deletions python/tests/NoOpTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from core.PerfStressTest import PerfStressTest

class NoOpTest(PerfStressTest):
    """Baseline test: each operation does nothing, so the measured throughput
    reflects the framework's own per-iteration overhead."""

    def Run(self):
        # Sync no-op iteration.
        pass

    async def RunAsync(self):
        # Async no-op iteration.
        pass

Empty file added python/tests/__init__.py
Empty file.

0 comments on commit 6d991d0

Please sign in to comment.