HowTo Create your own SequenceDetector

landauermax edited this page Aug 7, 2023 · 4 revisions

This wiki details the development of a custom detector for the aminer. In particular, we will create a detector that learns sequences of values occurring in particular positions (tokens) of log events during the training phase. Then, the detector recognizes all new value sequences in a subsequent detection phase. This technique is based on STIDE (sequence time-delay embedding) as proposed by Forrest et al. [1].

Sample scenario

Let's start with an example that we can also use for testing our detector. Create and open a new file with the commands

root@user-1:/home/ubuntu# touch access.log
root@user-1:/home/ubuntu# vim access.log

And copy the following Apache Access log lines into the file:

192.168.10.190 - - [29/Feb/2020:13:58:32 +0000] "GET /services/portal/ HTTP/1.1" 200 7499 "-" "-"
192.168.10.190 - - [29/Feb/2020:13:58:47 +0000] "GET /nag/ HTTP/1.1" 200 8891 "-" "-"
192.168.10.190 - - [29/Feb/2020:13:58:55 +0000] "POST /nag/task/save.php HTTP/1.1" 200 5220 "-" "-"
192.168.10.190 - - [29/Feb/2020:14:00:12 +0000] "GET /services/portal/ HTTP/1.1" 200 1211 "-" "-"
192.168.10.190 - - [29/Feb/2020:14:02:08 +0000] "GET /kronolith/ HTTP/1.1" 200 24703 "-" "-"
192.168.10.190 - - [29/Feb/2020:14:05:11 +0000] "GET /services/portal/ HTTP/1.1" 200 3456 "-" "-"
192.168.10.190 - - [29/Feb/2020:14:05:25 +0000] "GET /nag/ HTTP/1.1" 200 1131 "-" "-"
192.168.10.190 - - [29/Feb/2020:14:05:57 +0000] "POST /nag/task/save.php HTTP/1.1" 200 8754 "-" "-"

Apache Access logs document which pages were requested from a web server. In this case, the lines resemble the logs of a server that hosts a mail platform that comprises a calendar (kronolith) as well as a task list (nag). Usually, users move from one page to another in particular paths, rather than at random. This is visible in the logs, for example, the user is first at the main page (/services/portal/), then moves to the page for managing the tasks (/nag/), and finally creates a new task (/nag/task/save.php). It is unlikely that a new task is created right after the user is on any page other than the task overview, e.g., a user that is currently writing a mail is unlikely to create a new task from that view (assuming that this is not supported by the platform).

Notice that in the example above, the user repeats the page sequence (1) /services/portal/, (2) /nag/, (3) /nag/task/save.php twice. It makes sense to split the overall sequence into smaller chunks to avoid overfitting during learning, i.e., to avoid creating overly specific sequences and instead learn more generally valid sequences, such as moving from the portal to the task list and creating a new task. For a subsequence length of 3, the following subsequences are derived from the example above: (1) ["/services/portal/", "/nag/", "/nag/task/save.php"], (2) ["/nag/", "/nag/task/save.php", "/services/portal/"], (3) ["/nag/task/save.php", "/services/portal/", "/kronolith/"], (4) ["/services/portal/", "/kronolith/", "/services/portal/"], (5) ["/kronolith/", "/services/portal/", "/nag/"]. Note that subsequences that repeat are only stored a single time. Assuming that all these subsequences were observed during normal behavior, all newly occurring subsequences can be flagged as anomalous and potentially malicious. Obviously, the training data should be much larger to actually reflect all possible movements between pages that users usually perform.
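The derivation of subsequences described above can be sketched in a few lines of plain Python (a standalone illustration, independent of the aminer):

```python
# Derive all unique subsequences of length 3 from the requests in the sample
# log using a sliding window; repeated subsequences are stored only once,
# which is why a set is used.
requests = ["/services/portal/", "/nag/", "/nag/task/save.php",
            "/services/portal/", "/kronolith/", "/services/portal/",
            "/nag/", "/nag/task/save.php"]
seq_len = 3
subsequences = set()
for i in range(len(requests) - seq_len + 1):
    subsequences.add(tuple(requests[i:i + seq_len]))
print(len(subsequences))  # 5 unique subsequences (6 windows, first == last)
```

The eight requests yield six windows, but the first and last window are identical, leaving the five unique subsequences listed above.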

Installing and running aminer from sources

Let's realize this detection method as an aminer detector. First, make sure that you are familiar with the aminer installation as outlined in the Getting Started. For this tutorial, we assume that aminer V2.2.0 is used:

root@user-1:/home/ubuntu/git# aminer -v
   (Austrian Institute of Technology)
       (https://aecid.ait.ac.at)
            Version: 2.2.0            

Get the aminer installation script and set the permissions using the commands

root@user-1:/home/ubuntu# wget https://raw.githubusercontent.com/ait-aecid/logdata-anomaly-miner/main/scripts/aminer_install.sh
root@user-1:/home/ubuntu# chmod +x aminer_install.sh

One possibility to extend the aminer is to install it as outlined in the Getting Started and work directly in the sources located at /usr/lib/logdata-anomaly-miner/. However, this is not recommended. Instead, execute the installation script, but pass a parameter for the source directory path as follows:

root@user-1:/home/ubuntu# ./aminer_install.sh "https://github.com/ait-aecid/logdata-anomaly-miner.git" "/home/ubuntu/aminer"

You will now find the aminer sources in /home/ubuntu/aminer/source/root/usr/lib/logdata-anomaly-miner. Since this method of installing the aminer is mainly intended for developers, the development branch is installed by default. Change the branch in the aminer_install.sh script if you want to check out the main branch instead. We will now use ansible version 2.5.1 to install the aminer. Every time you change something in these sources, reinstall the aminer with the following command:

root@user-1:/home/ubuntu# sudo ansible-playbook /home/ubuntu/aminer/playbook.yml

This will install the aminer from the sources located in /home/ubuntu/aminer. Now make sure that the aminer runs as expected. We will use the same parsing model as in the Getting Started:

root@user-1:/home/ubuntu# sudo ln -s /etc/aminer/conf-available/generic/ApacheAccessModel.py /etc/aminer/conf-enabled/

And we also use the same configuration as in the Getting Started, with the addition that we set json: True in the EventHandlers section. Create and edit the file /etc/aminer/config.yml and insert the following code:

LearnMode: True

LogResourceList:
        - 'file:///home/ubuntu/access.log'

Parser:
        - id: 'START'
          start: True
          type: ApacheAccessModel
          name: 'apache'

Input:
        timestamp_paths: "/accesslog/time"

Analysis:
        - type: "NewMatchPathValueDetector"
          paths: ["/accesslog/status"]
          output_logline: True

EventHandlers:
        - id: "stpe"
          json: True
          type: "StreamPrinterEventHandler"

Now run the aminer with flags -C to clear any previously learned artifacts and -f to start from the beginning of the file to get the following results:

root@user-1:/home/ubuntu/aminer# aminer -C -f
{
  "AnalysisComponent": {
    "AnalysisComponentIdentifier": 1,
    "AnalysisComponentType": "NewMatchPathDetector",
    "AnalysisComponentName": "DefaultNewMatchPathDetector",
    "Message": "New path(es) detected",
    "PersistenceFileName": "Default",
    "TrainingMode": true,
    "AffectedLogAtomPaths": [
      "/accesslog",
      "/accesslog/host",
      "/accesslog/sp0",
      "/accesslog/ident",
      "/accesslog/sp1",
      "/accesslog/user",
      "/accesslog/sp2",
      "/accesslog/time",
      "/accesslog/sp3",
      "/accesslog/fm/request",
      "/accesslog/sp6",
      "/accesslog/status",
      "/accesslog/sp7",
      "/accesslog/size",
      "/accesslog/combined",
      "/accesslog/combined/combined",
      "/accesslog/combined/combined/sp9",
      "/accesslog/combined/combined/referer",
      "/accesslog/combined/combined/sp10",
      "/accesslog/combined/combined/user_agent",
      "/accesslog/combined/combined/sp11",
      "/accesslog/fm/request/method",
      "/accesslog/fm/request/sp5",
      "/accesslog/fm/request/request",
      "/accesslog/fm/request/sp6",
      "/accesslog/fm/request/version"
    ]
  },
  "LogData": {
    "RawLogData": [
      "192.168.10.190 - - [29/Feb/2020:13:58:32 +0000] \"GET /services/portal/ HTTP/1.1\" 200 7499 \"-\" \"-\""
    ],
    "Timestamps": [
      1582984712
    ],
    "DetectionTimestamp": 1669839365.35,
    "LogLinesCount": 1,
    "AnnotatedMatchElement": {
      "/accesslog": "192.168.10.190 - - [29/Feb/2020:13:58:32 +0000] \"GET /services/portal/ HTTP/1.1\" 200 7499 \"-\" \"-\"",
      "/accesslog/host": "192.168.10.190",
      "/accesslog/sp0": " ",
      "/accesslog/ident": "-",
      "/accesslog/sp1": " ",
      "/accesslog/user": "-",
      "/accesslog/sp2": " ",
      "/accesslog/time": "1582984712",
      "/accesslog/sp3": "] \"",
      "/accesslog/fm/request": "GET /services/portal/ HTTP/1.1",
      "/accesslog/sp6": "\" ",
      "/accesslog/status": "200",
      "/accesslog/sp7": " ",
      "/accesslog/size": "7499",
      "/accesslog/combined": " \"-\" \"-\"",
      "/accesslog/combined/combined": " \"-\" \"-\"",
      "/accesslog/combined/combined/sp9": " \"",
      "/accesslog/combined/combined/referer": "-",
      "/accesslog/combined/combined/sp10": "\" \"",
      "/accesslog/combined/combined/user_agent": "-",
      "/accesslog/combined/combined/sp11": "\"",
      "/accesslog/fm/request/method": "0",
      "/accesslog/fm/request/sp5": " ",
      "/accesslog/fm/request/request": "/services/portal/",
      "/accesslog/fm/request/sp6": " ",
      "/accesslog/fm/request/version": "HTTP/1.1"
    }
  }
}
{
  "AnalysisComponent": {
    "AnalysisComponentIdentifier": 2,
    "AnalysisComponentType": "NewMatchPathValueDetector",
    "AnalysisComponentName": "NewMatchPathValueDetector2",
    "Message": "New value(s) detected",
    "PersistenceFileName": "Default",
    "TrainingMode": true,
    "AffectedLogAtomPaths": [
      "/accesslog/status"
    ],
    "AffectedLogAtomValues": [
      "200"
    ]
  },
  "LogData": {
    "RawLogData": [
      "192.168.10.190 - - [29/Feb/2020:13:58:32 +0000] \"GET /services/portal/ HTTP/1.1\" 200 7499 \"-\" \"-\""
    ],
    "Timestamps": [
      1582984712
    ],
    "DetectionTimestamp": 1669839365.35,
    "LogLinesCount": 1,
    "AnnotatedMatchElement": {
      "/accesslog": "192.168.10.190 - - [29/Feb/2020:13:58:32 +0000] \"GET /services/portal/ HTTP/1.1\" 200 7499 \"-\" \"-\"",
      "/accesslog/host": "192.168.10.190",
      "/accesslog/sp0": " ",
      "/accesslog/ident": "-",
      "/accesslog/sp1": " ",
      "/accesslog/user": "-",
      "/accesslog/sp2": " ",
      "/accesslog/time": "1582984712",
      "/accesslog/sp3": "] \"",
      "/accesslog/fm/request": "GET /services/portal/ HTTP/1.1",
      "/accesslog/sp6": "\" ",
      "/accesslog/status": "200",
      "/accesslog/sp7": " ",
      "/accesslog/size": "7499",
      "/accesslog/combined": " \"-\" \"-\"",
      "/accesslog/combined/combined": " \"-\" \"-\"",
      "/accesslog/combined/combined/sp9": " \"",
      "/accesslog/combined/combined/referer": "-",
      "/accesslog/combined/combined/sp10": "\" \"",
      "/accesslog/combined/combined/user_agent": "-",
      "/accesslog/combined/combined/sp11": "\"",
      "/accesslog/fm/request/method": "0",
      "/accesslog/fm/request/sp5": " ",
      "/accesslog/fm/request/request": "/services/portal/",
      "/accesslog/fm/request/sp6": " ",
      "/accesslog/fm/request/version": "HTTP/1.1"
    }
  }
}

In the following, whenever we make any changes to the code and then start the aminer, make sure to execute the commands sudo ansible-playbook /home/ubuntu/aminer/playbook.yml followed by aminer -C -f. The output shows that the aminer detected new parser paths (NewMatchPathDetector) and the value 200 for the status (NewMatchPathValueDetector). Great!

Creating a new detector

Now it's time to create our own detector. We start by creating the following file:

root@user-1:/home/ubuntu/aminer# touch /home/ubuntu/aminer/source/root/usr/lib/logdata-anomaly-miner/aminer/analysis/SequenceDetector.py

Edit the file and insert the following detector template:

import time
import os
import logging

from aminer import AminerConfig
from aminer.AminerConfig import STAT_LEVEL, STAT_LOG_NAME, CONFIG_KEY_LOG_LINE_PREFIX
from aminer.AnalysisChild import AnalysisContext
from aminer.util import PersistenceUtil
from aminer.events.EventInterfaces import EventSourceInterface
from aminer.input.InputInterfaces import AtomHandlerInterface
from aminer.util.TimeTriggeredComponentInterface import TimeTriggeredComponentInterface


class SequenceDetector(AtomHandlerInterface, TimeTriggeredComponentInterface, EventSourceInterface):
    """This class creates events when new value sequences are found."""

    time_trigger_class = AnalysisContext.TIME_TRIGGER_CLASS_REALTIME

    def __init__(self, aminer_config, target_path_list, anomaly_event_handlers, seq_len, persistence_id='Default', learn_mode=False,
                 output_logline=True):
        """Initialize the detector."""
        self.target_path_list = target_path_list
        self.anomaly_event_handlers = anomaly_event_handlers
        self.learn_mode = learn_mode
        self.next_persist_time = None
        self.output_logline = output_logline
        self.aminer_config = aminer_config
        self.seq_len = seq_len
        self.persistence_id = persistence_id

        self.persistence_file_name = AminerConfig.build_persistence_file_name(aminer_config, self.__class__.__name__, persistence_id)
        PersistenceUtil.add_persistable_component(self)

    def receive_atom(self, log_atom):
        """Receive a log atom from a source."""
        print('Detector template received a log atom!')

    def get_time_trigger_class(self):
        """
        Get the trigger class this component should be registered for.
        This trigger is used only for persistence, so real-time triggering is needed.
        """
        return AnalysisContext.TIME_TRIGGER_CLASS_REALTIME

    def do_timer(self, trigger_time):
        """Check if the current ruleset should be persisted."""
        if self.next_persist_time is None:
            return 600

        delta = self.next_persist_time - trigger_time
        if delta < 0:
            self.do_persist()
            delta = 600
        return delta

    def do_persist(self):
        """Immediately write persistence data to storage."""
        pass

    def allowlist_event(self):
        """Allowlist an event."""
        pass

This template class comprises the following parameters in its init method:

  • aminer_config, which contains information from the config that we just created before.
  • target_path_list, a list of parser paths to be analyzed by the detector.
  • anomaly_event_handlers, a list of event handlers that process generated anomalies.
  • seq_len, the length of the subsequences to be learned.
  • persistence_id='Default', the name of the file holding the learned data.
  • learn_mode=False, a flag that indicates whether training or detection mode is active.
  • output_logline=True, a flag that indicates whether additional log information should be printed.

Furthermore, the template class involves the following methods:

  • receive_atom, the main method for processing parsed log data.
  • get_time_trigger_class, a simple method for specifying time triggers.
  • do_timer, a method that is automatically executed when the specified time has passed.
  • do_persist, a method for storing learned data on disk so that it can be reloaded in subsequent aminer executions.
  • allowlist_event, a method for injecting values to the learned data at runtime.

Note that at this point, the receive_atom method only contains a single debug print command that we will use to test the setup. To add the SequenceDetector to the aminer analysis pipeline, we have to adjust the configuration. Insert a detector of type SequenceDetector in the Analysis section of the config.yml:

Analysis:
        - type: "SequenceDetector"
          paths: ["/accesslog/fm/request/request"]
          seq_len: 3
          learn_mode: True

We pass three parameters to the SequenceDetector: the paths to be analyzed, which we set to the path of the request in the log event; the length of the sequence, which we set to 3 to correspond to our example above; and learn_mode, which we set to true to indicate that we want to build our model and not just detect anomalies.

When parsing the config.yml, the aminer needs to know how to interpret the parameters. First, open the schema for the config.yml as follows.

root@user-1:/home/ubuntu/aminer# vim /home/ubuntu/aminer/source/root/usr/lib/logdata-anomaly-miner/aminer/schemas/normalisation/AnalysisNormalisationSchema.py 

A large number of parameters are already defined there, and we can reuse them for our component. The SequenceDetector does not introduce any new parameters. However, consider the following definition for the parameter seq_len.

'seq_len': {'type': 'integer', 'default': 3},

Note that definitions of new parameters can be made anywhere in the parameters of the 'Analysis' section. In later versions of the aminer (>= 2.3.1), this parameter already exists - just make sure that it is there and do not add it a second time. Next, add the parameters that are specific to this detector into their own schema. For this, open the validation schema definition as follows.

root@user-1:/home/ubuntu/aminer# vim /home/ubuntu/aminer/source/root/usr/lib/logdata-anomaly-miner/aminer/schemas/validation/AnalysisValidationSchema.py

There are already a number of blocks defined. Add the following block to the list at an arbitrary position.

          {
              'id': {'type': 'string', 'nullable': True},
              'type': {'type': 'string', 'allowed': ['SequenceDetector'], 'required': True},
              'paths': {'type': 'list', 'schema': {'type': 'string'}, 'nullable': True},
              'seq_len': {'type': 'integer', 'min': 1},
              'persistence_id': {'type': 'string'},
              'learn_mode': {'type': 'boolean'},
              'output_logline': {'type': 'boolean'},
              'output_event_handlers': {'type': 'list', 'nullable': True},
          },

It is also necessary to tell the aminer to which analysis component this parameter belongs. For this, open the YamlConfig.py using the following command.

root@user-1:/home/ubuntu/aminer# vim /home/ubuntu/aminer/source/root/usr/lib/logdata-anomaly-miner/aminer/YamlConfig.py

Again, locate the section where analysis components are defined in the method build_analysis_components. There is a large structure consisting of several elif statements, one for each analysis component. Add a new elif block for our SequenceDetector anywhere in this structure:

elif item['type'].name == 'SequenceDetector':
    tmp_analyser = func(analysis_context.aminer_config, item['paths'], anomaly_event_handlers,
                        persistence_id=item['persistence_id'], seq_len=item['seq_len'],
                        learn_mode=learn, output_logline=item['output_logline'])

This block takes the parameters defined in the configuration schemas and passes them to the Python class. For example, the parameter seq_len is set to the value of the item 'seq_len' that will be defined in the config.yml.

Now everything is set and the aminer can be installed with the modified sources. As before, execute the following commands to install the aminer. The old installation is automatically replaced.

root@user-1:/home/ubuntu# sudo ansible-playbook /home/ubuntu/aminer/playbook.yml

Once installation completed successfully, run the aminer as before. You should obtain the following output:

root@user-1:/home/ubuntu/aminer# aminer -C -f
2021-01-08 09:54:18 New path(es) detected
...
Detector template received a log atom!
...
Detector template received a log atom!
Detector template received a log atom!
Detector template received a log atom!
Detector template received a log atom!
Detector template received a log atom!
Detector template received a log atom!
Detector template received a log atom!

In addition to the output that we already observed before, the aminer prints the debug message "Detector template received a log atom!" eight times. This means that each of our input log events was parsed and transmitted to the receive_atom method of the SequenceDetector.

Developing an analysis method

Now we can run some analysis methods on that data. But what exactly does the parsed data look like? Well, let's just see by printing out information about each received log atom. Insert the following code in the receive_atom method of the SequenceDetector.

    def receive_atom(self, log_atom):
        """Receive a log atom from a source."""
        print('Detector template received a log atom!')
        for path, match_element in log_atom.parser_match.get_match_dictionary().items():
            print(' - ' + str(path) + ': ' + repr(match_element.match_object))

This code lists the paths and corresponding values of the parsed log event. In particular, it makes use of the so-called match_dictionary that holds all match_elements for each path defined in one branch of the parser. We then iterate over all these paths and print the match_object of the match_element for each of the paths. Note that we print the match_object with repr, since the aminer stores most parsed log data as byte strings to avoid any issues when handling special characters that might occur in parsed logs.
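Note, however, that a match_object is not always a byte string - some elements are converted to other types during parsing, such as integers. A small standalone sketch (the helper name here is illustrative, not an aminer function) shows the conversion pattern used later in this tutorial:

```python
# Convert a match_object to str: decode byte strings, stringify anything
# else (e.g., integers such as a parsed status code).
def match_object_to_str(match_object):
    if isinstance(match_object, bytes):
        return match_object.decode()
    return str(match_object)

print(match_object_to_str(b'/nag/task/save.php'))  # /nag/task/save.php
print(match_object_to_str(200))                    # 200
```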

Install and run the aminer as before. You should get the following output (only the final section is shown).

Detector template received a log atom!
 - /accesslog: b'192.168.10.190 - - [29/Feb/2020:14:05:57 +0000] "POST /nag/task/save.php HTTP/1.1" 200 8754 "-" "-"'
 - /accesslog/host: b'192.168.10.190'
 - /accesslog/sp0: b' '
 - /accesslog/ident: b'-'
 - /accesslog/sp1: b' '
 - /accesslog/user: b'-'
 - /accesslog/sp2: b' '
 - /accesslog/time: 1582985157
 - /accesslog/sp3: b'] "'
 - /accesslog/fm/request: b'POST /nag/task/save.php HTTP/1.1'
 - /accesslog/sp6: b'" '
 - /accesslog/status: 200
 - /accesslog/sp7: b' '
 - /accesslog/size: 8754
 - /accesslog/combined: b' "-" "-"'
 - /accesslog/combined/combined: b' "-" "-"'
 - /accesslog/combined/combined/sp9: b' "'
 - /accesslog/combined/combined/referer: b'-'
 - /accesslog/combined/combined/sp10: b'" "'
 - /accesslog/combined/combined/user_agent: b'-'
 - /accesslog/combined/combined/sp11: b'"'
 - /accesslog/fm/request/method: 1
 - /accesslog/fm/request/sp5: b' '
 - /accesslog/fm/request/request: b'/nag/task/save.php'
 - /accesslog/fm/request/sp6: b' '
 - /accesslog/fm/request/version: b'HTTP/1.1'

This output shows that all information about the log data has successfully been parsed. Note that the path /accesslog/fm/request/request holds the match_object b'/nag/task/save.php', which is one of the values we used in our example above. Since these are the tokens we are interested in, we need to analyze the path /accesslog/fm/request/request. Check your config.yml - we have already defined this path as a parameter value there! We can therefore use the parameter target_path_list, which is mapped to the paths parameter of the config.yml according to the YamlConfig.py, to filter our analysis procedure.

Let's try this out. Since our desired path is the first and only element of target_path_list, we use it to make a lookup in the match_dictionary. When a match_element is found, we just print the match_object as before. The code looks as follows:

    def receive_atom(self, log_atom):
        match_element = log_atom.parser_match.get_match_dictionary().get(self.target_path_list[0])
        if match_element is not None:
            print(repr(match_element.match_object))

And running the aminer with this code yields the following results:

b'/services/portal/'
b'/nag/'
b'/nag/task/save.php'
b'/services/portal/'
b'/kronolith/'
b'/services/portal/'
b'/nag/'
b'/nag/task/save.php'

We successfully got the values that we want to analyze! However, note that target_path_list does not necessarily contain only a single path, but possibly more than one. In this case, we want to analyze the combined occurrence sequences of all match_objects that correspond to the desired paths. This comes in handy when one match_object alone is not sufficient to describe the event. In our example, we could add the request method, e.g., GET, POST, etc., as a second path. This means that the occurrence of GET /nag/ would be different from the occurrence of POST /nag/, and therefore allow us to learn more specific sequences. Alright, so let's adjust the code once more.

We update the code as follows. We iterate over all paths specified in the target_path_list and see if we can find them in the match_dictionary. For every match_element we find, we append its value to a list, decoding the match_object beforehand if it is a byte string. Once this is done and at least one value has been found, we print the result.

    def receive_atom(self, log_atom):
        """Receive a log atom from a source."""
        values = []
        all_values_none = True
        for path in self.target_path_list:
            match = log_atom.parser_match.get_match_dictionary().get(path)
            if match is None:
                continue
            if isinstance(match.match_object, bytes):
                value = match.match_object.decode()
            else:
                value = str(match.match_object)
            if value is not None:
                all_values_none = False
            values.append(value)
        if all_values_none is True:
            return
        log_event = tuple(values)
        print(log_event)

To check whether this code works as expected, we should add another path in the config. As mentioned before, we want to use the request method in addition to the request itself. This should look like this:

Analysis:
        - type: "SequenceDetector"
          paths: ["/accesslog/fm/request/method", "/accesslog/fm/request/request"]
          seq_len: 3
          learn_mode: True

Running the aminer with this method will yield the following output:

('0', '/services/portal/')
('0', '/nag/')
('1', '/nag/task/save.php')
('0', '/services/portal/')
('0', '/kronolith/')
('0', '/services/portal/')
('0', '/nag/')
('1', '/nag/task/save.php')

So, what happened here? Well, the aminer replaces the match_object with something that is easy to handle - in this case, the index of a list. When you open the ApacheAccessModel that is used to parse the data, you will find that all allowed request methods are stored in a list. Therefore, the aminer uses '0' for GET and '1' for POST requests. But that's okay, since we are still able to differentiate them. Also, our code created tuples containing the request method along with the request for each log line! Fantastic! Now we can finally get to generating some sequences!
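The index-based representation can be illustrated with a simplified, hypothetical wordlist matcher (the names below are illustrative, not the actual aminer parser classes):

```python
# Simplified illustration: a wordlist parser returns the index of the
# matched word as the match_object, so 'GET' becomes 0 and 'POST' becomes 1.
methods = [b'GET', b'POST', b'PUT', b'DELETE']

def match_method(data):
    for index, method in enumerate(methods):
        if data.startswith(method):
            return index  # the list index becomes the match_object
    return None  # no known method matched

print(match_method(b'GET /nag/ HTTP/1.1'))                # 0
print(match_method(b'POST /nag/task/save.php HTTP/1.1'))  # 1
```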

First, add a variable with an empty set that stores all the learned sequences. We use a set rather than a list, because we have to assume that in real world applications, a large number of sequences are possible, and a set allows for a faster lookup than lists. Designing code that supports efficient execution is key when it comes to analyzing large streaming data, such as logs. We also need a variable that holds the currently built sequence. Generate both variables in the init method.

self.sequences = set()
self.current_sequence = ()

In the receive_atom method, delete the final print and replace it with code that incrementally builds the current_sequence. Since the sequence should be of a fixed length, we must check its size and, if it is already full, drop the first entry before appending the log_event at the end.

    def receive_atom(self, log_atom):
        ...
        if len(self.current_sequence) < self.seq_len:
            self.current_sequence += (log_event,)
            if len(self.current_sequence) != self.seq_len:
                return
        else:
            self.current_sequence = self.current_sequence[1:] + (log_event,)
        print(self.current_sequence)

Install and run the aminer again. The sequences are printed as follows:

(('0', '/services/portal/'), ('0', '/nag/'), ('1', '/nag/task/save.php'))
(('0', '/nag/'), ('1', '/nag/task/save.php'), ('0', '/services/portal/'))
(('1', '/nag/task/save.php'), ('0', '/services/portal/'), ('0', '/kronolith/'))
(('0', '/services/portal/'), ('0', '/kronolith/'), ('0', '/services/portal/'))
(('0', '/kronolith/'), ('0', '/services/portal/'), ('0', '/nag/'))
(('0', '/services/portal/'), ('0', '/nag/'), ('1', '/nag/task/save.php'))

Looks like we have our sequences! All that's left to do is to check whether a sequence has been observed before or not. If it is new, we want to report that we found an anomaly and, assuming that we are still training our detector, add it to the set of known sequences. In detection mode, we do not want to learn the sequence, so that we are able to detect it again in future runs. The following code therefore checks whether the sequence is new and adds it to the set of known sequences if necessary, before reporting the anomaly. Just append it to the existing code.

    def receive_atom(self, log_atom):
        ...
        if self.current_sequence not in self.sequences:
            if self.learn_mode is True:
                self.sequences.add(self.current_sequence)
            print('Anomaly found!')

And now we run it again:

(('0', '/services/portal/'), ('0', '/nag/'), ('1', '/nag/task/save.php'))
Anomaly found!
(('0', '/nag/'), ('1', '/nag/task/save.php'), ('0', '/services/portal/'))
Anomaly found!
(('1', '/nag/task/save.php'), ('0', '/services/portal/'), ('0', '/kronolith/'))
Anomaly found!
(('0', '/services/portal/'), ('0', '/kronolith/'), ('0', '/services/portal/'))
Anomaly found!
(('0', '/kronolith/'), ('0', '/services/portal/'), ('0', '/nag/'))
Anomaly found!
(('0', '/services/portal/'), ('0', '/nag/'), ('1', '/nag/task/save.php'))

As expected, the aminer reports a lot of anomalies in the beginning, since no sequences are known. However, the last sequence is the same as the first sequence, and therefore no "Anomaly found!" message is printed. Seems to work! But we can easily do some more testing. While the aminer is running, just open a new terminal and append new lines to the input log file using the commands below. The aminer waits for new lines in the file and will immediately process them.

root@user-1:/home/ubuntu/aminer# echo "192.168.10.190 - - [29/Feb/2020:14:10:35 +0000] \"GET /services/portal/ HTTP/1.1\" 200 4345 \"-\" \"-\"" >> /home/ubuntu/access.log 
root@user-1:/home/ubuntu/aminer# echo "192.168.10.190 - - [29/Feb/2020:14:10:45 +0000] \"GET /kronolith/ HTTP/1.1\" 200 3452 \"-\" \"-\"" >> /home/ubuntu/access.log 
root@user-1:/home/ubuntu/aminer# echo "192.168.10.190 - - [29/Feb/2020:14:10:54 +0000] \"GET /nag/ HTTP/1.1\" 200 25623 \"-\" \"-\"" >> /home/ubuntu/access.log

The first two lines fit our known sequences, i.e., the last POST /nag/task/save.php is followed by a GET /services/portal/ and a GET /kronolith/. However, the next GET /nag/ yields a new sequence that is detected as an anomaly. The output should look like this:

(('0', '/nag/'), ('1', '/nag/task/save.php'), ('0', '/services/portal/'))
(('1', '/nag/task/save.php'), ('0', '/services/portal/'), ('0', '/kronolith/'))
(('0', '/services/portal/'), ('0', '/kronolith/'), ('0', '/nag/'))
Anomaly found!
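The learning and detection logic developed in this section can be condensed into a standalone Python sketch (independent of the aminer pipeline; the class and method names here are illustrative):

```python
# Standalone sketch of the STIDE-style sequence learning developed above:
# a sliding window of fixed length over events, with new windows reported
# as anomalies and (in learn mode) added to the set of known sequences.
class SequenceModel:
    def __init__(self, seq_len):
        self.seq_len = seq_len
        self.sequences = set()       # known subsequences
        self.current_sequence = ()   # sliding window of recent events

    def receive(self, log_event, learn=True):
        """Process one event; return True if the completed window is new."""
        if len(self.current_sequence) < self.seq_len:
            self.current_sequence += (log_event,)
            if len(self.current_sequence) != self.seq_len:
                return False  # window not yet filled
        else:
            self.current_sequence = self.current_sequence[1:] + (log_event,)
        if self.current_sequence not in self.sequences:
            if learn:
                self.sequences.add(self.current_sequence)
            return True
        return False

model = SequenceModel(seq_len=3)
events = [('0', '/services/portal/'), ('0', '/nag/'), ('1', '/nag/task/save.php'),
          ('0', '/services/portal/'), ('0', '/kronolith/'), ('0', '/services/portal/'),
          ('0', '/nag/'), ('1', '/nag/task/save.php')]
anomalies = sum(model.receive(e) for e in events)
print(anomalies)  # 5 anomalies while learning the eight training events
```

Feeding the three appended log events from above into this model reproduces the behavior of our detector: the first two fit known sequences, while the third yields the new window that is flagged as an anomaly.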

The aminer output format

Our next task is to obtain the output in a standardized format that is understandable for humans and supports automatic processing by programs. Remember that we configured the StreamPrinterEventHandler in the config.yml to produce output in json format. Telling this event handler to output our anomalies as json objects is as simple as calling a method. All we have to do is prepare all the information that we want to output in the json object and pass it to the handler. For this, replace the previous code block that contained the print('Anomaly found!') command with the following code.

        if self.current_sequence not in self.sequences:
            if self.learn_mode is True:
                self.sequences.add(self.current_sequence)
            if self.output_logline:
                sorted_log_lines = [log_atom.parser_match.match_element.annotate_match('')]
            else:
                sorted_log_lines = [log_atom.raw_data]
            analysis_component = {'AffectedLogAtomPaths': [self.target_path_list], 'AffectedLogAtomValues': list(self.current_sequence)}
            event_data = {'AnalysisComponent': analysis_component}
            for listener in self.anomaly_event_handlers:
                listener.receive_event('Analysis.%s' % self.__class__.__name__, 'New sequence detected', sorted_log_lines, event_data, log_atom, self)

Instead of printing a simple string, we now tell every event handler to output an anomaly in the last line of the code. We provide them with the class name of our detector, a simple message that describes our detection, a list of affected log lines, additional data on the anomaly as event_data, the log_atom that triggered the anomaly, and a reference to our detector. In the lines before that, we define the missing variables: First, we assign the variable sorted_log_lines a fully parsed log line if self.output_logline is True, and the raw log data otherwise. We could store a list with the most recently processed log lines to provide all affected log lines of the sequence, but we will skip that for simplicity. We then define a nested dictionary for the event_data that holds the list of affected paths as well as the list of affected values. That's it! Now we run the aminer again and see how that affected the output, which is shown in the following.

...
{
  "AnalysisComponent": {
    "AnalysisComponentIdentifier": 3,
    "AnalysisComponentType": "SequenceDetector",
    "AnalysisComponentName": "SequenceDetector3",
    "Message": "New sequence detected",
    "PersistenceFileName": "Default",
    "TrainingMode": true,
    "AffectedLogAtomPaths": [
      [
        "/accesslog/fm/request/method",
        "/accesslog/fm/request/request"
      ]
    ],
    "AffectedLogAtomValues": [
      [
        "0",
        "/services/portal/"
      ],
      [
        "0",
        "/kronolith/"
      ],
      [
        "0",
        "/nag/"
      ]
    ],
    "LogResource": "file:///tmp/access.log"
  },
  "LogData": {
    "RawLogData": [
      "192.168.10.190 - - [29/Feb/2020:14:10:54 +0000] \"GET /nag/ HTTP/1.1\" 200 25623 \"-\" \"-\""
    ],
    "Timestamps": [
      1582985454
    ],
    "DetectionTimestamp": 1669867646.13,
    "LogLinesCount": 1
  }
}
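The AffectedLogAtomPaths and AffectedLogAtomValues fields in this output are exactly the nested dictionary our detector builds; the remaining fields are added by the event handler. A minimal reproduction of that structure, with the paths and values taken from the example above:

```python
import json

# Values as in the anomaly above; in the detector these come from
# self.target_path_list and self.current_sequence.
target_path_list = ['/accesslog/fm/request/method', '/accesslog/fm/request/request']
current_sequence = (('0', '/services/portal/'), ('0', '/kronolith/'), ('0', '/nag/'))

analysis_component = {'AffectedLogAtomPaths': [target_path_list],
                      'AffectedLogAtomValues': list(current_sequence)}
event_data = {'AnalysisComponent': analysis_component}

# Tuples are serialized as json arrays, matching the output above.
print(json.dumps(event_data, indent=2))
```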

Persisting learned data

As expected, the output now consists of valid json objects that contain the information we provided. These json objects could now be forwarded to SIEMs for further analysis using the KafkaEventHandler, however, this is a topic for another wiki. Our detector is still not complete! One of the features of the aminer is that the knowledge base that is learned during the training phase can be stored to disk and reused as well as extended in subsequent runs of the aminer. To do this, we have to implement the do_persist method as follows.

    def do_persist(self):
        """Immediately write persistence data to storage."""
        PersistenceUtil.store_json(self.persistence_file_name, list(self.sequences))
        self.next_persist_time = None

Whenever this method is called, the PersistenceUtil component stores the learned sequences in the file persistence_file_name. As visible in the do_timer method, the method is called every 600 seconds, or 10 minutes. However, the aminer also persists whenever it is suspended. It is therefore not necessary to adjust the timer or wait for 10 minutes; just make sure that you stop the aminer with Ctrl + c after you see the anomalies in the output. Also make sure that the line PersistenceUtil.add_persistable_component(self) is present in the __init__ method as in the detector template. We now run the aminer again and check whether the data is written as expected. For this, view the following file.

root@user-1:/home/ubuntu/aminer# cat /var/lib/aminer/SequenceDetector/Default 
[[["string:0", "string:/nag/"], ["string:1", "string:/nag/task/save.php"], ["string:0", "string:/services/portal/"]], [["string:0", "string:/services/portal/"], ["string:0", "string:/kronolith/"], ["string:0", "string:/services/portal/"]], [["string:1", "string:/nag/task/save.php"], ["string:0", "string:/services/portal/"], ["string:0", "string:/kronolith/"]], [["string:0", "string:/services/portal/"], ["string:0", "string:/kronolith/"], ["string:0", "string:/nag/"]], [["string:0", "string:/services/portal/"], ["string:0", "string:/nag/"], ["string:1", "string:/nag/task/save.php"]], [["string:0", "string:/kronolith/"], ["string:0", "string:/services/portal/"], ["string:0", "string:/nag/"]]]

The directory /var/lib/aminer stores all persisted data by default. In the directory for our SequenceDetector, which is automatically generated by the aminer, we see that the file Default contains a nested list with all our learned sequences that we stored in self.sequences. But this does not mean that the aminer also uses that data when it is restarted. For this, we have to append the following code to the __init__ method.

        persistence_data = PersistenceUtil.load_json(self.persistence_file_name)
        if persistence_data is not None:
            for sequence in persistence_data:
                sequence_elem_tuple = []
                for sequence_elem in sequence:
                    sequence_elem_tuple.append(tuple(sequence_elem))
                self.sequences.add(tuple(sequence_elem_tuple))

This code first attempts to load the persisted data from the persistence file. If this is successful, i.e., the file is available and thus the loaded data is not None, we iterate over all loaded sequences and add them to the sequences variable that we use to determine whether a new sequence has been observed or not. Note that it is necessary to convert the loaded data into tuples, since json uses lists instead of tuples for storing data, as visible in the /var/lib/aminer/SequenceDetector/Default file.
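The effect can be seen with plain json (PersistenceUtil additionally adds type prefixes such as string:, but the list-vs-tuple issue is the same):

```python
import json

sequence = (('0', '/services/portal/'), ('0', '/nag/'))

# json has no tuple type: after a round-trip, tuples come back as lists ...
loaded = json.loads(json.dumps(sequence))
print(loaded)  # [['0', '/services/portal/'], ['0', '/nag/']]

# ... and nested lists are unhashable, so they cannot go into self.sequences.
try:
    {tuple(loaded)}              # a tuple of lists is still unhashable
except TypeError:
    print('nested lists cannot be stored in a set')

# Converting every element back into a tuple restores the original form.
restored = tuple(tuple(elem) for elem in loaded)
assert restored == sequence
```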

Now, our detector is usable for application on real log data, right? Well, we could probably still come up with some improvements. For example, imagine that more than one user accesses a web server at the same time. Then, their sequences might overlap and it could look to our algorithm as if new sequences were generated. See the following input data as an example, where two users from two different hosts, which correspond to the IP addresses at the beginning of the lines, generate logs.

192.168.10.190 - - [29/Feb/2020:16:02:02 +0000] "GET /services/portal/ HTTP/1.1" 200 6642 "-" "-"
192.168.10.4 - - [29/Feb/2020:16:02:33 +0000] "GET /services/portal/ HTTP/1.1" 200 7275 "-" "-"
192.168.10.190 - - [29/Feb/2020:16:03:35 +0000] "GET /nag/ HTTP/1.1" 200 1137 "-" "-"
192.168.10.4 - - [29/Feb/2020:16:03:51 +0000] "GET /kronolith/ HTTP/1.1" 200 77254 "-" "-"
192.168.10.190 - - [29/Feb/2020:16:03:53 +0000] "POST /nag/task/save.php HTTP/1.1" 200 2751 "-" "-"
192.168.10.4 - - [29/Feb/2020:16:03:59 +0000] "GET /services/portal/ HTTP/1.1" 200 7427 "-" "-"
192.168.10.4 - - [29/Feb/2020:16:04:08 +0000] "GET /nag/ HTTP/1.1" 200 2242 "-" "-"
192.168.10.190 - - [29/Feb/2020:16:04:28 +0000] "GET /services/portal/ HTTP/1.1" 200 3562 "-" "-"

Looking only at the requests and request methods, it now looks like there is a transition from /kronolith/ to /nag/task/save.php, even though these pages are visited by different users. To solve this issue, it is necessary to differentiate sequences by the IP address that corresponds to the parser path /accesslog/host. But do not forget that the IP address itself should not be part of the sequence, otherwise our sequences will only apply to that one user and our learned data is not generally applicable. Are you up for the challenge?
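As a hint, one possible approach (a standalone sketch with illustrative names, not the actual aminer implementation) keeps a separate current sequence per identifier value, e.g. the client IP parsed at /accesslog/host, while the learned sequences themselves stay shared:

```python
from collections import defaultdict, deque

SEQ_LEN = 3
sequences = set()                                     # learned sequences (shared)
windows = defaultdict(lambda: deque(maxlen=SEQ_LEN))  # one window per client IP

def process(ip, values, learn=True):
    """Track the value sequence separately for each client IP."""
    window = windows[ip]
    window.append(values)
    if len(window) < SEQ_LEN:
        return False
    sequence = tuple(window)      # note: the IP itself is not part of it
    if sequence not in sequences:
        if learn:
            sequences.add(sequence)
            return False
        return True
    return False

# The interleaved events from the two hosts above no longer mix:
process('192.168.10.190', ('0', '/services/portal/'))
process('192.168.10.4', ('0', '/services/portal/'))
process('192.168.10.190', ('0', '/nag/'))
process('192.168.10.4', ('0', '/kronolith/'))
process('192.168.10.190', ('1', '/nag/task/save.php'))
print(sorted(windows))  # one window per host
```

Because only the window lookup is keyed by IP, the learned sequences remain user-independent, as required.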

A component called EventSequenceDetector that implements this functionality is planned for release with aminer version 2.3.0. Check it out to compare your solution with our detector! If you make improvements or come up with your own detector that is useful for a common type of log data, the aminer team would be happy to hear from you - just contact us at aecid@ait.ac.at

[1] Forrest, S., Hofmeyr, S. A., Somayaji, A., & Longstaff, T. A. (1996, May). A sense of self for unix processes. In Proceedings of the 1996 IEEE Symposium on Security and Privacy (pp. 120-128). IEEE.
