Commit d91dc14a authored by  Mundra, Anil's avatar Mundra, Anil

final commit

parent d0f7081b
......@@ -9,6 +9,17 @@ import time
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
import snowflake.connector
import datetime
from collections import namedtuple
def create_record(obj, fields):
''' given obj from db returns named tuple with fields mapped to values '''
Record = namedtuple("Record", fields)
mappings = dict(zip(fields, obj))
return Record(**mappings)
class User(BaseModel):
email: str
password: str
......@@ -191,4 +202,130 @@ async def execute_testcases():
curs1.execute("select current date;")
#fetch result
print (curs1.fetchone()[0])
return { 'data': curs1.fetchone()[0]}
\ No newline at end of file
return { 'data': curs1.fetchone()[0]}
@app.get("/executetests/{id}")
async def execute_tests(id: int):
cursor.execute("SELECT * FROM steps WHERE suiteid = %s", (str(id),))
tests = cursor.fetchall()
conn.commit()
cursor.execute("DELETE FROM results WHERE testsuite_id = %s RETURNING *", (str(id),))
conn.commit()
# print(tests)
all_tests = []
for row in tests:
all_tests.append(dict(row))
test_case_details = []
for test in all_tests:
test_case = test['name'].lower()
if "record count" in test_case:
log_msg = []
message = f'>>> Database connection established {datetime.datetime.now()}'
log_msg.append(message)
message = f'>>> User successfully authenticated {datetime.datetime.now()}'
log_msg.append(message)
message = f'>>> record count check execution started at {datetime.datetime.now()}'
log_msg.append(message)
message = f'>>> query execution starts for source records -- time: {datetime.datetime.now()}'
log_msg.append(message)
cursor.execute("SELECT * FROM employee_demo")
source_records = cursor.fetchall()
source_length = len(source_records)
message = f'>>> length of the suorce records are {len(source_records)} -- time: {datetime.datetime.now()}'
log_msg.append(message)
curs1.execute("SELECT * FROM EMPLOYEE_DEMO")
message = f'>>> query execution starts for target records -- time: {datetime.datetime.now()}'
log_msg.append(message)
target_records = curs1.fetchall()
target_length = len(target_records)
message = f'>>> length of the target records are {len(target_records)}: -- time: {datetime.datetime.now()}'
log_msg.append(message)
message = f'>>> validating the record count source vs target: -- time: {datetime.datetime.now()}'
log_msg.append(message)
if source_length == target_length:
message = f'>>> source and target rowcount is matching: time: {datetime.datetime.now()}'
log_msg.append(message)
message = f'>>> capturing and saving the results to the system'
log_msg.append(message)
temp_str = ''
i = 0
for log in log_msg:
if not (i == 0):
temp_str = temp_str + ',' + log
i = i + 1
# cursor.execute("DELETE FROM results WHERE testsuite_id = %s RETURNING *", (str(id),))
# conn.commit()
cursor.execute("INSERT INTO results(testcase_name, testsuite_id, testcase_description, project_id, execution_status, log) VALUES(%s, %s, %s, %s, %s, %s) RETURNING *",
(test['name'], id, test['description'], 'PROJECT_SETUP', 'PASS', temp_str))
test = cursor.fetchone()
conn.commit()
# print(test)
elif "column checks" in test_case:
# cursor.execute("DELETE FROM results WHERE testsuite_id = %s RETURNING *", (str(id),))
# conn.commit()
cursor.execute("SELECT * FROM employee_demo")
col_names_source = [desc[0] for desc in cursor.description]
# rows_source = cursor.fetchall()
# conn.commit()
# result = []
# for row in rows_source:
# result.append(create_record(row, col_names_source))
curs1.execute("SELECT * FROM EMPLOYEE_DEMO")
target_names_source = [desc[0] for desc in curs1.description]
conn1.commit()
# print(col_names_source)
# print(target_names_source)
column_check = True
for source_column, target_column in zip(col_names_source, target_names_source):
if not source_column.lower() == target_column.lower():
column_check = False
if column_check:
cursor.execute("INSERT INTO results(testcase_name, testsuite_id, testcase_description, project_id, execution_status, log) VALUES(%s, %s, %s, %s, %s, %s) RETURNING *",
(test['name'], id, test['description'], 'PROJECT_SETUP', 'PASS', 'Long String'))
test = cursor.fetchone()
conn.commit()
elif "duplicates check" in test_case:
cursor.execute("SELECT name, designation, COUNT(*) FROM employee_demo GROUP BY name, designation HAVING COUNT(*) > 1")
source_dupliactes_count = cursor.fetchall()
conn.commit()
# print(source_dupliactes_count)
curs1.execute("SELECT name, designation, COUNT(*) FROM employee_demo GROUP BY name, designation HAVING COUNT(*) > 1")
target_dupliactes_count = cursor.fetchall()
conn1.commit()
if len(source_dupliactes_count) == 0 and len(target_dupliactes_count) == 0:
cursor.execute("INSERT INTO results(testcase_name, testsuite_id, testcase_description, project_id, execution_status, log) VALUES(%s, %s, %s, %s, %s, %s) RETURNING *",
(test['name'], id, test['description'], 'PROJECT_SETUP', 'PASS', 'Long String'))
test = cursor.fetchone()
conn.commit()
return { 'data': 'results saved successful'}
@app.get("/results")
async def get_tests():
# cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute("SELECT * FROM results")
projects = cursor.fetchall()
conn.commit()
all_results = []
for row in projects:
all_results.append(dict(row))
return{"data": all_results}
@app.get("/logs/{testsuite_id}/{id}")
async def get_tests(testsuite_id: int, id: int):
# cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute(f"SELECT log FROM results WHERE testsuite_id = '{str(testsuite_id)}' AND id = '{str(id)}'")
logs = cursor.fetchall()
conn.commit()
message = logs[0][0]
# all_tests = []
# for row in logs:
# all_tests.append(dict(row))
return{message}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment