# Stress Loading

Deliberately trying to make your own system fail is a useful way to measure its limits and guide future research directions. The script below hammers the servers across zones, limited only by the socket limit of an authenticated in-house system; the display updates the published values in place as requests progress.

Image
Image

__author__      = "Vishal Anand"
__email__       = "vishal.anand@columbia.edu"
__copyright__   = "Copyright 2024"

from colored import fg, bg, attr
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pprint import pprint

import argparse, copy, json, logging, os, requests, sys, threading, time, warnings
import pandas as pd

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

def login():
  """Authenticate the Azure CLI, preferring managed identity.

  Bug fix: ``os.system`` returns the command's exit status instead of
  raising, so the original ``try/except`` could never reach the fallback
  path; the return code is checked explicitly instead.
  """
  # Managed-identity login works on Azure-hosted VMs/containers.
  if os.system("az login --identity --output none") != 0:
    # Fall back to tenant login (e.g. local development).
    os.system("az login --tenant <tenant> --output none")
    os.system("az account set --subscription '<subscription>' --output none")

# Authenticate before any Azure SDK calls below.
login()
# Key Vault holding client credentials and connection strings; env var wins.
KEY_VAULT_URL = os.environ.get("KEY_VAULT_URL") or "https://<keyvault>.vault.azure.net"
credential = DefaultAzureCredential()
vault_client = SecretClient(vault_url=KEY_VAULT_URL, credential=credential)
# Secret name -> secret value, populated from the vault.
values = {}

# Secret names to pull from the vault (placeholder list).
AZURE_VAULT_KEYS = [
  "keys",
  ...
]
for key in AZURE_VAULT_KEYS:
    values[key] = vault_client.get_secret(key).value

# OAuth client credentials consumed by create_token().
azure_client = values["clientid"]
azure_key = values["client-key"]

# Blob storage account/container holding the CSV test fixtures.
csv_blob_connection_string = values["blob-connector-url"]
container_name = "csv"
blob_service_client = BlobServiceClient.from_connection_string(csv_blob_connection_string)
container_client = blob_service_client.get_container_client(container_name)

# CSV fixtures expected locally; any missing file is pulled from the blob container.
csv_files = [
  "file.csv",
  ...
]

folder_name = "./csv"
os.makedirs(folder_name, exist_ok=True)
for name in csv_files:
    target = os.path.join(folder_name, name)
    if os.path.exists(target):
        print(f"{name} already exists.")
        continue
    print(f"Downloading {name}...")
    blob = container_client.get_blob_client(name)
    with open(target, "wb") as handle:
        handle.write(blob.download_blob().readall())
    print(f"{name} downloaded.")

file_root = "./csv"

# Local path of the daily CSV used as the multipart upload body.
daily = os.path.join(file_root, "file.csv")

warnings.filterwarnings('ignore')

# Monotonically increasing request serial number, shared across worker
# threads and guarded by threadLock inside post_url().
global_counter = 0
threadLock = threading.Lock()

# file_id -> (upload filename, local path, MIME type) for multipart posts.
file_dict = {
  'daily': ('daily.csv', daily, 'text/csv'),
}

# Zones accepted by the -z CLI flag (placeholder list).
zones_list = [
    "Local", "Zone1", "Zone2", 
    ...,
    "Zone-n"
]

# zone -> base server URL (placeholders).
server_selector = {
  "Local": "https://localhost:<port>",
  "Zone1": "<>",
  ...: ...,
  "Zone-n": "<>"
}

# Scenario names accepted by the -s CLI flag; "ALL" fans out to every one.
test_scenarios = [
  "ALL", 
  "scenario-1", "scenario-2", 
  ...
]

# Subset of scenarios actually exercised in --verify mode.
tested_scenarios = ["scenario-1", "D+I+S", "D+I+SA", "D+I+S+SA"]
# NOTE(review): overlaps zones_list above — presumably one of the two
# lists is redundant; confirm before consolidating.
zone_list = ["Local", "Zone1", "Zone2", ..., "Zone-n"]

