Use LogBERT for log anomaly detection

1. General Knowledge about BERT

Reference:

  1. Fine-tuning BERT for Text classification in Kaggle
  2. Natural Language Processing IN2361 by Prof. Dr. Georg Groh

BERT (Bidirectional Encoder Representations from Transformers)

Language modeling is a common method of pretraining on unlabeled text (self-supervised learning). Most language models are learned by iteratively predicting the next word in a sequence, autoregressively, across enormous text corpora such as Wikipedia. This can be done left-to-right, right-to-left, or bidirectionally.

There are two strategies of applying pretrained language representations to downstream tasks:

  1. Feature based approach
  2. Fine tuning approach

The feature-based approach, such as ELMo, uses task-specific architectures that include the pretrained representations as additional features.

The fine-tuning approach, such as OpenAI GPT, introduces minimal task-specific parameters and is trained on the downstream task by fine-tuning all the pretrained parameters.

The BERT model can be used with both approaches. BERT reformulates the language-modeling pretraining task: instead of iteratively predicting the next word in a sequence, it incorporates bidirectional context and predicts masked intermediate tokens of the sequence. BERT introduced a new self-supervised pretraining task for transformers so that they can later be fine-tuned for different downstream tasks. The major difference between BERT and prior methods of pretraining transformer models is the use of bidirectional context in language modeling. Most models move either left-to-right or right-to-left to predict the next word in the sequence, whereas BERT learns to predict intermediate tokens (via [MASK]), hence the name Bidirectional Encoder.

BERT is pretrained with a masked language model (MLM) objective and also a "next sentence prediction" (NSP) task.
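As a quick illustration of the masked-language-model objective (a generic Hugging Face transformers example, not LogBERT-specific), a pretrained BERT can be asked to fill in a masked position:

from transformers import pipeline

# generic fill-mask demo; BERT predicts the masked token using context from both sides
fill = pipeline("fill-mask", model="bert-base-cased")
print(fill("The server failed to [MASK] the request."))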

BERT computes its input representations from 3 embeddings: token embeddings, segment embeddings, and position embeddings.
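A minimal sketch of this input representation, assuming the Hugging Face transformers implementation of BERT (its embedding sub-module exposes the three tables directly):

import torch
from transformers import BertTokenizerFast, BertModel

tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
model = BertModel.from_pretrained("bert-base-cased")

enc = tokenizer("log line one", "log line two", return_tensors="pt")
emb = model.embeddings  # holds word_embeddings, token_type_embeddings, position_embeddings

tok = emb.word_embeddings(enc["input_ids"])               # token embeddings
seg = emb.token_type_embeddings(enc["token_type_ids"])    # segment embeddings
pos = emb.position_embeddings(
    torch.arange(enc["input_ids"].size(1)).unsqueeze(0))  # position embeddings

inputs = emb.LayerNorm(tok + seg + pos)  # summed, then LayerNorm (and dropout at train time)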

The BERT transformer preserves the length (dimension) of the input. The final output vector is then passed on to separate task heads (classification, in this case).

BERT training process with the masked language model

BERT for Classification [LogBERT does not use this strategy]

BERT consists of stacked encoder layers. Like the encoder input of the transformer model, BERT takes a sequence of numeric token representations as input. For classification tasks, we prepend the special [CLS] (classification) token to the beginning of every sentence.

The encoder block of the transformer outputs a sequence with the same length as the input. The output at the first position, corresponding to the [CLS] token, can then be used as the input to a classifier.
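A hedged sketch of this pattern with plain Hugging Face transformers (generic BERT-for-classification, not the LogBERT strategy): take the hidden state at the [CLS] position and feed it to a small classifier head; the two classes here are only for illustration.

import torch
from transformers import BertTokenizerFast, BertModel

tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
model = BertModel.from_pretrained("bert-base-cased")

enc = tokenizer("User login failed for admin", return_tensors="pt")
out = model(**enc)
cls_vector = out.last_hidden_state[:, 0, :]  # hidden state at the [CLS] position

classifier = torch.nn.Linear(model.config.hidden_size, 2)  # e.g. normal vs. anomalous
logits = classifier(cls_vector)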

2. How to use the Masked Language Model to do Anomaly Detection [LogBERT method]

[Guo, Haixuan, Shuhan Yuan, and Xintao Wu. "LogBERT: Log Anomaly Detection via BERT." 2021 International Joint Conference on Neural Networks (IJCNN). IEEE, 2021.]

General Idea:
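In brief (following the cited paper and the way the LogAI pipeline below uses it): LogBERT is trained in a self-supervised way only on logs that are assumed to be normal, learning to predict masked tokens of a log sequence. At detection time, tokens of a new sequence are masked out and the quality of the model's predictions for them is measured; sequences whose masked tokens the model predicts poorly (high masked-LM loss, or the true token missing from the top predicted candidates) are flagged as anomalous, because they do not resemble the normal logs the model was trained on.

A conceptual sketch of this scoring idea with plain Hugging Face transformers (the actual LogAI implementation masks n-grams of tokens and batches the work differently, see _generate_masked_input in section 3.8):

import torch
from transformers import BertTokenizerFast, BertForMaskedLM

tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
model = BertForMaskedLM.from_pretrained("bert-base-cased")
model.eval()

def anomaly_score(logline: str) -> float:
    """Average masked-LM loss over the tokens of one logline (higher = more anomalous)."""
    enc = tokenizer(logline, return_tensors="pt")
    input_ids = enc["input_ids"][0]
    losses = []
    for pos in range(1, input_ids.size(0) - 1):     # skip [CLS] and [SEP]
        masked = input_ids.clone()
        masked[pos] = tokenizer.mask_token_id
        labels = torch.full_like(input_ids, -100)   # -100 = ignored by the loss
        labels[pos] = input_ids[pos]
        with torch.no_grad():
            out = model(masked.unsqueeze(0),
                        attention_mask=enc["attention_mask"],
                        labels=labels.unsqueeze(0))
        losses.append(out.loss.item())
    return sum(losses) / max(len(losses), 1)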

3. LogBERT in LogAI - the whole pipeline

3.1 Load OpenSearch indices, preprocess, and save in pandas "feather" format

The utility function that loads a range of documents from an index:

# utils.py
import pandas as pd
from opensearchpy import OpenSearch


def load_data_range(index_name, start, end, client=None):
    """Load documents [start, end] of an OpenSearch index into a pandas DataFrame."""
    if client is None:
        client = OpenSearch(
            hosts=['https://10.11.40.114:9200'],
            use_ssl=True,
            verify_certs=False,
            http_auth=('admin', 'admin'),
            timeout=60
        )
    max_index = client.indices.stats(index=index_name)['_all']['primaries']['docs']['count'] - 1
    start = min(start, max_index)
    end = min(end, max_index)

    print("Loading data from index " + str(start) + " until index " + str(end) + " size is " + str(end - start + 1))
    query = {
        "from": start,
        "size": end - start + 1,
        "query": {
            "match_all": {}
        },
    }

    search_results = client.search(index=index_name, body=query)

    # Extract and convert the documents to a pandas DataFrame
    documents = search_results["hits"]["hits"]
    data = [doc['_source'] for doc in documents]
    df = pd.DataFrame(data)

    return df

Initialize the OpenSearch client, read the index in chunks, concatenate them, and save the result:

import warnings
from collections.abc import MutableMapping
warnings.filterwarnings('ignore')
import pandas as pd
from opensearchpy import OpenSearch
import math
import pickle
import utils

client = OpenSearch(
    hosts=['https://10.11.40.114:9200'],
    use_ssl=True,
    verify_certs=False,
    http_auth=('admin', 'admin'),
    timeout=60
)

index_name = "2023-09-05_system_v4"
size = client.indices.stats(index=index_name)['_all']['primaries']['docs']['count']
iteration_size = 500000
dataset_iterations = math.ceil((size / iteration_size))
print("df size: " + str(size) + " iterations: " + str(dataset_iterations))

df_save = pd.DataFrame()
for i in range(0,  dataset_iterations):
    print("Iteration " +  str(i + 1) + " of " + str(dataset_iterations))
    df = utils.load_data_range(index_name, (iteration_size * i), ((iteration_size * (i + 1)) - 1), client)
    df_save = pd.concat([df_save, df])

df_save = df_save.reset_index()
df_save.to_feather('filename.feather')

Some preprocessing on the raw OpenSearch data:

import pandas as pd
## read saved feather data
training_df1 = pd.read_feather('filename1.feather')
training_df2 = pd.read_feather('filename2.feather')
testing_df = pd.read_feather('filename3.feather')

## rename the logline column from "description" to "logline", because logai uses the
## constant "logline" as the default column name for loglines

testing_df.rename(columns={'description':'logline'},inplace=True)
training_df1.rename(columns={'description':'logline'},inplace=True)
training_df2.rename(columns={'description':'logline'},inplace=True)

## set span_id
testing_df['span_id'] = testing_df['index']
training_df1['span_id'] = training_df1['index']
training_df2['span_id'] = training_df2['index']

## label anomalies. Note that the training process is purely unsupervised and
## doesn't require labels, but the pipeline expects a "labels" column on the
## dataframe. Labels also help when visualizing the results later if you want to
## track the loglines you are interested in (just label those log lines with 1).
## In the normal case, simply set the whole column to 0.
training_df1["labels"] = 0
training_df2["labels"] = 0
testing_df["labels"] = 0
for index, row in testing_df.iterrows():
    if "ABEND" in row['logline']:
        testing_df.at[index, 'labels'] = 1
    if "ERROR" in row['logline']:
        testing_df.at[index, 'labels'] = 1
    if "WARN" in row['logline']:
        testing_df.at[index, 'labels'] = 1

## concatenate the 2 training dataframes into one and reset the index and span_id
training_df = pd.concat([training_df1,training_df2]).reset_index()
training_df['index'] = training_df.index
training_df['span_id'] = training_df['index']
training_df.drop(['level_0'],axis=1, inplace=True)

testing_df['index'] = testing_df.index
testing_df['span_id'] = testing_df['index']

## save them again
testing_df.to_pickle('testing_df.pkl')
training_df.to_pickle('training_df.pkl')

3.2 Custom preprocessor

Read the saved pickle files from the last step:

testing_df = pd.read_pickle('testing_df.pkl')
training_df = pd.read_pickle('training_df.pkl')

To encode special values (IP addresses, HEX values, INT values, etc.) as special tokens and preprocess the loglines, we need to override 2 methods of OpenSetPreprocessor, specify the preprocessor_config, and set the special tokens in the tokenizer.

3.2.1 Override 2 methods from OpenSetPreprocessor

import os

import pandas as pd

# import paths below follow the LogAI version used here; adjust if they differ
from logai.utils import constants
from logai.dataloader.data_model import LogRecordObject
from logai.preprocess.preprocessor import PreprocessorConfig
from logai.preprocess.openset_preprocessor import OpenSetPreprocessor


class SDMLOG_Preprocessor(OpenSetPreprocessor):
    """
    Custom preprocessor for our OpenSearch log dataset (adapted from LogAI's BGL example).
    """

    def __init__(self, config: PreprocessorConfig):
        super().__init__(config)

    def _get_ids(self, logrecord: LogRecordObject) -> pd.Series:
        """Get ids of loglines.
        :param logrecord: logrecord object containing the log data.
        :return: pd.Series object containing the ids of the loglines.
        """
        ids = logrecord.span_id[constants.SPAN_ID]
        return ids

    def _get_labels(self, logrecord: LogRecordObject):
        """Get anomaly detection labels of loglines.
        :param logrecord: logrecord object containing the log data.
        :return:pd.Series object containing the labels of the loglines.
        """
        return logrecord.labels[constants.LABELS]

preprocessor = SDMLOG_Preprocessor(config.preprocessor_config)
preprocessed_filepath = os.path.join(config.output_dir, 'filename.csv')
logrecord = preprocessor.clean_log(logrecord)
logrecord.save_to_csv(preprocessed_filepath)

3.2.2 Specify the preprocessor_config

preprocessor_config:
  custom_delimiters_regex:
              [':', ',', '=', '\\t']
  custom_replace_list: [
              ['(0x)[0-9a-zA-Z]+', ' HEX '],
              ['((?![A-Za-z]{8}|\\d{8})[A-Za-z\\d]{8})', ' ALPHANUM '],
              ['\\d+.\\d+.\\d+.\\d+', ' IP '],
              ['\\d+', ' INT ']
          ]
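For intuition, here is roughly what this replace list does to a raw log line when the patterns are applied in order (a standalone re.sub illustration, not LogAI internals; the sample line is made up):

import re

replace_list = [
    (r'(0x)[0-9a-zA-Z]+', ' HEX '),
    (r'((?![A-Za-z]{8}|\d{8})[A-Za-z\d]{8})', ' ALPHANUM '),
    (r'\d+.\d+.\d+.\d+', ' IP '),
    (r'\d+', ' INT '),
]

line = "connect from 10.11.40.114 session=0x1f3a9b sid=ab12cd34"
for pattern, token in replace_list:
    line = re.sub(pattern, token, line)
print(line)  # -> "connect from  IP  session= HEX  sid= ALPHANUM "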

3.2.3 Set the special tokens in the tokenizer config; this list must match the tokens introduced by "custom_replace_list" in the preprocessor_config

log_vectorizer_config:
  algo_name: "logbert"
  algo_param:
    model_name: "bert-base-cased"
    max_token_len: 120
    custom_tokens: ["ALPHANUM", "IP", "HEX", "INT"]
    output_dir: "outputdir"
    tokenizer_dirname: "logbert_tokenizer"
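What the custom_tokens setting amounts to, in a hedged sketch (assuming the Hugging Face tokenizer that LogAI wraps): the placeholder tokens are added to the vocabulary so the word-piece tokenizer keeps them whole instead of splitting them.

from transformers import BertTokenizerFast

tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
tokenizer.add_tokens(["ALPHANUM", "IP", "HEX", "INT"])  # keep placeholders as single tokens
print(tokenizer.tokenize("connect from IP port INT"))
# note: whenever tokens are added, the model's embedding matrix must be resized
# (resize_token_embeddings) to match the new vocabulary size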

3.3 Split the training, validation and testing dataset

Use sklearn.model_selection.train_test_split:

Here I split logrecord_training into 99% training data and 1% evaluation data, to keep an adequate amount of training data while keeping evaluation quick:

from sklearn.model_selection import train_test_split

train_filepath = os.path.join(config.output_dir, 'train.csv')
dev_filepath = os.path.join(config.output_dir, 'dev.csv')
test_filepath = os.path.join(config.output_dir, 'test.csv')

train_ids, dev_ids, train_labels, dev_labels = train_test_split(
    logrecord_training.span_id[constants.SPAN_ID],
    logrecord_training.labels[constants.LABELS],
    test_size=0.01,
    shuffle=False,
    stratify=None,
)

indices_train = list(
        logrecord_training.span_id.loc[
            logrecord_training.span_id[constants.SPAN_ID].isin(train_ids)
        ].index
    )
indices_dev = list(
    logrecord_training.span_id.loc[logrecord_training.span_id[constants.SPAN_ID].isin(dev_ids)].index
)
indices_test = list(
    logrecord_testing.span_id.index
)

train_data = logrecord_training.select_by_index(indices_train)
dev_data = logrecord_training.select_by_index(indices_dev)
test_data = logrecord_testing.select_by_index(indices_test)
# train_data.save_to_csv(train_filepath)
# dev_data.save_to_csv(dev_filepath)
# test_data.save_to_csv(test_filepath)
print ('Train/Dev/Test Anomalous', len(train_data.labels[train_data.labels[constants.LABELS]==1]),
                                   len(dev_data.labels[dev_data.labels[constants.LABELS]==1]),
                                   len(test_data.labels[test_data.labels[constants.LABELS]==1]))
print ('Train/Dev/Test Normal', len(train_data.labels[train_data.labels[constants.LABELS]==0]),
                                   len(dev_data.labels[dev_data.labels[constants.LABELS]==0]),
                                   len(test_data.labels[test_data.labels[constants.LABELS]==0]))

3.4 Train the BERT vectorizer

from logai.information_extraction.log_vectorizer import LogVectorizer

vectorizer = LogVectorizer(config.log_vectorizer_config)
vectorizer.fit(train_data)
train_features = vectorizer.transform(train_data)
dev_features = vectorizer.transform(dev_data)
test_features = vectorizer.transform(test_data)

3.5 BERT model training

related config:

  nn_anomaly_detection_config:
      algo_name: "logbert"
      algo_params:
          model_name: "bert-base-cased"
          learning_rate: 0.00005
          num_train_epochs: 1
          per_device_train_batch_size: 32
          max_token_len: 120
          save_steps: 1000
          eval_steps: 1000
          tokenizer_dirpath: "bert-base-cased_tokenizer"
          output_dir: "outputdir"
          pretrain_from_scratch: False

from logai.analysis.nn_anomaly_detector import NNAnomalyDetector

anomaly_detector = NNAnomalyDetector(config=config.nn_anomaly_detection_config)
anomaly_detector.fit(train_features, dev_features)

3.6 BERT model testing

related config:

  nn_anomaly_detection_config:
      algo_name: "logbert"
      algo_params:
          model_name: "bert-base-cased"
          per_device_eval_batch_size: 32
          eval_accumulation_steps: 284
          mask_ngram: 8
          tokenizer_dirpath: "bert-base-cased_tokenizer"
          output_dir: "outputdir"
          num_eval_shards: 100

anomaly_detector = NNAnomalyDetector(config=config.nn_anomaly_detection_config)
# test_features_piece: a slice of test_features (prediction was run piece by piece
# here to keep memory usage manageable)
predict_results = anomaly_detector.predict(test_features_piece)
print(predict_results)

3.7 Output Interpretation

Take partitioned logs as an example; here is the data flow:

↓ Partitioner based on span_id, here span_id = seconds

↓ Train dev test split

Train/Dev/Test Anomalous: 0 0 71

Train/Dev/Test Normal: 181 20 51

↓ Turn pandas df into LogRecordObject

↓ LogVectorizer

↓ NNAnomalyDetector.predict _generate_masked_input

↓ NNAnomalyDetector.predict predict

↓ anomaly_results.groupby('indices').mean()
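A hypothetical post-processing sketch of that last step (the groupby over 'indices' follows the flow above; the 'loss' column name and the 95th-percentile threshold are assumptions for illustration, adjust to the actual columns returned by predict):

# aggregate the per-masked-chunk results back to one score per original log index
scores = predict_results.groupby('indices').mean()

# flag the highest-scoring indices as anomalous (column name and threshold assumed)
threshold = scores['loss'].quantile(0.95)
anomalies = scores[scores['loss'] > threshold]
print(anomalies.index.tolist())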

3.8 Code modifications for GPU acceleration and for running without partitioning

The following are modified methods of the LogBERT trainer and predictor in LogAI; the changes are marked with "## modified by huiyu".

import torch

def _initialize_trainer(self, model, train_dataset, dev_dataset):
        """initializing huggingface trainer object for logbert"""
        training_args = TrainingArguments(
            self.model_dirpath,
            evaluation_strategy=self.config.evaluation_strategy,
            num_train_epochs=self.config.num_train_epochs,
            learning_rate=self.config.learning_rate,
            logging_steps=self.config.logging_steps,
            per_device_train_batch_size=self.config.per_device_train_batch_size,
            per_device_eval_batch_size=self.config.per_device_eval_batch_size,
            weight_decay=self.config.weight_decay,
            save_steps=self.config.save_steps,
            eval_steps=self.config.eval_steps,
            resume_from_checkpoint=self.config.resume_from_checkpoint,
        )

        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm_probability=self.config.mlm_probability,
            pad_to_multiple_of=self.config.max_token_len,
        )
        ## modified by huiyu: move the model to the GPU when one is available
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.trainer = Trainer(
            model=model.to(device),
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=dev_dataset,
            data_collator=data_collator,
        )

def _generate_masked_input(self, examples, indices):
        input_ids = examples["input_ids"][0]
        attention_masks = examples["attention_mask"][0]
        token_type_ids = examples["token_type_ids"][0]
        index = indices[0]
        input_ids = np.array(input_ids)

        sliding_window_diag = np.eye(input_ids.shape[0])

        if sliding_window_diag.shape[0] == 0:
            return examples
        mask = 1 - np.isin(input_ids, self.special_token_ids).astype(np.int64)
        sliding_window_diag = sliding_window_diag * mask
        sliding_window_diag = sliding_window_diag[
            ~np.all(sliding_window_diag == 0, axis=1)
        ]
        ## modified by huiyu: if every token in the line is a special/custom token,
        ## there is nothing left to mask, so return the unmasked input with all labels
        ## set to -100 so the masked-LM loss ignores this example instead of crashing.
        if sliding_window_diag.shape[0] == 0:
            output = {}
            output["input_ids"] = np.array(examples["input_ids"])
            output["attention_mask"] = np.array(examples["attention_mask"])
            output["token_type_ids"] = np.array(examples['token_type_ids'])
            output["labels"] = -100 * np.ones(
                shape=(output["input_ids"].shape[0], output["input_ids"].shape[1])
            )
            output["indices"] = np.array([index]).astype(np.int64)
            return output
        ##
        num_sections = int(sliding_window_diag.shape[0] / self.config.mask_ngram)
        if num_sections <= 0:
            num_sections = sliding_window_diag.shape[0]
        sliding_window_diag = np.array_split(sliding_window_diag, num_sections, axis=0)
        diag = np.array([np.sum(di, axis=0) for di in sliding_window_diag])

        input_rpt = np.tile(input_ids, (diag.shape[0], 1))
        labels = np.copy(input_rpt)
        input_ids_masked = (input_rpt * (1 - diag) + diag * self.mask_id).astype(
            np.int64
        )
        attention_masks = np.tile(
            np.array(attention_masks), (input_ids_masked.shape[0], 1)
        )
        token_type_ids = np.tile(
            np.array(token_type_ids), (input_ids_masked.shape[0], 1)
        )
        labels[
            input_ids_masked != self.mask_id
        ] = -100  # Need masked LM loss only for tokens with mask_id
        examples = {}
        examples["input_ids"] = input_ids_masked
        examples["attention_mask"] = attention_masks
        examples["token_type_ids"] = token_type_ids
        examples["labels"] = labels
        examples["indices"] = np.array([index] * input_ids_masked.shape[0]).astype(
            np.int64
        )
        return examples