Animal Interaction Processing

2009-06-30 16:30, written by Robert Klemme

As stated, I intend to exploit the locality of interaction log messages by using an LRU cache algorithm and to handle log lines as efficiently as possible. The general concept looks like this:

  • Whenever a new interaction is seen in the log file, create a new InteractionProcessor for it and put it in some storage which allows for fast access.
  • Any further processing of log lines read for a particular interaction is dealt with by that InteractionProcessor instance.
  • The storage should have an upper limit on the number of entries and expire entries via a cache algorithm in order to avoid overusing memory.
  • To further reduce memory usage, write log lines of an interaction to the corresponding output file as soon as possible.
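
The storage described in these points can be sketched roughly as follows. This is only an illustration under assumptions: the names LruStore and Processor are made up for this example and are not part of the application, and the eviction relies on Ruby (1.9 and later) hashes preserving insertion order.

```ruby
# Hypothetical size-bounded store with least-recently-used eviction.
# Relies on Ruby (1.9+) hashes preserving insertion order.
class LruStore
  def initialize(max_size)
    raise ArgumentError, "max_size must be positive" unless max_size > 0
    @max_size = max_size
    @data = {} # ordered from least to most recently used
  end

  # Return the entry for key, creating it via the block if it is missing.
  # Every access moves the entry to the "most recently used" end.
  def fetch(key)
    if @data.key?(key)
      value = @data.delete(key)
    else
      value = yield(key)
      evict while @data.size >= @max_size
    end
    @data[key] = value
  end

  def key?(key)
    @data.key?(key)
  end

  def size
    @data.size
  end

  private

  # Drop the least recently used entry (the first one in the hash).
  def evict
    @data.shift
  end
end

# Stand-in for the real InteractionProcessor, for illustration only.
Processor = Struct.new(:id, :lines)

store = LruStore.new(2)
[%w[a START], %w[b START], %w[a work], %w[c START]].each do |id, line|
  store.fetch(id) { |k| Processor.new(k, []) }.lines << line
end

# "b" was the least recently used entry when "c" arrived, so it is gone.
puts store.key?("b")
```

If interaction "b" logged another line now, the store would create a fresh Processor for it, which is exactly the early expiry situation discussed in the next section.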

It should be immediately clear that this confronts us with a number of challenges we will now have to look at.

Challenge: Early Expiry

Considering that we do not have a means to detect the end of an interaction, we can only rely on the expiry algorithm of our InteractionProcessor storage. This means there is always the chance that an InteractionProcessor is evicted from the storage although there are more log records to come for this interaction. This situation becomes more likely if

  • the size limit of the storage is reduced,
  • the number of interactions which are active at one point in time increases.

The latter can be caused by high activity on the original system (many interactions are started in a time interval) or interactions having huge gaps (making them live longer).

Now, what does it mean for our processing? Since we can only detect the initial line of an interaction through the fact that there is not yet an interaction processor for this interaction, we may be in a situation that we believe this is a new interaction while in fact we have seen it already. This may lead to wrong filtering results (for example, if the beginning timestamp of this interaction needs to be evaluated for filtering and the difference between the real first timestamp and the second “first” timestamp makes a difference).

There are a few things that we can do:

  1. Check the filesystem for this interaction and read previous log records into memory before continuing processing this interaction.
  2. Remember all interaction ids along with their initial timestamps or with the file name to know whether an interaction has been seen already and to find it efficiently in the filesystem to read previous records (see 1).
  3. Nothing.

Option 1 is very inefficient, because we do not know the initial timestamp and so must potentially search multiple directories. Option 2 is dangerous because it does not impose any limits on the storage needed for interaction ids with their timestamps. That leaves us with option 3 – a seemingly bad choice at first sight.

To comfort you let’s try to find out how dangerous it actually is to do nothing about it. Assume for the moment that test-gen.rb has a realistic model of the log files we will see in practice. Now let’s also hope I get the math right… Looking into the source code, you will see that the average time interval between two lines of a single interaction is 5 seconds:

Timer = Struct.new :t do
  def tic
    self.t += rand(10_000) / 1000.0
  end
end

Furthermore we see that there are 1000 new interactions per minute (variable commands) or, put differently, a new interaction starts every 60ms. Furthermore there are on average 5.5 (2 + 3.5) log records per interaction (note, I fixed the time distribution of start times which wasn’t uniform in the first version of the generator):

while t < end_time
  # create new entries
  commands.times do
    id = generate_id
    tx.t = t + rand(60_000) / 1000.0

    entries.add(Entry.create(tx.t, id, 'START'))

    (2 + rand(8)).times do
      pick = rand(10)
      msg = pick == 0 ? MESSAGES.last : MESSAGES[pick % 2]
      entries.add(Entry.create(tx.tic, id, msg))
    end

    entries.add(Entry.create(tx.tic, id, 'END'))
  end

  # write entries of this minute
  entries.print t
  t += 60
end

This means the average life span of an interaction is 22.5 seconds (5 sec * (5.5 – 1)). So now, continuing with our average calculation, this means that there are roughly 375 new interactions in a 22.5 second interval (22.5 sec / 60ms). So, for the average case keeping 400 InteractionProcessors in memory is sufficient. Of course we need to take into account that this is just an average calculation and also does not cover temporary load surges, but if we apply a factor of 100 (i.e. a storage limit of 40,000) this still doesn’t look like it will blast system memory and we should be safe for most situations (unfortunately my stochastic skills are totally rusty today, so if someone cares to calculate the likelihood of failure that would be interesting to see). Assuming that we need 0.5 KB per InteractionProcessor because of log lines that need to be kept in memory, we’re at about 20 MB, which isn’t a lot of memory these days.

To sum it up: with the formula (avg entries - 1) * avg interval * new IA per minute / 60s we get the average number of interactions active at a time. It turns out that for our sample log generator this is 375 which leaves a lot of headroom for the interaction processor storage. Without diving into stochastics too much it is obvious that we are pretty safe if we do not implement a solution for dealing with interactions that were expired too early.
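
For readers who want to plug in their own numbers, the formula and the memory estimate can be written down as a few lines of Ruby. This is a sketch that takes the figures from this article as inputs; the method and variable names are chosen for this example only, and the 0.5 KB per InteractionProcessor is the assumption made above.

```ruby
# Average number of interactions that are active at the same time.
#   avg_entries    - average log records per interaction
#   avg_interval   - average seconds between two records of one interaction
#   new_per_minute - interactions started per minute
def avg_active_interactions(avg_entries, avg_interval, new_per_minute)
  (avg_entries - 1) * avg_interval * new_per_minute / 60.0
end

# Figures used in this article for the sample log generator.
active = avg_active_interactions(5.5, 5.0, 1000) # 375.0

# Memory estimate for the storage limit discussed above.
limit        = 40_000
kb_per_entry = 0.5 # assumed size of one InteractionProcessor
memory_mb    = limit * kb_per_entry / 1000.0 # 20.0

puts "average active interactions: #{active}"
puts "headroom factor at limit:    #{(limit / active).round(1)}"
puts "estimated memory (MB):       #{memory_mb}"
```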

Challenge: How to keep as few log lines in memory as possible?

Since our concept includes getting rid of log lines from memory as soon as possible, we will have to look at how this might be achieved. Remember, we only want to output interactions which match our filter criteria. These criteria can be quite diverse and may need to look at

  • interaction id,
  • initial timestamp,
  • last timestamp,
  • a string matched somewhere in any of the log lines.

It should be immediately clear that not all of these criteria can be evaluated when the initial line of an interaction is seen. A simple matches / does not match logic won’t help here. We need a filter that can tell us “matches”, “does not match” and “maybe matches”, where the first two answers must only be given if no new information (further log lines) can invalidate them. So we will use a filter that returns any of :yes, :no or :maybe. This is our dummy filter, which also has a changed interface:

    YES = Class.new do
      def first(iap, line, ts) :yes end
      def initial(iap, line, ts) :yes end
      def followup(iap, line) :yes end
    end.new
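
To illustrate how the three answers might be used, here are two filters written against the same three-method interface. Both are hypothetical sketches, not part of the application: TextFilter and StartTimeFilter are names invented for this example, and timestamps are assumed to be plain numbers.

```ruby
# Hypothetical filter: include interactions where any line contains a text.
# A match is final (:yes). Without a match the answer can only be :maybe,
# because a later line might still contain the text.
class TextFilter
  def initialize(text)
    @text = text
  end

  def first(iap, line, ts)
    check(line)
  end

  def initial(iap, line, ts)
    check(line)
  end

  def followup(iap, line)
    check(line)
  end

  private

  def check(line)
    line.include?(@text) ? :yes : :maybe
  end
end

# Hypothetical filter: include interactions whose first timestamp lies in
# a given range. The very first line settles the question for good.
class StartTimeFilter
  def initialize(range)
    @range = range
  end

  def first(iap, line, ts)
    @range.cover?(ts) ? :yes : :no
  end

  # Present only to complete the interface: once first has answered
  # :yes or :no there is nothing left to decide.
  def initial(iap, line, ts)
    :maybe
  end

  def followup(iap, line)
    :maybe
  end
end

text_filter = TextFilter.new("ERROR")
answers = [
  text_filter.first(nil, "START", 0.0),
  text_filter.initial(nil, "working", 1.5),
  text_filter.followup(nil, "  ERROR: disk full")
]
p answers # [:maybe, :maybe, :yes]

time_filter = StartTimeFilter.new(10.0..20.0)
p time_filter.first(nil, "START", 12.0) # :yes
p time_filter.first(nil, "START", 25.0) # :no
```

Note that TextFilter never answers :no: without knowing where an interaction ends, the absence of a match is never final.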

We no longer pass just the InteractionProcessor as argument but also the current line and, where needed, the timestamp. This also leads to three processing states in our InteractionProcessor: undecided, including, excluding. We handle this using the state pattern:

  class InteractionProcessor

    UNDECIDED = Class.new do
      def process_initial(iap, line, time_stamp)
        case iap.coord.filter.first(iap, line, time_stamp)
        when :yes
          # we'll improve this once LRU is there
          iap.entries << Entry.new(time_stamp, line)
          INCLUDE
        when :no
          iap.entries.clear
          EXCLUDE
        when :maybe
          iap.entries << Entry.new(time_stamp, line)
          self
        else
          raise 'Illegal return'
        end
      end

      def process(iap, line, time_stamp)
        case iap.coord.filter.initial(iap, line, time_stamp)
        when :yes
          # we'll improve this once LRU is there
          iap.entries << Entry.new(time_stamp, line)
          INCLUDE
        when :no
          iap.entries.clear
          EXCLUDE
        when :maybe
          iap.entries << Entry.new(time_stamp, line)
          self
        else
          raise 'Illegal return'
        end
      end

      def append_line(iap, line)
        case iap.coord.filter.followup(iap, line)
        when :yes
          # we'll improve this once LRU is there
          l = iap.entries.last and l.line << line
          INCLUDE
        when :no
          iap.entries.clear
          EXCLUDE
        when :maybe
          l = iap.entries.last and l.line << line
          self
        else
          raise 'Illegal return'
        end
      end
    end.new

    INCLUDE = Class.new do
      def process_initial(iap, line, time_stamp)
        iap.entries << Entry.new(time_stamp, line)
        self
      end

      def process(iap, line, time_stamp)
        iap.entries << Entry.new(time_stamp, line)
        self
      end

      def append_line(iap, line)
        l = iap.entries.last and l.line << line
        self
      end
    end.new

    EXCLUDE = Class.new do
      def process_initial(iap, line, ts)
        self
      end

      def process(iap, line, ts)
        self
      end

      def append_line(iap, line)
        self
      end
    end.new

  # ...

    # Process the first line
    def process_initial(time_stamp, line)
      @state = @state.process_initial(self, line, time_stamp)
    end

    # Process an initial line
    def process(time_stamp, line)
      @state = @state.process(self, line, time_stamp)
    end

    # Append a continuation line to the last entry
    def append_line(line)
      @state = @state.append_line(self, line)
    end

  # ...

  end

Challenge: Number of Open File Descriptors

If we store 40,000 InteractionProcessors in memory then the worst case with regard to file descriptors is that all of them have one open. That’s more than is usually allowed for user processes. We’ll remedy this by using an LRU strategy here as well: we will have a set of open files and close the least recently used one if we hit a fixed limit. I will cover this in one of the next versions, when we’ll have LRU handling integrated. (Maybe there is even a gem we can reuse.)
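
Until then, here is a rough idea of what such a set of open files could look like. It is a sketch under assumptions and not the implementation the later articles will use: FilePool and its methods are invented for this example. Files are opened in append mode so that a file which was closed and is needed again simply continues where it left off.

```ruby
require "tmpdir"

# Hypothetical pool that keeps at most max_open files open and closes the
# least recently used one when the limit is hit.
class FilePool
  def initialize(max_open)
    raise ArgumentError, "max_open must be positive" unless max_open > 0
    @max_open = max_open
    @open = {} # path => File, ordered from least to most recently used
  end

  # Append text to the file at path, reopening it if it was closed.
  def append(path, text)
    io = @open.delete(path)
    unless io
      close_oldest while @open.size >= @max_open
      io = File.open(path, "a")
    end
    @open[path] = io # re-insert as most recently used
    io.write(text)
  end

  def open_count
    @open.size
  end

  def close_all
    @open.each_value(&:close)
    @open.clear
  end

  private

  def close_oldest
    _path, io = @open.shift
    io.close
  end
end

Dir.mktmpdir do |dir|
  pool = FilePool.new(2)
  %w[a b c a].each_with_index do |name, i|
    pool.append(File.join(dir, "#{name}.log"), "line #{i}\n")
  end
  pool.close_all
  # a.log was closed when c.log was opened and reopened for the last write.
  puts File.read(File.join(dir, "a.log"))
end
```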

Summary and Outlook

This has grown into a rather largish article but I wanted to cover InteractionProcessor with reasonable completeness. Note that we still do not have the LRU mechanics and a couple of other things, so this class will change again. The next articles will have to deal with the LRU implementation, if I do not find anything suitable, and with the integration of that into the rest of the application. Furthermore, I expect the implementation of the filtering to be another complex and thus interesting piece. Stay tuned.