# scenario -> platform ("Staging"/"Real") -> form metadata payload (placeholders).
payload_dictionary = {
  "D": {
    "Staging": {'Metadata': '{"file":"file.csv",...}'},
    "Real": {'metadata': '{InputDataFileName:"file.csv", ...}'}
  },
  ...: ...
}
# scenario -> platform -> list of (form field name, file_dict key) pairs.
data_dictionary = {
  "D": {
    "Staging": [('data', 'daily'), ('argument1', 'argumentvariable1')],
    "Real": [('inputFile', 'daily'), ('argument1', 'argumentvariable1')]
  },
  ...: ...
}

def create_token(zone):
  """Fetch an AAD bearer token for *zone* via the client-credentials flow.

  Args:
    zone: zone name selecting the OAuth scope/issuer.

  Returns:
    The ``access_token`` string from the token endpoint.

  Raises:
    ValueError: when *zone* matches no known zone (previously this fell
      through with ``scope`` undefined and raised ``NameError``).
    SystemExit: when the token endpoint rejects the request.

  Bug fixes: the original self-assignments ``client_id = client_id``
  raised ``UnboundLocalError``; the intent is to use the module-level
  credentials loaded from Key Vault.  The token endpoint also requires a
  POST with a ``client_secret`` field, not a GET with ``client_key``.
  """
  url = "https://login.microsoftonline.com/<tenant>/oauth2/v2.0/token"
  client_id = azure_client
  client_key = azure_key
  if zone in ("Local", "Zone1", "Zone2"):
    # Staging zones share one issuer, except Zone1.
    scope = "api://<client_issuer1>/.default"
    if zone == "Zone1":
      scope = "api://<client_issuer2>/.default"
  elif zone in zone_list:
    scope = "api://<client_issuer3>/.default"
    if zone == "Zone4":
      scope = "api://<client_issuer4>/.default"
    if zone == "Zone5":
      scope = "api://<client_issuer5>/.default"
  else:
    raise ValueError("Unknown zone: {}".format(zone))

  headers = {'Content-Type': 'application/x-www-form-urlencoded'}
  data = {
    "grant_type":    "client_credentials",
    "client_id":     client_id,
    "client_secret": client_key,
    "scope":         scope
  }
  response = requests.request("POST", url, headers=headers, data=data)

  if response.status_code != 200:
    # Fail loudly with a non-zero exit code instead of a silent exit().
    logging.error("Token request for %s failed: %s %s", zone, response.status_code, response.text)
    sys.exit(1)

  return response.json()["access_token"]

def generate_scenario_payload(scenario, staging=True):
  """Build the POST payload and multipart file list for *scenario*.

  Args:
    scenario: key into ``payload_dictionary`` / ``data_dictionary``.
    staging: select the "Staging" variant when True, else "Real".

  Returns:
    ``(payload, data)`` where *data* is a list of
    ``(field, (filename, file_handle, mime))`` tuples in the shape
    ``requests`` expects for ``files=``.  The handles are intentionally
    left open: ``requests`` reads them while streaming the upload.
  """
  platform = "Staging" if staging else "Real"
  payload = payload_dictionary[scenario][platform]

  data = [
      (post_key, (file_dict[file_id][0], open(file_dict[file_id][1], 'rb'), 'text/csv'))
      for (post_key, file_id) in data_dictionary[scenario][platform]
  ]

  return payload, data


