123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- from submissionFileReader import readSubmissionContent, levenshteinDistance
- from collections import namedtuple
class SubmissionAnalysis:
    """Compute per-submission metrics for one exercise and export them as CSV.

    For every submission that has non-empty content, ``analyze`` produces a
    ``SubmissionData`` record holding:

    * ``TES``  - difference between this submission's ``time`` and the
      previous one's (``firstTimestamp`` for the first submission),
    * ``DES``  - ``levenshteinDistance(previous_content, content)`` (the
      previous content is ``""`` for the first submission),
    * ``DT``   - ``DES / TES``, or ``0`` when either value is ``0``,
    * the submission's ``grade`` and ``time``.

    The record's field names are prefixed with ``f<exerciseID>_`` and
    ``headerMap`` translates them to the column names written by
    ``saveToCSV``.
    """

    # Offsets added to ``time`` when de-duplicating timestamps. The unit is
    # whatever unit ``submission.time`` uses (not visible from this file).
    _SINGLE_DUPLICATE_OFFSET = 30
    _DUPLICATE_STEP = 20

    def __init__(self, exerciseID):
        """Create the record type and CSV header mapping for ``exerciseID``.

        ``exerciseID`` is interpolated into namedtuple field names, so
        ``"f" + str(exerciseID) + "_TES"`` etc. must be valid identifiers.
        """
        self.exerciseID = exerciseID
        self.data = []
        self.SubmissionData = namedtuple(
            "SubmissionData",
            "student_id f{0}_TES f{0}_DES f{0}_grade f{0}_DT f{0}_timestamp".format(exerciseID),
        )
        # Maps namedtuple field name -> CSV column title. Insertion order is
        # kept identical to the historical one.
        self.headerMap = {
            "f{0}_TES".format(exerciseID): "{0}_TES".format(exerciseID),
            "f{0}_DES".format(exerciseID): "{0}_DES".format(exerciseID),
            "f{0}_grade".format(exerciseID): "{0}_grade".format(exerciseID),
            "f{0}_DT".format(exerciseID): "{0}_D/T".format(exerciseID),
            "student_id": "student_id",
            "f{0}_timestamp".format(exerciseID): "{0}_timestamp".format(exerciseID),
        }

    def analyze(self, submissions, firstTimestamp, path):
        """Compute one ``SubmissionData`` record per submission with content.

        Args:
            submissions: list of records exposing ``user_id``,
                ``submission_id``, ``time``, ``grade`` and ``_replace``.
                The list is modified in place by ``fixTimestamps``.
            firstTimestamp: reference time subtracted from the first
                submission's ``time`` to obtain its ``TES``.
            path: forwarded to ``readSubmissionContent`` together with the
                exercise id and each ``submission_id``.

        Returns:
            ``(exerciseID, resultList)`` where ``resultList`` holds the
            records in submission order.

        Raises:
            AssertionError: if no submission has non-empty content
                (including when ``submissions`` is empty).
        """
        submissions = self.fixTimestamps(submissions)

        submissionsWithFiles = []
        for s in submissions:
            content = readSubmissionContent(path, self.exerciseID, s.submission_id)
            if len(content) > 0:
                submissionsWithFiles.append((s, content))

        if len(submissionsWithFiles) < 1:
            # Raised explicitly (rather than via ``assert``) so the check
            # survives ``python -O``; the exception type is unchanged for
            # callers. An empty ``submissions`` list no longer turns this
            # into an IndexError while building the message.
            student = submissions[0].user_id if len(submissions) > 0 else "<unknown>"
            raise AssertionError(
                "No valid code submitted to exercise {} from student {}".format(self.exerciseID, student)
            )

        resultList = []
        # The first submission is measured against ``firstTimestamp`` and
        # the empty string; every later one against its predecessor.
        prevTime = firstTimestamp
        prevContent = ""
        for sub, content in submissionsWithFiles:
            tes = sub.time - prevTime
            des = levenshteinDistance(prevContent, content)
            # Avoid dividing by zero; a zero distance also yields 0.
            dt = 0 if tes == 0 or des == 0 else des / tes
            resultList.append(self.SubmissionData(sub.user_id, tes, des, sub.grade, dt, sub.time))
            prevTime = sub.time
            prevContent = content
        return (self.exerciseID, resultList)

    def fixTimestamps(self, submissions):
        """Spread out adjacent submissions that share a ``time`` value.

        Only *adjacent* equal timestamps are detected, so the input is
        presumably already ordered by time (verify against the caller).
        Each run of duplicates is passed to ``spreadTSEvenly`` and the list
        is then sorted by ``time`` in place, because shifted entries may
        overtake later ones.

        NOTE(review): a shifted timestamp can coincide with a later,
        untouched one; such a pair is not spread again.

        Returns:
            The same list object, modified and sorted.
        """
        sameTS = []
        repeated = 0
        for i in range(1, len(submissions)):
            if submissions[i - 1].time == submissions[i].time:
                sameTS.append(i)
            elif sameTS:
                # The run of duplicates ended just before ``i``.
                repeated += len(sameTS)
                self.spreadTSEvenly(sameTS, submissions)
                sameTS = []
        if sameTS:
            # A run that reaches the end of the list.
            repeated += len(sameTS)
            self.spreadTSEvenly(sameTS, submissions)

        submissions.sort(key=lambda x: x.time)
        if repeated > 0:
            sub = submissions[0]
            print("{0} repeated {1} TS for exercise {2}".format(sub.user_id, repeated, self.exerciseID))
        return submissions

    def spreadTSEvenly(self, indexes, submissions):
        """Shift the ``time`` of the submissions at ``indexes`` forward.

        A single index is moved by 30. With several indexes, the first is
        moved by 20 and each following one is set to 20 after the previous
        (already shifted) entry. Entries are replaced in ``submissions`` via
        ``_replace``; nothing is returned. An empty ``indexes`` is a no-op.
        """
        if not indexes:
            return
        print("repeated ts")
        if len(indexes) == 1:
            only = indexes[0]
            sub = submissions[only]
            submissions[only] = sub._replace(time=sub.time + self._SINGLE_DUPLICATE_OFFSET)
            return

        if len(indexes) > 2:
            print("We have a problem...")
        # Chain the offsets: every entry lands one step after the previous.
        newTime = submissions[indexes[0]].time
        for idx in indexes:
            newTime = newTime + self._DUPLICATE_STEP
            submissions[idx] = submissions[idx]._replace(time=newTime)

    def addData(self, submissionData):
        """Append ``submissionData`` to ``self.data``."""
        self.data.append(submissionData)

    def saveToCSV(self, folder, dataset):
        """Write ``dataset`` to ``<folder>/<exerciseID>.csv`` (UTF-8).

        The first line holds the column titles from ``headerMap``; each
        following line holds one record's fields converted with ``str`` and
        joined by commas (no quoting or escaping is applied). ``dataset``
        is an iterable of ``SubmissionData`` records; ``self.data`` is not
        consulted.
        """
        fields = self.SubmissionData._fields
        header = ",".join(self.headerMap[x] for x in fields)
        # The ``with`` block closes the file; no explicit close() needed.
        with open("{}/{}.csv".format(folder, self.exerciseID), "w", encoding='utf-8') as file:
            file.write(header)
            file.write('\n')
            for data in dataset:
                line = ",".join(str(getattr(data, x)) for x in fields)
                file.write(line)
                file.write('\n')
|