SilverSolver / ai_boundary_detection

AI-generated text boundary detection with RoFT

Low metrics for threshold classifier based on PHD #1

Open sweetdream779 opened 1 month ago

sweetdream779 commented 1 month ago

Hello! I am trying to apply your method (threshold classifier based on PHD or MLE) to the Ghostbuster dataset (https://github.com/vivek3141/ghostbuster). Here is my code, which is based on yours:

from pathlib import Path
import numpy as np
import torch
import json
from tqdm import tqdm
from typing import List, Dict
from scipy.spatial.distance import cdist
from transformers import RobertaTokenizer, RobertaModel
from sklearn.metrics import f1_score, accuracy_score

def read_jsonl(filepath):
    with open(str(filepath), "r", encoding="utf8") as f:
        num_lines = sum(1 for line in f)
    pbar = tqdm(desc=f"Processing {filepath}", total=num_lines)
    with open(str(filepath), "r", encoding="utf8") as f:
        for line in f:
            pbar.update(1)
            item = json.loads(line)
            yield item

class PHDEstimator:
    MINIMAL_STABLE_LENGTH = 55

    def __init__(self, model_name: str):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer, self.model = self.load_model(model_name)

    @staticmethod
    def prim_tree(adj_matrix, power=1.0):
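        # Prim's algorithm on the full pairwise-distance matrix: accumulates the
        # total MST edge length, with each edge weight raised to `power`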
        infty = np.max(adj_matrix) + 1.0

        dst = np.ones(adj_matrix.shape[0]) * infty
        visited = np.zeros(adj_matrix.shape[0], dtype=bool)
        ancestor = -np.ones(adj_matrix.shape[0], dtype=int)

        v, s = 0, 0.0
        for i in range(adj_matrix.shape[0] - 1):
            visited[v] = 1
            ancestor[dst > adj_matrix[v]] = v
            dst = np.minimum(dst, adj_matrix[v])
            dst[visited] = infty

            v = np.argmin(dst)

            s += adj_matrix[v][ancestor[v]] ** power
        return s.item()

    @staticmethod
    def sample_W(W, nSamples):
        n = W.shape[0]
        random_indices = np.random.choice(n, size=nSamples, replace=False)
        return W[random_indices]

    @staticmethod
    def calculate_ph_dim(W, min_points=40, max_points=510, point_jump=20, alpha=1.0, restarts=3, resamples=7):
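        # Persistent-homology dimension estimate: for subsamples of n points, the
        # MST length E(n) scales roughly as n^(1 - alpha/d), so d is recovered
        # from the slope of a log-log fit of E(n) against n (see the return below)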
        # Computations for shorter texts are unstable and we want to avoid them
        if W.shape[0] < PHDEstimator.MINIMAL_STABLE_LENGTH:
            return np.nan

        m_candidates = []
        for i in range(restarts):
            test_n = range(min_points, max_points, point_jump)
            lengths = []

            for n in test_n:
                reruns = np.ones(resamples)
                for j in range(resamples):
                    tmp = PHDEstimator.sample_W(W, n)
                    reruns[j] = PHDEstimator.prim_tree(cdist(tmp, tmp), power=alpha)
                lengths.append(np.median(reruns))

            lengths = np.array(lengths)
            x = np.log(np.array(list(test_n)))
            y = np.log(lengths)

            N = len(x)
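            # closed-form least-squares slope of log(MST length) vs log(sample size)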
            m_candidates.append((N * (x * y).sum() - x.sum() * y.sum()) / (N * (x ** 2).sum() - x.sum() ** 2))
        m = np.mean(m_candidates)
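        # slope m = 1 - alpha/d  =>  dimension d = alpha / (1 - m)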
        return alpha / (1 - m)

    @staticmethod
    def get_phd_single(mx_points, embeddings, alpha=1.0):
        mn_points = 40
        step = (mx_points - mn_points) // 7

        return PHDEstimator.calculate_ph_dim(
            embeddings,  min_points=mn_points, max_points=mx_points, point_jump=step, alpha=alpha)

    def get_embeddings(self, text):
        inputs = self.tokenizer(text, truncation=True, max_length=512, return_tensors="pt")
        for k in inputs:
            inputs[k] = inputs[k].to(self.device)
        with torch.no_grad():
            outp = self.model(**inputs)

        # subtract 2 for the <s> and </s> special tokens added by the tokenizer
        mx_points = inputs['input_ids'].shape[1] - 2
        return mx_points, outp[0][0]

    def get_phd_for_text(self, text: str) -> float:
        n_tokens, embeddings = self.get_embeddings(text)
        np_embeddings = embeddings.cpu().numpy()[1:-1]
        phd_dim = self.get_phd_single(n_tokens, np_embeddings)
        return phd_dim

    def load_model(self, model_name):
        tokenizer = RobertaTokenizer.from_pretrained(model_name)
        model = RobertaModel.from_pretrained(model_name, add_pooling_layer=False).eval()
        model.to(self.device)
        return tokenizer, model

def calculate_phd_for_file(input_file: Path, output_file: Path, model_name: str):
    if output_file.exists():
        return
    phd_estimator = PHDEstimator(model_name)
    out_samples = []
    for item in read_jsonl(input_file):
        text: str = item['text']
        label: str = item['label']  # human or ai
        phd_dim = phd_estimator.get_phd_for_text(text)
        out_samples.append(dict(phd=phd_dim, label=label, text=text))

    with open(str(output_file), 'w', encoding="utf8") as out_file:
        for sample in out_samples:
            out_file.write(json.dumps(sample, ensure_ascii=False) + "\n")

class MetricsCalculator:
    def __init__(self):
        self.thresholds = np.arange(6.0, 15.0, 0.1)
        self.target_fpr = 0.01

    @staticmethod
    def roc_curve(y_true, y_prob, thresholds):

        fpr, tpr = [], []
        for threshold in thresholds:
            # 0 is human, 1 is AI
            # pred_label = "human" if intrinsic_dim > threshold else "ai"
            y_pred = np.where(y_prob >= threshold, 0, 1)

            fp = np.sum((y_pred == 1) & (y_true == 0))
            tp = np.sum((y_pred == 1) & (y_true == 1))

            fn = np.sum((y_pred == 0) & (y_true == 1))
            tn = np.sum((y_pred == 0) & (y_true == 0))

            fpr.append(fp / (fp + tn))
            tpr.append(tp / (tp + fn))

        return fpr, tpr

    @staticmethod
    def get_tpr_target(fpr, tpr, target_fpr):
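        # approximates TPR at the target FPR by averaging the TPR values of the
        # two thresholds whose FPRs bracket the target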
        indices = None
        for i in range(len(fpr)):
            if fpr[i] >= target_fpr:
                if i == 0:
                    indices = [i]
                else:
                    indices = [i - 1, i]
                break

        if indices is None:
            print(f"TPR at {target_fpr * 100}% FPR: {tpr[-1]}. FPR is too high.")
            return tpr[-1]
        else:
            tpr_values = [tpr[i] for i in indices]
            return np.mean(tpr_values)

    def evaluate_by_dim(self, dimensions: Dict[str, List[float]]):
        gt_labels, pred_dims = [], []
        cls_accuracy = {gt_label: [0.] * len(self.thresholds) for gt_label in dimensions.keys()}
        for gt_label in dimensions.keys():
            y_true = [1] * len(dimensions[gt_label]) if gt_label == "ai" else [0] * len(dimensions[gt_label])
            y_pred = dimensions[gt_label]
            pred_dims.extend(y_pred)
            gt_labels.extend(y_true)

            for th_i, threshold in enumerate(self.thresholds):
                cls_accuracy[gt_label][th_i] = accuracy_score(
                    y_true=np.array(y_true),
                    y_pred=np.where(np.array(y_pred) >= threshold, 0, 1),
                )

        # 0 is human, 1 is AI
        f1_by_thresh, acc_by_thresh = [], []
        for threshold in self.thresholds:
            pred_labels = np.where(np.array(pred_dims) >= threshold, 0, 1)
            f1 = f1_score(np.array(gt_labels), pred_labels)
            acc_score = accuracy_score(np.array(gt_labels), pred_labels)
            f1_by_thresh.append(f1)
            acc_by_thresh.append(acc_score)

        fpr, tpr = MetricsCalculator.roc_curve(np.array(gt_labels), np.array(pred_dims), self.thresholds)
        tpr_at_fpr = MetricsCalculator.get_tpr_target(fpr, tpr, self.target_fpr)
        return np.array(f1_by_thresh), np.array(acc_by_thresh), cls_accuracy, tpr_at_fpr

    def calculate_metrics(self, file_path: Path, threshold: float):
        if not file_path.exists():
            raise FileNotFoundError(f"File {file_path} not found")

        phd_dims: Dict[str, List[float]] = {"ai": [], "human": []}
        for item in read_jsonl(str(file_path)):
            if item['label'] == "human":
                phd_dims["human"].append(float(item['phd']))
            elif item['label'] == "ai":
                phd_dims["ai"].append(float(item['phd']))

        f1_by_thresh, acc_by_thresh, cls_accuracy, tpr_at_fpr = self.evaluate_by_dim(phd_dims)
        thresholds = np.round(self.thresholds, 1).tolist()
        thresh_ind = thresholds.index(threshold)
        return (
            f1_by_thresh[thresh_ind],
            acc_by_thresh[thresh_ind],
            cls_accuracy["ai"][thresh_ind],
            cls_accuracy["human"][thresh_ind],
            tpr_at_fpr
        )

if __name__ == '__main__':
    DATASET_PATH = Path('/data/Ghostbuster')
    test_file, out_test_file = DATASET_PATH / 'test.jsonl', DATASET_PATH / 'test_phd.jsonl'
    calculate_phd_for_file(test_file, out_test_file, model_name="FacebookAI/roberta-base")

    mc = MetricsCalculator()
    f1, acc, ai_acc, human_acc, tpr_fpr = mc.calculate_metrics(out_test_file, threshold=13.5)
    print(f"F1: {f1:.3f}\nAcc: {acc:.3f}\nAI Acc: {ai_acc:.3f}\nHuman Acc: {human_acc:.3f}")
    print(f"TPR at {mc.target_fpr * 100}% FPR: {tpr_fpr * 100:.1f}%")

And I get very low performance on 900 randomly sampled examples (450 from the "ai" category and 450 from the "human" category): F1: 0.667, Accuracy: 0.500, AI Accuracy: 1.000, Human Accuracy: 0.000, TPR at 1.0% FPR: 0.3%.

The threshold (13.5) was selected as follows: I took another 900 samples from the dataset and picked the threshold in the [6, 15] range that gave the best F1 score.
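Roughly, the sweep looked like this (a minimal sketch that reuses read_jsonl and MetricsCalculator from the script above; valid_phd.jsonl is a placeholder name for the validation split scored with calculate_phd_for_file):

# Sketch of the validation threshold sweep (valid_phd.jsonl is a placeholder path).
valid_file = Path('/data/Ghostbuster') / 'valid_phd.jsonl'

phd_dims = {"ai": [], "human": []}
for item in read_jsonl(valid_file):
    phd_dims[item['label']].append(float(item['phd']))

mc = MetricsCalculator()
f1_by_thresh, _, _, _ = mc.evaluate_by_dim(phd_dims)
best_idx = int(np.argmax(f1_by_thresh))
print(f"Best threshold by F1: {mc.thresholds[best_idx]:.1f} (F1={f1_by_thresh[best_idx]:.3f})")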

Even if I take a lower threshold based on your paper (e.g. 9.0), I still get low metrics: F1: 0.399, Accuracy: 0.536, AI Accuracy: 0.309, Human Accuracy: 0.762, TPR at 1.0% FPR: 0.3%.

Is there something I am doing wrong?

SilverSolver commented 1 month ago

Hello! Could you please send me a link to the particular data you used? I tried to find it in the "ghostbuster" repo, but wasn't sure which files you meant. Maybe you mean something from https://github.com/vivek3141/ghostbuster-data/ ?

Also, I can say in advance that if they used something generated by gpt-4 or gpt-4o, it will likely NOT be detected by our method, even if applied correctly. :( We didn't manage to do anything about this; it seems to be a property of these newer models. If the data was generated by gpt-3.5 or earlier, I can try to see what exactly is wrong.

sweetdream779 commented 1 month ago

As I understood from their paper, they used the gpt-3.5-turbo model for generation. I uploaded the data that I used to Google Drive: valid data link and test data link.

SilverSolver commented 1 day ago

Thank you for the data links, and sorry for the late response. I forgot about this issue because of a lot of work and only remembered it recently.

I ran a test with this data and your code. I didn't find any problems in your code, and I also manually checked the PHD values on examples from this dataset myself. The difference between the PH dimensions of the human and AI subsets of the data is indeed too small to build a reasonable threshold-based classifier.
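
For reference, this is roughly how I compared the two distributions (a small sketch that only assumes the test_phd.jsonl file produced by your script):

import json
import numpy as np

# Sketch: per-label summary statistics of the PHD values in test_phd.jsonl.
phd = {"human": [], "ai": []}
with open("test_phd.jsonl", encoding="utf8") as f:
    for line in f:
        item = json.loads(line)
        if not np.isnan(item["phd"]):  # skip texts that were too short to estimate
            phd[item["label"]].append(item["phd"])

for label, values in phd.items():
    values = np.array(values)
    print(f"{label}: n={len(values)}, mean={values.mean():.2f}, "
          f"std={values.std():.2f}, median={np.median(values):.2f}")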

So it seems like you did everything correctly; our threshold-based classifier simply doesn't work on this data.