# Stress Loading
Deliberately trying to make your own system fail is a useful way to map its limits and plan future directions. The script below hammers the servers across zones, bounded only by the socket limit of an authenticated in-house system, and the display updates the published values in place as requests progress.
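For reference, assuming the script is saved as `stress_loading.py` (the file name here is illustrative), a single-zone run looks like `python stress_loading.py -t 8 -n 100 -z Zone1 -s D+I`, and `python stress_loading.py -v -s ALL` sweeps every tested scenario across all zones. The flags are defined in `main()` below.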
__author__ = "Vishal Anand"
__email__ = "vishal.anand@columbia.edu"
__copyright__ = "Copyright 2024"
from colored import fg, bg, attr
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pprint import pprint
import argparse, copy, json, logging, os, requests, sys, threading, time, warnings
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
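# Sign in to Azure: managed identity is tried first, with an interactive
# tenant login (plus subscription selection) as the fallback.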
def login():
    # `az login` reports failure via its exit code rather than raising an
    # exception, so check the return value instead of using try/except.
    if os.system("az login --identity --output none") != 0:
        os.system("az login --tenant <tenant> --output none")
        os.system("az account set --subscription '<subscription>' --output none")

login()
KEY_VAULT_URL = os.environ.get("KEY_VAULT_URL") or "https://<keyvault>.vault.azure.net"
credential = DefaultAzureCredential()
vault_client = SecretClient(vault_url=KEY_VAULT_URL, credential=credential)
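# Fetch the client credentials and blob connection string from Key Vault.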
values = {}
AZURE_VAULT_KEYS = [
    "keys",
    ...
]
for key in AZURE_VAULT_KEYS:
    values[key] = vault_client.get_secret(key).value
azure_client = values["clientid"]
azure_key = values["client-key"]
csv_blob_connection_string = values["blob-connector-url"]
container_name = "csv"
blob_service_client = BlobServiceClient.from_connection_string(csv_blob_connection_string)
container_client = blob_service_client.get_container_client(container_name)
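# Download the CSV test fixtures from blob storage once; later runs reuse
# the local copies.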
csv_files = [
    "file.csv",
    ...
]
folder_name = "./csv"
os.makedirs(folder_name, exist_ok=True)
for file_name in csv_files:
    file_path = os.path.join(folder_name, file_name)
    if not os.path.exists(file_path):
        print(f"Downloading {file_name}...")
        blob_client = container_client.get_blob_client(file_name)
        with open(file_path, "wb") as file:
            file.write(blob_client.download_blob().readall())
        print(f"{file_name} downloaded.")
    else:
        print(f"{file_name} already exists.")
file_root = "./csv"
daily = os.path.join(file_root, "file.csv")
warnings.filterwarnings('ignore')
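# A shared counter guarded by a lock gives each request a stable serial
# number across worker threads.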
global_counter = 0
threadLock = threading.Lock()
file_dict = {
    'daily': ('daily.csv', daily, 'text/csv'),
}
zones_list = [
    "Local", "Zone1", "Zone2",
    ...,
    "Zone-n"
]
server_selector = {
    "Local": "https://localhost:<port>",
    "Zone1": "<>",
    ...: ...,
    "Zone-n": "<>"
}
test_scenarios = [
    "ALL",
    "scenario-1", "scenario-2",
    ...
]
tested_scenarios = ["scenario-1", "D+I+S", "D+I+SA", "D+I+S+SA"]
zone_list = ["Local", "Zone1", "Zone2", ..., "Zone-n"]
payload_dictionary = {
    "D": {
        "Staging": {'Metadata': '{"file":"file.csv",...}'},
        "Real": {'metadata': '{InputDataFileName:"file.csv", ...}'}
    },
    ...: ...
}
data_dictionary = {
    "D": {
        "Staging": [('data', 'daily'), ('argument1', 'argumentvariable1')],
        "Real": [('inputFile', 'daily'), ('argument1', 'argumentvariable1')]
    },
    ...: ...
}
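# Exchange client credentials for a zone-scoped OAuth2 bearer token.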
def create_token(zone):
    url = "https://login.microsoftonline.com/<tenant>/oauth2/v2.0/token"
    if zone in ("Local", "Zone1", "Zone2"):
        # Credentials come from the Key Vault secrets loaded above.
        client_id = azure_client
        client_key = azure_key
        scope = "api://<client_issuer1>/.default"
        if zone == "Zone1":
            scope = "api://<client_issuer2>/.default"
    elif zone in zone_list:
        client_id = azure_client
        client_key = azure_key
        scope = "api://<client_issuer3>/.default"
        if zone == "Zone4":
            scope = "api://<client_issuer4>/.default"
        if zone == "Zone5":
            scope = "api://<client_issuer5>/.default"
    else:
        raise ValueError("Unknown zone: {}".format(zone))
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        # The client-credentials grant expects `client_secret`, not `client_key`.
        "client_secret": client_key,
        "scope": scope
    }
    # Token endpoints take POST, not GET.
    response = requests.request("POST", url, headers=headers, data=data)
    if response.status_code != 200:
        logging.error("Token request for {} failed: {}".format(zone, response.status_code))
        exit()
    return response.json()["access_token"]
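# Look up a scenario's metadata payload and build the multipart file tuples
# for either the Staging or Real platform.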
def generate_scenario_payload(scenario, staging=True):
    platform = "Staging" if staging else "Real"
    payload = payload_dictionary[scenario][platform]
    data = []
    for (post_key, file_id) in data_dictionary[scenario][platform]:
        data.append(
            (
                post_key,
                (file_dict[file_id][0], open(file_dict[file_id][1], 'rb'), 'text/csv')
            )
        )
    return payload, data
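# Pre-build one POST argument tuple per request; staging zones are called
# with verify=False, real zones with verify=True.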
def generate_all_post_data(zone, bearer_token, total_requests, scenario="D+I"):
    server = server_selector[zone]
    headers = {'Authorization': 'Bearer {}'.format(bearer_token)}
    post_data = []
    if zone in ["Local", "Zone1", "Zone2"]:
        url = "{}/api/<server-endpoint>".format(server)
        for _ in range(total_requests):
            payload, data = generate_scenario_payload(scenario, staging=True)
            post_data.append((url, headers, payload, data, False, zone, bearer_token))
    else:
        url = "{}/api/<real-server-endpoint>".format(server)
        for _ in range(total_requests):
            payload, data = generate_scenario_payload(scenario, staging=False)
            post_data.append((url, headers, payload, data, True, zone, bearer_token))
    return post_data
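# Worker body: POST one job, stamp it with a thread-safe serial number,
# and log the job id and status on success.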
def post_url(args):
    global global_counter
    start_time = time.time()
    created_timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
    response = requests.request("POST", args[0], headers=args[1], data=args[2], files=args[3], verify=args[4])
    with threadLock:
        global_counter += 1
        sl_no = global_counter
    elapsed_time = round(time.time() - start_time, 2)
    # Staging zones answer 200 with lowercase keys; real zones answer 202.
    if args[5] in ["Local", "Zone1", "Zone2"]:
        success_code = 200
        status_key = "status"
        operation_id_key = "<key1>"
    else:
        success_code = 202
        status_key = "Status"
        operation_id_key = "<key2>"
    if response.status_code == success_code:
        (request_id, status) = (response.json()[operation_id_key], response.json()[status_key])
        logging.info("{}\t{}\t{}\t{}\t{:.2f}s".format(args[5][:5], sl_no, request_id, status, elapsed_time))
        return sl_no, response.status_code, request_id, status, start_time, elapsed_time, created_timestamp
    return sl_no, response.status_code, response.text, "N/A", start_time, elapsed_time, created_timestamp
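# Fan the POSTs out over a thread pool, collect per-request results, and
# print success-rate and latency summaries by status-code class.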
def create_job_requests(post_data, workers):
    print("\n" + "-"*150)
    logging.info("{}\t{}\t{}\t\t\t\t\t{}\t{}".format("Zone", "Sl.No.", "JobID", "Status", "CreationTime"))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        response_list = list(pool.map(post_url, post_data))
    requests_df = pd.DataFrame({
        'Code': pd.Series(dtype=int),
        'Zone': pd.Series(dtype=str),
        'Timestamp': pd.Series(dtype=str),
        'CreationTime': pd.Series(dtype=float),
        'RequestId': pd.Series(dtype=str),
        'Sl.No.': pd.Series(dtype=int)})
    final_status_df = pd.DataFrame({
        'Status': pd.Series(dtype=int),
        'Zone': pd.Series(dtype=str),
        'TimeOfRequest': pd.Series(dtype=str),
        'StartTimeOrigin': pd.Series(dtype=float),
        'CreationTime': pd.Series(dtype=float),
        'TotalTime': pd.Series(dtype=float),
        'RequestId': pd.Series(dtype=str),
        'LastPolled': pd.Series(dtype=float),
        'TimeIn0': pd.Series(dtype=float),
        'TimeIn1': pd.Series(dtype=float),
        'GetStatusTime': pd.Series(dtype=float),
        'Sl.No.': pd.Series(dtype=int)})
    for cnt, (sl_no, code, message_or_requestId, status, start_time, elapsed_time, created_timestamp) in enumerate(response_list):
        zone = post_data[cnt][5]
        if code not in [200, 202]:
            new_row = {
                'Zone': zone, 'Code': code,
                'BearerToken': post_data[cnt][6],
                'Timestamp': created_timestamp, 'CreationTime': elapsed_time,
                'Sl.No.': sl_no, 'RequestId': "N/A"
            }
            logging.info("{}\t{}\t{}\t{}".format(zone, sl_no, code, message_or_requestId))
        else:
            new_row = {
                'Zone': zone, 'Code': code,
                'BearerToken': post_data[cnt][6],
                'Timestamp': created_timestamp,
                'CreationTime': elapsed_time,
                'RequestId': message_or_requestId,
                'Sl.No.': sl_no
            }
            # DataFrame.append was removed in pandas 2.x; append rows with pd.concat.
            final_status_df = pd.concat([final_status_df, pd.DataFrame([{
                'Status': status,
                'Zone': zone,
                'BearerToken': post_data[cnt][6],
                'TimeOfRequest': created_timestamp,
                'StartTimeOrigin': start_time,
                'CreationTime': elapsed_time,
                'TotalTime': elapsed_time,
                'RequestId': message_or_requestId,
                'LastPolled': start_time,
                'GetStatusTime': elapsed_time,
                'Sl.No.': sl_no,
                'TimeIn0': 0, 'TimeIn1': 0,
            }])], ignore_index=True)
        requests_df = pd.concat([requests_df, pd.DataFrame([new_row])], ignore_index=True)
    print("\n" + "-"*80)
    totalRequests = len(requests_df)
    code_counts = requests_df["Code"].value_counts()
    successfulRequests = int(code_counts.get(200, 0) + code_counts.get(202, 0))
    failedRequests = totalRequests - successfulRequests
    requestRate = successfulRequests * 100.0 / totalRequests
    avgRequestTimes = round(requests_df["CreationTime"].mean(), 2)
    print("{}\t{}\t{}\t{}\t{}".format("RequestRate", "Requests", "RequestSuccess", "RequestFailure", "TimeToRequest(s)"))
    print("{}\t\t{}\t\t{}\t\t{}\t\t{}".format(requestRate, totalRequests, successfulRequests, failedRequests, avgRequestTimes))
    print("\n" + "-"*80)
    print("{}\t{}\t\t{}\t{}".format("Code", "Percent", "Requests", "TimeToRequest(s)"))
    for code_value in ["2xx", "4xx", "5xx"]:
        lower_bound = int(code_value[0]) * 100
        upper_bound = lower_bound + 99
        in_range = (requests_df["Code"] >= lower_bound) & (requests_df["Code"] <= upper_bound)
        request_count = in_range.sum()
        percent = round(request_count * 100.0 / totalRequests, 1)
        avg_code_time = requests_df.loc[in_range, 'CreationTime'].sum()
        if avg_code_time != 0.0:
            avg_code_time = round(avg_code_time / request_count, 2)
        print("{}\t{}\t\t{}\t\t{}".format(code_value, percent, request_count, avg_code_time))
    print("\n" + "-"*80)
    return requests_df, final_status_df
def get_status_caller(args):
    return get_status(args[0], args[1], args[2], args[3])
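# Render one stacked horizontal bar per request: creation time (green),
# time in status 0 (red), and time in status 1 (blue), scaled to 40 chars.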
def create_plots(graph_df):
    values = list(graph_df[["CreationTime", "TimeIn0", "TimeIn1", "Sl.No."]].itertuples(index=False, name=None))
    # Scale every bar against the largest combined time; the final tuple
    # element (Sl.No.) is a label, so exclude it from the sum.
    max_total = max(sum(v[:-1]) for v in values)
    max_chars = 40
    colors = [fg('white') + bg('light_green'), fg('white') + bg('light_red'), fg('white') + bg('light_blue')]
    reset_color = attr('reset')
    label_width = len(str(len(values)))
    for value in values:
        bar_string = ''
        for j in range(3):
            if value[j] != 0:
                # Give any nonzero segment at least one character so it stays visible.
                bar_length = max(1, int(value[j] / max_total * max_chars))
            else:
                bar_length = 0
            bar_string += colors[j] + ' ' * bar_length + reset_color
        print()
        print('\033[{}A'.format(2))
        # Pad the serial-number label so the bars line up.
        print(str(value[-1]).ljust(label_width), bar_string)
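# Redraw the live status table in place using ANSI cursor movement.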
def print_status(final_status_df, move=True):
    if move:
        # Move the cursor back up so the table overwrites the previous one.
        print('\033[{}A'.format(len(final_status_df) + 5))
    pd.set_option('display.max_colwidth', 1000)
    print(final_status_df[[
        "TimeOfRequest", "Zone", "Sl.No.", "RequestId", "Status", "TotalTime",
        "CreationTime", "TimeIn0", "TimeIn1", "GetStatusTime"]]
        .to_string(index=False)
    )
    print("\n")
    #create_plots(final_status_df[["Sl.No.", "RequestId", "TotalTime", "CreationTime", "TimeIn0", "TimeIn1"]])
    print("Timestamp: {}".format(datetime.now().strftime('%H:%M:%S.%f')[:-3]))
def checkStatuses(final_status_df, workers, wait):
    final_status_df = final_status_df.sort_values(by='Sl.No.')
    printed = False
    while (final_status_df['Status'] < 2).any():
        get_data = []
        for index, row in final_status_df.iterrows():
            if row['Status'] < 2:
                get_data.append((row["RequestId"], index, row["BearerToken"], server_selector[row["Zone"]]))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            response_list = list(pool.map(get_status_caller, get_data))
        timer_begin = time.time()
        for (index, new_status, time_call, time_new) in response_list:
            try:
                if int(new_status) < 0:
                    continue
            except (TypeError, ValueError):
                continue
            row = final_status_df.loc[index]
            last_polled = row['LastPolled']
            old_status = row['Status']
            if old_status == 0:
                row['TimeIn0'] = round(row['TimeIn0'] + time_new - last_polled, 2)
            elif old_status == 1:
                row['TimeIn1'] = round(row['TimeIn1'] + time_new - last_polled, 2)
            row['LastPolled'] = time_new
            row['TotalTime'] = row['CreationTime'] + row['TimeIn0'] + row['TimeIn1']
            row['GetStatusTime'] = round(time_call, 2)
            row['Status'] = int(new_status)
            final_status_df.loc[index] = row
        print_status(final_status_df, printed)
        printed = True
        if (final_status_df['Status'] > 1).all():
            break
        # Never pass a negative duration to sleep if the polling round overran the wait.
        time.sleep(max(0, wait - (time.time() - timer_begin)))
    return final_status_df
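# Issue one request per zone across every zone, then poll all to completion.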
def all_zone_calls(threads=4, wait_in_polling=7, scenario="D+I"):
    all_tokens = {}
    post_data = []
    for zone in ["Zone1", ..., "Zone-n"]:
        all_tokens[zone] = create_token(zone)
        post_data.extend(generate_all_post_data(zone, all_tokens[zone], 1, scenario))
    requests_df, final_status_df = create_job_requests(post_data, threads)
    final_status_df = checkStatuses(final_status_df, threads, wait_in_polling)
def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s.%(msecs)03d \t%(message)s',
        datefmt='%H:%M:%S',
        handlers=[logging.StreamHandler()]
    )
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--threads', type=int, default=4, help='parallel workers')
    parser.add_argument('-v', '--verify', action="store_true", help='validate scenarios across all zones')
    parser.add_argument('-n', '--number-of-total-requests', type=int, default=4, help='total number of jobs')
    parser.add_argument('-w', '--wait-in-polling', type=int, default=7, help='seconds to wait between get_status polls')
    parser.add_argument(
        '-z', '--zone', type=str, help='staging zones', default="PRE-Staging",
        choices=zones_list
    )
    parser.add_argument(
        '-s', '--scenario', type=str, help='scenario combinations', default="D+I",
        choices=test_scenarios
    )
    arguments = parser.parse_args()
    if arguments.verify:
        # "ALL" sweeps every tested scenario; any other choice must itself be tested.
        if arguments.scenario != "ALL" and arguments.scenario not in tested_scenarios:
            print("This scenario is currently not tested by the script")
            exit()
        for scenario in tested_scenarios:
            if arguments.scenario in (scenario, "ALL"):
                print("\n" + "-"*80)
                print(scenario)
                print("-"*80)
                all_zone_calls(arguments.threads, arguments.wait_in_polling, scenario)
    else:
        bearer_token = create_token(arguments.zone)
        post_data = generate_all_post_data(arguments.zone, bearer_token, arguments.number_of_total_requests)
        requests_df, final_status_df = create_job_requests(post_data, arguments.threads)
        final_status_df = checkStatuses(final_status_df, arguments.threads, arguments.wait_in_polling)
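# Query a single job's status; returns -1 when no usable status came back.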
def get_status(request_id, index, bearer_token, server):
    if "staging" in server.lower():
        url = "{}/api/<server-endpoint>?identifier={}".format(server, request_id)
    else:
        url = "{}/api/<real-server-endpoint>?identifier={}".format(server, request_id)
    headers = {
        'Authorization': 'Bearer {}'.format(bearer_token)
    }
    start_time = time.time()
    response = requests.request("GET", url, headers=headers)
    time_new = time.time()
    time_call = time_new - start_time
    status = -1
    logging.debug("{}\t{}\n".format(response.status_code, response.text))
    if response.status_code == 200:
        status = response.json()["status"]
    elif response.status_code == 202:
        status = response.json()["Status"]
    return (index, status, time_call, time_new)
if __name__ == "__main__":
    main()