from submissionFileReader import readSubmissionContent, levenshteinDistance
from collections import namedtuple


class SubmissionAnalysis:
    """Compute per-submission metrics for one exercise and export them as CSV.

    For every submission that has file content, three values are derived:

    * ``TES`` - ``time`` of the submission minus the ``time`` of the previous
      one (minus ``firstTimestamp`` for the first submission),
    * ``DES`` - ``levenshteinDistance`` between the previous content and the
      current content (the previous content is ``""`` for the first one),
    * ``DT``  - ``DES / TES``, or ``0`` when either of them is ``0``.
    """

    # Offset added to the timestamp of a single repeated submission.
    # Expressed in whatever unit the submissions' `time` field uses.
    SINGLE_REPEAT_OFFSET = 30
    # Gap put between consecutive submissions when a run of repeats is spread.
    REPEAT_STEP = 20

    def __init__(self, exerciseID):
        """Create the row type and the CSV header mapping for `exerciseID`.

        Row field names carry an ``f`` prefix (``f<exerciseID>_TES`` ...);
        `headerMap` translates each field name to the column title written by
        `saveToCSV` (``<exerciseID>_TES`` ..., with ``DT`` shown as ``D/T``).
        """
        self.exerciseID = exerciseID
        self.data = []
        self.SubmissionData = namedtuple("SubmissionData", "student_id f{0}_TES f{0}_DES f{0}_grade f{0}_DT f{0}_timestamp".format(exerciseID))

        headerMap = {}
        # (field suffix, header suffix); only DT is spelled differently.
        for fieldSuffix, headerSuffix in (("TES", "TES"), ("DES", "DES"), ("grade", "grade"), ("DT", "D/T")):
            fieldName = "f{0}_{1}".format(exerciseID, fieldSuffix)
            headerMap[fieldName] = "{0}_{1}".format(exerciseID, headerSuffix)
        headerMap["student_id"] = "student_id"
        headerMap["f{0}_timestamp".format(exerciseID)] = "{0}_timestamp".format(exerciseID)
        self.headerMap = headerMap

    def analyze(self, submissions, firstTimestamp, path):
        """Build one `SubmissionData` row per submission that has content.

        Args:
            submissions: list of records exposing ``time``, ``submission_id``,
                ``user_id``, ``grade`` and ``_replace``. The list is modified
                in place by `fixTimestamps` (entries replaced, list sorted).
            firstTimestamp: reference time subtracted from the first
                submission's ``time``.
            path: forwarded unchanged to `readSubmissionContent`.

        Returns:
            A ``(exerciseID, rows)`` tuple where ``rows`` is a list of
            `SubmissionData` in submission order.

        Raises:
            AssertionError: if no submission yields non-empty content.
        """
        submissions = self.fixTimestamps(submissions)

        submissionsWithFiles = []
        for s in submissions:
            content = readSubmissionContent(path, self.exerciseID, s.submission_id)
            if content:
                submissionsWithFiles.append((s, content))

        if not submissionsWithFiles:
            # Raised explicitly (instead of using `assert`) so the check still
            # runs under `python -O`; the exception type is kept for callers.
            # Guard the lookup so an empty input reports this error rather
            # than an IndexError from building the message.
            student = submissions[0].user_id if submissions else "<unknown>"
            raise AssertionError("No valid code submitted to exercise {} from student {}".format(self.exerciseID, student))

        resultList = []
        # The first submission is measured against the reference timestamp
        # and an empty document; each later one against its predecessor.
        prevTime = firstTimestamp
        prevContent = ""
        for sub, content in submissionsWithFiles:
            tes = sub.time - prevTime
            des = levenshteinDistance(prevContent, content)
            dt = 0 if tes == 0 or des == 0 else des / tes
            resultList.append(self.SubmissionData(sub.user_id, tes, des, sub.grade, dt, sub.time))
            prevTime = sub.time
            prevContent = content
        return (self.exerciseID, resultList)

    def fixTimestamps(self, submissions):
        """Separate submissions that share a timestamp with their predecessor.

        Each run of consecutive entries whose ``time`` equals the previous
        entry's ``time`` is handed to `spreadTSEvenly`; afterwards the list is
        sorted by ``time``. The list is modified in place and also returned.
        A summary line is printed when at least one timestamp was repeated.
        """
        sameTS = []  # indexes of the current run of repeated timestamps
        repeated = 0
        for i in range(1, len(submissions)):
            if submissions[i - 1].time == submissions[i].time:
                sameTS.append(i)
            elif sameTS:
                # The run ended on the previous index: resolve it now.
                repeated += len(sameTS)
                self.spreadTSEvenly(sameTS, submissions)
                sameTS = []
        if sameTS:
            # A run that reaches the end of the list is still pending.
            repeated += len(sameTS)
            self.spreadTSEvenly(sameTS, submissions)

        # Shifted timestamps may now exceed later ones, so restore the order.
        submissions.sort(key=lambda x: x.time)
        if repeated > 0:
            sub = submissions[0]
            print("{0} repeated {1} TS for exercise {2}".format(sub.user_id, repeated, self.exerciseID))
        return submissions

    def spreadTSEvenly(self, indexes, submissions):
        """Shift the ``time`` of the submissions at `indexes` so they differ.

        A single index gets ``SINGLE_REPEAT_OFFSET`` added to its time. With
        several indexes, the first gets ``REPEAT_STEP`` added and every
        following one is set to ``REPEAT_STEP`` after the entry before it in
        `indexes`. Entries are replaced in `submissions` via ``_replace``.
        """
        print("repeated ts")
        if len(indexes) == 1:
            only = indexes[0]
            sub = submissions[only]
            submissions[only] = sub._replace(time=sub.time + self.SINGLE_REPEAT_OFFSET)
            return

        if len(indexes) > 2:
            print("We have a problem...")
        first = indexes[0]
        sub = submissions[first]
        submissions[first] = sub._replace(time=sub.time + self.REPEAT_STEP)
        for prev, current in zip(indexes, indexes[1:]):
            sub = submissions[current]
            submissions[current] = sub._replace(time=submissions[prev].time + self.REPEAT_STEP)

    def addData(self, submissionData):
        """Append `submissionData` to `self.data`."""
        self.data.append(submissionData)

    def saveToCSV(self, folder, dataset):
        """Write `dataset` to ``<folder>/<exerciseID>.csv`` (UTF-8).

        The first line holds the column titles from `headerMap`, in the field
        order of `SubmissionData`; each following line holds one row with its
        values converted by ``str`` and joined with commas (no quoting).
        """
        fields = self.SubmissionData._fields
        with open("{}/{}.csv".format(folder, self.exerciseID), "w", encoding='utf-8') as file:
            header = ",".join(self.headerMap[x] for x in fields)
            file.write(header)
            file.write('\n')
            for data in dataset:
                line = ",".join(str(getattr(data, x)) for x in fields)
                file.write(line)
                file.write('\n')
            # The `with` block closes the file; no explicit close() is needed.