def generate_all_post_data(zone, bearer_token, total_requests, scenario="D+I"):
  """Prepare *total_requests* POST argument tuples for *zone*.

  Each tuple is ``(url, headers, payload, files, verify, zone, token)`` —
  exactly the positional layout ``post_url`` consumes.

  Bug fix: the unused locals ``file1``/``file2`` were removed; ``file2``
  referenced the undefined name ``argument`` and raised ``NameError`` on
  every call.
  """
  server = server_selector[zone]
  headers = {'Authorization': 'Bearer {}'.format(bearer_token)}
  post_data = []
  # Staging zones use the staging endpoint with TLS verification disabled
  # (self-signed certs); real zones use the real endpoint with verify=True.
  staging = zone in ["Local", "Zone1", "Zone2"]
  if staging:
    url = "{}/api/<server-endpoint>".format(server)
  else:
    url = "{}/api/<real-server-endpoint>".format(server)
  for _ in range(total_requests):
    payload, data = generate_scenario_payload(scenario, staging=staging)
    post_data.append((url, headers, payload, data, not staging, zone, bearer_token))

  return post_data


def post_url(args):
  """Fire one POST request and return its bookkeeping tuple.

  Args:
    args: ``(url, headers, payload, files, verify, zone, bearer_token)``
      as produced by ``generate_all_post_data``.

  Returns:
    ``(sl_no, status_code, job_id_or_error_text, status, start_time,
    elapsed_time, created_timestamp)``.
  """
  global global_counter
  url, headers, payload, files, verify, zone, _token = args
  start_time = time.time()
  created_timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
  response = requests.request("POST", url, headers=headers, data=payload, files=files, verify=verify)
  # Serialize serial-number assignment across worker threads.
  with threadLock:
    global_counter += 1
    sl_no = global_counter
  elapsed_time = round(time.time() - start_time, 2)

  # Staging zones answer 200 with lower-case keys; real zones answer 202.
  if zone in ["Local", "Zone1", "Zone2"]:
    success_code, status_key, operation_id_key = 200, "status", "<key1>"
  else:
    success_code, status_key, operation_id_key = 202, "Status", "<key2>"

  if response.status_code == success_code:
    body = response.json()  # parse the body once instead of twice
    job_id, status = body[operation_id_key], body[status_key]
    logging.info("{}\t{}\t{}\t{}\t{:.2f}s".format(zone[:5], sl_no, job_id, status, elapsed_time))
    return sl_no, response.status_code, job_id, status, start_time, elapsed_time, created_timestamp

  return sl_no, response.status_code, response.text, "N/A", start_time, elapsed_time, created_timestamp

