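"""Analysis utilities for comparing LLM performance on original vs. perturbed
multiple-choice questions: paired accuracy/consistency analysis, LLM-judged
knowledge-invariance scoring, edit-distance statistics, sentence counts, and
response-pattern breakdowns."""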
import copy
import json
import logging
import os
import re
import sys
from datetime import datetime
from functools import partial
from typing import List

import Levenshtein
import numpy as np
import pandas as pd
import spacy
from scipy.stats import wilcoxon
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tqdm import tqdm

from dataset_preprocessing import load_from_jsonl, write_to_jsonl
from GeneralLLM import ChatGPT, LargeLanguageModel, Qwen
from KPPerturbation import MultipleChoiceQuestion, QuestionGenerator, get_mcq_llm_answer

logging.basicConfig(level=logging.INFO)
def generate_prompt_for_data(data: list):
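    """Return a copy of data with the rendered MCQ prompt stored under the 'prompt' key."""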
    result = copy.deepcopy(data)
    for i in range(len(data)):
        mcq = MultipleChoiceQuestion()
        mcq.load_dict(data[i])
        result[i]['prompt'] = mcq.get_prompt()
    return result
def jaccard_similarity(x: set, y: set):
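    """Jaccard similarity between two sets: |intersection| / |union|."""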
    result = len(x.intersection(y)) / len(x.union(y))
    return result
def consist_similarity(x: set, y: set):
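    """Consistency similarity: the fraction of elements of x that also appear in y."""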
    result = len(x.intersection(y)) / len(x)
    return result
def consist_score(x: np.ndarray, y: np.ndarray):
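    """Fraction of items scored correct (1) in both x and y (per-item 0/1 scores)."""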
    assert len(x) == len(y)
    n_both_correct = 0
    for a, b in zip(x, y):
        n_both_correct += (a == 1 and b == 1)
    result = n_both_correct / len(x)
    return result
def joint_analysis(left_data: List[dict], right_data: List[dict]):
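    """Merge two per-question result lists on (subject, id) and report paired accuracy,
    performance drop, a one-sided Wilcoxon signed-rank test on the paired scores,
    recall of originally correct answers (recall_c), and the both-correct rate
    (acc@consist). Returns the merged DataFrame."""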
    left_df = pd.DataFrame(left_data)
    right_df = pd.DataFrame(right_data)
    # Merge the two result sets on (subject, id)
    merge_df = pd.merge(left_df, right_df, on=['subject', 'id'], how='inner')
    merge_df['score_x'] = (merge_df['true_answer_x'] == merge_df['model_output_x']).astype(int)
    merge_df['score_y'] = (merge_df['true_answer_y'] == merge_df['model_output_y']).astype(int)
    score_x = merge_df['score_x'].mean()
    score_y = merge_df['score_y'].mean()
    wilcoxon_result = wilcoxon(
        merge_df['score_x'].values,
        merge_df['score_y'].values,
        alternative='greater'
    )
    print('MCQ Benchmark Performance:')
    print(f"score_1 = {score_x.round(4)} ({merge_df['score_x'].sum()}/{merge_df.shape[0]})")
    print(f"score_2 = {score_y.round(4)} ({merge_df['score_y'].sum()}/{merge_df.shape[0]})")
    print(f"performance_drop_rate = {(score_y - score_x).round(4)}")
    print(f"Wilcoxon hypothesis test = {wilcoxon_result}")
    recall_c = recall_score(merge_df['score_x'], merge_df['score_y'])
    consist_c = consist_score(merge_df['score_x'], merge_df['score_y'])
    # print('Performance Consistency:')
    print(f"recall_c = {round(recall_c, 4)}")
    print(f"acc@consist = {round(consist_c, 4)}")
    return merge_df
def llm_based_knowledge_invariance_analysis(
        left_data: List[dict], right_data: List[dict],
        judge: ChatGPT, log_save_path: str, start_id: int = 0):
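    """Ask an LLM judge to grade the knowledge invariance of each perturbed prompt
    (prompt_y) against its original prompt (prompt_x) on a 1-5 scale, appending each
    judgement to log_save_path. Returns the list of judgements, or -1 on repeated
    parsing failures."""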
    left_df = pd.DataFrame(left_data)
    right_df = pd.DataFrame(right_data)
    scores = []
    results = []
    # Merge the two data sets on (subject, id)
    merge_df = pd.merge(left_df, right_df, on=['subject', 'id'], how='inner')
    current_id = 0
    for _, row in merge_df.iterrows():
        if current_id < start_id:
            current_id += 1
            continue
        print(f'current id = {current_id} start.')
        prompt_x = row['prompt_x']
        prompt_y = row['prompt_y']
        judge_prompt = '''Your task is to grade the knowledge invariance degree of a perturbed multiple choice question against the original question.
You clearly know that if a perturbed question is knowledge-invariant, the question has to satisfy the following requirements:
[Perturbation Requirements Start]
1. Semantic Information Invariance. The perturbed question must have the same semantic information as the original question; it cannot change the names of entities, the logic of statements, or the meaning of equations.
2. Reasoning Invariance. A human test-taker's reasoning process to obtain his/her response to the perturbed question should be consistent with that for the original question.
3. Answer Invariance. The answer to the perturbed question should be semantically equivalent to the answer to the original question.
4. Statement Clarity. The perturbed question should clearly present the contexts, conditions and target of the question without ambiguous statements.
[Perturbation Requirements End]
The grading score is from 1 to 5. Grading criteria are given in the following:
[Grading Criteria Start]
1.0 - There are fatal flaws in the perturbed question that make it entirely unacceptable.
2.0 - There are major flaws in the perturbed question that make it unacceptable.
3.0 - Only some parts of the perturbation are acceptable. As a whole, the perturbed question is less acceptable.
4.0 - There are only minor flaws in the perturbed question. As a whole, the perturbed question is acceptable.
5.0 - The perturbation perfectly satisfies all the requirements and is entirely acceptable.
[Grading Criteria End]
[Original Question Start]:
%s
[Original Question End]
[Perturbed Question Start]:
%s
[Perturbed Question End]
You should grade the perturbation following these steps:
1. Recall the perturbation requirements and grading criteria, and read the original and the perturbed questions in detail.
2. For each of the perturbation requirements, carefully judge how well the perturbed question satisfies it.
3. Based on steps 1 and 2, give a total grading score for the perturbed question.
4. Analyze the strengths and weaknesses of the perturbed question from the view of the perturbation requirements, based on steps 1-3.
Think carefully for a while, then propose your conclusion. Your output template is given as follows:
[Template Start]
{
"score": <numeric score from 1 to 5>,
"strength": <"xxx", strengths of the perturbation>,
"weakness": <"xxx", weaknesses of the perturbation>
}
[Template End]
Your conclusion:
''' % (prompt_x, prompt_y)
        judge_ok = False
        n_retry = 0
        while n_retry < 3:
            try:
                judge.refresh()
                response = judge.listen_and_response(judge_prompt)
                # Extract the first JSON object from the judge's response
                judgement = re.findall(r'[{][^{]*[}]', response)[0]
                judgement = json.loads(judgement)
                assert judgement.get('score', None) is not None
                assert judgement.get('strength', None) is not None
                assert judgement.get('weakness', None) is not None
                judge_ok = True
                break
            except Exception:
                n_retry += 1
                logging.error('Judgement output error. Try again.')
        if judge_ok is False:
            logging.error('Judgement output fatal error. Exit.')
            return -1
        judgement['subject'] = row['subject']
        judgement['id'] = row['id']
        judgement['prompt_x'] = prompt_x
        judgement['prompt_y'] = prompt_y
        scores.append(judgement['score'])
        results.append(judgement)
        # Append each judgement to the log file as soon as it is produced
        write_to_jsonl([results[-1]], log_save_path, 'a')
        current_id += 1
    scores = np.array(scores)
    print(f"Knowledge Invariance Score = {np.mean(scores).round(4)} +- {np.std(scores).round(4)}")
    return results
def edit_distance_analysis(left_data: List[dict], right_data: List[dict]):
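    """Compute the Levenshtein edit distance between paired original and perturbed
    prompts and report its mean and standard deviation."""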
    left_df = pd.DataFrame(left_data)
    right_df = pd.DataFrame(right_data)
    distances = []
    results = []
    # Merge the two data sets on (subject, id)
    merge_df = pd.merge(left_df, right_df, on=['subject', 'id'], how='inner')
    for _, row in merge_df.iterrows():
        prompt_x = row['prompt_x']
        prompt_y = row['prompt_y']
        edit_distance = Levenshtein.distance(prompt_x, prompt_y)
        distances.append(edit_distance)
        results.append({
            'subject': row['subject'],
            'id': row['id'],
            'prompt_x': prompt_x,
            'prompt_y': prompt_y,
            'edit_distance': edit_distance
        })
    print(f'Edit distance = {np.mean(distances).round(2)} +- {np.std(distances).round(2)}')
    return results
def question_sentence_analysis(data: List[dict]):
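    """Report the average number of spaCy-detected sentences per question."""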
    nlp = spacy.load('en_core_web_sm')
    n_sents = 0
    for item in tqdm(data):
        doc = nlp(item["question_text"])
        n_sents += len(list(doc.sents))
    print(f"Avg # of sentences = {n_sents}/{len(data)} = {n_sents/len(data)}")
def transition_analysis(log_path_1: str, log_path_2: str,
                        subjects: list,
                        question_id_path: str = None):
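    """Run joint_analysis per subject on two answer logs (optionally restricted to the
    question ids listed in question_id_path) and report per-subject, micro-averaged,
    and macro-averaged accuracy before and after perturbation, performance drop,
    recall, and consistency."""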
    raw_log1 = load_from_jsonl(log_path_1)
    raw_log2 = load_from_jsonl(log_path_2)
    score_xs = []
    score_ys = []
    rops = []
    css = []
    if question_id_path is not None:
        df_qid = pd.read_csv(question_id_path)
    all_log1 = []
    all_log2 = []
    for subject in subjects:
        print(f'========{subject}========')
        log1 = []
        log2 = []
        if question_id_path is None:
            for item in raw_log1:
                if item["subject"] == subject:
                    log1.append(item)
            for item in raw_log2:
                if item["subject"] == subject:
                    log2.append(item)
        else:
            # Keep only the questions listed in the question-id file
            for item in raw_log1:
                if item["subject"] == subject and df_qid[(df_qid['subject'] == item['subject']) & (df_qid['id'] == item['id'])].shape[0] > 0:
                    log1.append(item)
            for item in raw_log2:
                if item["subject"] == subject and df_qid[(df_qid['subject'] == item['subject']) & (df_qid['id'] == item['id'])].shape[0] > 0:
                    log2.append(item)
        all_log1 += log1
        all_log2 += log2
        result = joint_analysis(log1, log2)
        score_xs.append(result['score_x'].mean())
        score_ys.append(result['score_y'].mean())
        rops.append(recall_score(result['score_x'], result['score_y']))
        css.append(consist_score(result['score_x'], result['score_y']))
    print('========All (micro)========')
    joint_analysis(all_log1, all_log2)
    print('========All (macro)========')
    score_xs = np.array(score_xs)
    score_ys = np.array(score_ys)
    macro_pdr = np.mean(score_ys - score_xs)
    macro_rop = np.mean(rops)
    macro_css = np.mean(css)
    print(f"macro_acc (before) = {np.mean(score_xs)}")
    print(f"macro_acc (after) = {np.mean(score_ys)}")
    print(f"macro_pdr = {macro_pdr}")
    print(f"macro_rop = {macro_rop}")
    print(f"macro_acc@consist = {macro_css}")
def knowledge_invariance_analysis(original_data_path, perturbed_data_path,
                                  subjects: list,
                                  referee: LargeLanguageModel,
                                  llm_ki_to_save_path=None,
                                  systematic_gap=10):
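    """Systematically sample every systematic_gap-th question from the original and
    perturbed datasets, then report LLM-judged knowledge-invariance scores and edit
    distances per subject."""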
    raw_original_data = load_from_jsonl(original_data_path)
    raw_perturbed_data = load_from_jsonl(perturbed_data_path)
    raw_original_data = generate_prompt_for_data(raw_original_data)
    raw_perturbed_data = generate_prompt_for_data(raw_perturbed_data)
    original_data = []
    perturbed_data = []
    # Systematic sampling: keep every systematic_gap-th question id
    for elem in raw_original_data:
        if elem['id'] % systematic_gap == 0:
            original_data.append(elem)
    for elem in raw_perturbed_data:
        if elem['id'] % systematic_gap == 0:
            perturbed_data.append(elem)
    print(f"n_original_data (after sampling) = {len(original_data)}")
    print(f"n_perturbed_data (after sampling) = {len(perturbed_data)}")
    print("Get the intersection of the two datasets for analysis.")
    score_results = llm_based_knowledge_invariance_analysis(
        left_data=original_data,
        right_data=perturbed_data,
        log_save_path=llm_ki_to_save_path,
        judge=referee,
        start_id=0
    )
    edit_distance_results = edit_distance_analysis(
        left_data=original_data,
        right_data=perturbed_data
    )
    for subject in subjects:
        subject_scores = []
        subject_edit_distances = []
        for elem in score_results:
            if elem['subject'] == subject:
                subject_scores.append(elem['score'])
        for elem in edit_distance_results:
            if elem['subject'] == subject:
                subject_edit_distances.append(elem['edit_distance'])
        print(f'======{subject}======')
        print(f'Edit distance = {np.median(subject_edit_distances).round(2)} +- {np.std(subject_edit_distances).round(2)}')
        print(f'Knowledge invariance score = {np.mean(subject_scores).round(2)} +- {np.std(subject_scores).round(2)}\n')
def response_pattern_analysis(log_path: str):
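    """Break incorrect responses down into invalid, extra-option, wrong-single-option,
    and wrong-multiple-option categories, and print the share of each."""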
    data = load_from_jsonl(log_path)
    n_correct = 0
    n_incorrect = 0
    n_invalid = 0
    n_multiple = 0
    n_incorrect_single = 0
    n_incorrect_multiple = 0
    for elem in data:
        if elem['model_output'] == elem['true_answer']:
            n_correct += 1
            continue
        model_output = np.array(elem['model_output'])
        true_answer = np.array(elem['true_answer'])
        n_incorrect += 1
        # Invalid response: no option selected
        if np.sum(model_output) == 0:
            n_invalid += 1
        # All true options selected, plus extra ones
        elif np.sum(model_output * true_answer) == np.sum(true_answer):
            n_multiple += 1
        # A single, but wrong, option selected
        elif np.sum(model_output) == 1:
            n_incorrect_single += 1
        else:
            n_incorrect_multiple += 1
    print('======Incorrect Response Analysis======')
    print(f"% of correct responses: {round(n_correct / len(data), 4)}")
    print(f"% of invalid responses: {round(n_invalid / len(data), 4)}")
    print(f"% of too many options: {round(n_multiple / len(data), 4)}")
    print(f"% of incorrect single option: {round(n_incorrect_single / len(data), 4)}")
    print(f"% of incorrect multiple options: {round(n_incorrect_multiple / len(data), 4)}")