def create_job_requests(post_data, workers):
  """POST every request in *post_data* via a thread pool and summarize.

  Args:
    post_data: list of argument tuples for ``post_url``.
    workers: thread-pool size.

  Returns:
    ``(requests_df, final_status_df)``: one row per request, and one row
    per *successful* request for ``checkStatuses`` to poll.

  Bug fixes: rows are collected in plain lists and converted at the end —
  ``DataFrame.append`` was deprecated in pandas 1.4 and removed in 2.0.
  The row key ``'Sl.No'`` (without the trailing dot) silently created a
  second, NaN-filled column next to the declared ``'Sl.No.'``; it now
  matches.  Percentages guard against empty input instead of dividing by
  zero.
  """
  print("\n" + "-"*150)
  logging.info("{}\t{}\t{}\t\t\t\t\t{}\t{}".format("Zone", "Sl.No.", "JobID", "Status", "CreationTime"))
  with ThreadPoolExecutor(max_workers=workers) as pool:
    response_list = list(pool.map(post_url, post_data))

  request_rows = []
  status_rows = []
  for cnt, (sl_no, code, message_or_requestId, status, start_time, elapsed_time, created_timestamp) in enumerate(response_list):
    zone = post_data[cnt][5]
    bearer_token = post_data[cnt][6]
    new_row = {
        'Zone': zone, 'Code': code,
        'BearerToken': bearer_token,
        'Timestamp': created_timestamp, 'CreationTime': elapsed_time,
        'Sl.No.': sl_no, 'RequestId': "N/A"
    }
    if code not in [200, 202]:
      logging.info("{}\t{}\t{}\t{}".format(zone, sl_no, code, message_or_requestId))
    else:
      new_row['RequestId'] = message_or_requestId
      status_rows.append({
          'Status': status,
          'Zone': zone,
          'BearerToken': bearer_token,
          'TimeOfRequest': created_timestamp,
          'StartTimeOrigin': start_time,
          'CreationTime': elapsed_time,
          'TotalTime': elapsed_time,
          'RequestId': message_or_requestId,
          'LastPolled': start_time,
          'GetStatusTime': elapsed_time,
          'Sl.No.': sl_no,
          'TimeIn0': 0, 'TimeIn1': 0,
      })
    request_rows.append(new_row)

  # Explicit column lists keep downstream consumers working even when
  # there are zero rows.
  requests_df = pd.DataFrame(
      request_rows,
      columns=['Code', 'Zone', 'BearerToken', 'Timestamp', 'CreationTime', 'RequestId', 'Sl.No.'])
  final_status_df = pd.DataFrame(
      status_rows,
      columns=['Status', 'Zone', 'BearerToken', 'TimeOfRequest', 'StartTimeOrigin',
               'CreationTime', 'TotalTime', 'RequestId', 'LastPolled',
               'TimeIn0', 'TimeIn1', 'GetStatusTime', 'Sl.No.'])

  print("\n" + "-"*80)
  totalRequests = len(requests_df)
  code_counts = requests_df["Code"].value_counts()  # computed once, reused below
  successfulRequests = sum(int(code_counts.get(success_code, 0)) for success_code in [200, 202])
  failedRequests = totalRequests - successfulRequests
  requestRate = successfulRequests * 100.0 / totalRequests if totalRequests else 0.0
  avgRequestTimes = round(requests_df["CreationTime"].mean(), 2) if totalRequests else 0.0
  print("{}\t{}\t{}\t{}\t{}".format("RequestRate", "Requests", "RequestSuccess", "RequestFailure", "TimeToRequest(s)"))
  print("{}\t\t{}\t\t{}\t\t{}\t\t{}".format(requestRate, totalRequests, successfulRequests, failedRequests, avgRequestTimes))

  print("\n" + "-"*80)
  print("{}\t{}\t\t{}\t{}".format("Code", "Percent", "Requests", "TimeToRequest(s)"))
  # Per-class breakdown (2xx / 4xx / 5xx) with average creation time.
  for code_value, (lower_bound, upper_bound) in [("2xx", (200, 299)), ("4xx", (400, 499)), ("5xx", (500, 599))]:
    in_range = (requests_df["Code"] >= lower_bound) & (requests_df["Code"] <= upper_bound)
    request_count = in_range.sum()
    percent = round(request_count * 100.0 / totalRequests, 1) if totalRequests else 0.0
    avg_code_time = requests_df.loc[in_range, 'CreationTime'].sum()
    if avg_code_time != 0.0:
      avg_code_time = round(avg_code_time * 1.0 / request_count, 2)
    print("{}\t{}\t\t{}\t\t{}".format(code_value, percent, request_count, avg_code_time))
  print("\n" + "-"*80)

  return requests_df, final_status_df

def get_status_caller(args):
  """Unpack the worker-pool argument tuple and delegate to ``get_status``."""
  request_id, index, token, server = args
  return get_status(request_id, index, token, server)

def create_plots(graph_df):
  """Render one horizontal ANSI stacked bar per request.

  Each bar shows the CreationTime / TimeIn0 / TimeIn1 split, scaled so
  the longest request spans ``max_chars`` terminal columns.

  Cleanup: the dead ``max_values`` / first ``max_total`` assignments and
  the unused ``sum_current`` were removed, and the loop-invariant
  ``max_total`` is now computed once instead of per row.
  """
  values = list(graph_df[["CreationTime", "TimeIn0", "TimeIn1", "Sl.No."]].itertuples(index=False, name=None))
  n = len(values)
  max_chars = 40
  colors = [fg('white') + bg('light_green'), fg('white') + bg('light_red'), fg('white') + bg('light_blue')]
  reset_color = attr('reset')

  # NOTE(review): this sums all four tuple fields — including 'Sl.No.' —
  # even though the key excludes it; looks unintended but is preserved
  # to keep the rendering identical. Confirm before changing.
  max_total = sum(max(values, key=lambda x: sum(x[:-1])))
  # Width of the serial-number label column (collapses the old if/elif).
  label_width = 3 if n > 99 else (2 if n > 9 else 1)

  for i in range(n):
      bar_string = ''
      for j in range(3):
          if values[i][j] != 0:
            bar_length = int(values[i][j] / max_total * max_chars)
            if bar_length == 0:
              bar_length = 1  # keep non-zero segments visible
          else:
            bar_length = 0
          bar_string += colors[j] + ' ' * bar_length + reset_color
      print()
      print('\033[{}A'.format(2))  # move cursor up to redraw in place
      print(str(values[i][3]).ljust(label_width), bar_string)

def print_status(final_status_df, move=True):
  if(move):
    print('\033[{}A'.format(len(final_status_df)+5))
  pd.set_option('display.max_colwidth', 1000)
  print(final_status_df[[
    "TimeOfRequest", "Zone", "Sl.No.", "RequestId", "Status", "TotalTime", 
    "CreationTime", "TimeIn0", "TimeIn1", "GetStatusTime"]]
        .to_string(index=False)
  )
  print("\n")
  #create_plots(final_status_df[["Sl.No.", "RequestId", "TotalTime", "CreationTime", "TimeIn0", "TimeIn1"]])
  
  print("Timestamp: {}".format(datetime.now().strftime('%H:%M:%S.%f')[:-3]))

def checkStatuses(final_status_df, workers, wait):
  """Poll every pending job until all statuses pass 1 (terminal).

  Args:
    final_status_df: per-request DataFrame from ``create_job_requests``.
    workers: thread-pool size for the status calls.
    wait: target seconds between polling rounds.

  Returns:
    The DataFrame sorted by 'Sl.No.' with final statuses and timings.

  Bug fix: ``time.sleep`` raises ``ValueError`` on a negative argument,
  which happened whenever a polling round took longer than *wait*; the
  delay is now clamped to zero.
  """
  final_status_df = final_status_df.sort_values(by='Sl.No.')
  printed = False

  while (final_status_df['Status'] < 2).any():
    get_data = []
    for index, row in final_status_df.iterrows():
      if row['Status'] < 2:  # still pending (statuses 0 and 1)
        get_data.append((row["RequestId"], index, row["BearerToken"], server_selector[row["Zone"]]))

    with ThreadPoolExecutor(max_workers=workers) as pool:
      response_list = list(pool.map(get_status_caller, get_data))

    timer_begin = time.time()
    for (index, new_status, time_call, time_new) in response_list:
      # get_status returns -1 (or a non-numeric body) on failure; skip those.
      try:
        if int(new_status) < 0:
          continue
      except (TypeError, ValueError):
        continue
      row = final_status_df.loc[index]
      last_polled = row['LastPolled']
      old_status = row['Status']
      # Attribute elapsed wall time to whichever state the job was in.
      if old_status == 0:
        row['TimeIn0'] = round(row['TimeIn0'] + time_new - last_polled, 2)
      elif old_status == 1:
        row['TimeIn1'] = round(row['TimeIn1'] + time_new - last_polled, 2)

      row['LastPolled'] = time_new
      row['TotalTime'] = row['CreationTime'] + row['TimeIn0'] + row['TimeIn1']
      row['GetStatusTime'] = round(time_call, 2)
      row['Status'] = int(new_status)
      final_status_df.loc[index] = row

    print_status(final_status_df, printed)
    printed = True
    if (final_status_df['Status'] > 1).all():
      break
    time.sleep(max(0.0, wait - (time.time() - timer_begin)))

  return final_status_df


def all_zone_calls(threads=4, wait_in_polling=7, scenario="D+I"):
    """Run one *scenario* request against every zone and poll to completion."""
    tokens = {}
    post_data = []
    for zone in ["Zone1", ..., "Zone-n"]:
      # Token first, then the single request tuple for this zone.
      tokens[zone] = create_token(zone)
      post_data += generate_all_post_data(zone, tokens[zone], 1, scenario)
    requests_df, final_status_df = create_job_requests(post_data, threads)
    checkStatuses(final_status_df, threads, wait_in_polling)

def main():
  """CLI entry point: parse arguments and run the stress requests.

  In ``--verify`` mode the selected scenario (or every tested one via
  "ALL") is run across all zones; otherwise a single zone receives
  ``--number-of-total-requests`` jobs.
  """
  logging.basicConfig(
      level=logging.INFO, 
      format='%(asctime)s.%(msecs)03d \t%(message)s', 
      datefmt='%H:%M:%S', 
      handlers=[logging.StreamHandler()]
  )
  parser = argparse.ArgumentParser()
  parser.add_argument('-t', '--threads', type=int, default=4, help='parallel workers')
  parser.add_argument('-v', '--verify', action="store_true", help='Validate all zones')
  parser.add_argument('-n', '--number-of-total-requests', type=int, default=4, help='Total Job number')
  parser.add_argument('-w', '--wait-in-polling', type=int, default=7, help='How long to wait between each get_status')
  # NOTE(review): the default "PRE-Staging" is not in zones_list; argparse
  # does not validate defaults against choices, so non-verify runs need an
  # explicit -z — confirm the intended default.
  parser.add_argument(
    '-z', '--zone', type=str, help='staging-zones', default="PRE-Staging",
    choices=zones_list
  )
  parser.add_argument(
    '-s', '--scenario', type=str, help='scenario combinations', default="D+I",
    choices=test_scenarios
  )
  arguments = parser.parse_args()

  if arguments.verify:
    # Bug fix: "ALL" is an advertised choice but was rejected by the
    # tested-scenario guard, so `--verify -s ALL` could never run.
    if arguments.scenario != "ALL" and arguments.scenario not in tested_scenarios:
      print("This scenario is currently not tested by the script")
      sys.exit(1)
    for scenario in test_scenarios:
      if arguments.scenario == scenario or arguments.scenario == "ALL":
        print("\n" + "-"*80)
        print(scenario)
        print("-"*80)
        all_zone_calls(arguments.threads, arguments.wait_in_polling, scenario)
  else:
    bearer_token = create_token(arguments.zone)
    post_data = generate_all_post_data(arguments.zone, bearer_token, arguments.number_of_total_requests)
    requests_df, final_status_df = create_job_requests(post_data, arguments.threads)
    final_status_df = checkStatuses(final_status_df, arguments.threads, arguments.wait_in_polling)

  return


def get_status(id, index, bearer_token, server):
  """Fetch the current status of job *id* from *server*.

  Args:
    id: job/operation identifier returned at creation time.
    index: DataFrame index, passed through untouched for the caller.
    bearer_token: OAuth bearer token for the zone.
    server: base server URL; "staging" in the hostname selects the
      staging endpoint.

  Returns:
    ``(index, status, time_call, time_new)`` where *status* is -1 when
    the response is neither 200 nor 202.

  Bug fix: the real-zone path was "/api//api/<real-server-endpoint>" —
  the duplicated "/api/" segment has been removed.
  """
  if "staging" in server.lower():
    url = "{}/api/<server-endpoint>?identifier={}".format(server, id)
  else:
    url = "{}/api/<real-server-endpoint>?identifier={}".format(server, id)

  headers = {
      'Authorization': 'Bearer {}'.format(bearer_token)
  }
  start_time = time.time()
  # GET carries no body; the original passed a pointless empty data dict.
  response = requests.request("GET", url, headers=headers)
  time_new = time.time()
  time_call = time_new - start_time
  status = -1

  logging.debug("{}\t{}\n".format(response.status_code, response.text))
  # Staging answers 200 with "status"; real zones 202 with "Status".
  if response.status_code == 200:
    status = response.json()["status"]
  elif response.status_code == 202:
    status = response.json()["Status"]

  return (index, status, time_call, time_new)


# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
  main()