Twig: Part 1 – An introduction

The analysis module I’ll be building is deliberately simple. I called it Twig because of its architecture. It can best be described as a hierarchical, message-passing framework where each node modifies the incoming message and passes it on. Each node also has any number of modifiers (filters, overrides, etc.) which provide a way to change a pipeline’s structure and message flow without having to write an entirely new node.

Once the foundation is built, a node can become almost anything: a file parser, a database loader or even a network stream. The largest benefit of such a system is the level of code reuse it offers. It also allows the coder to fundamentally alter the data flow with very few lines of code.

The stream processing paradigm has been around for decades, with entire languages built around the idea of simple inputs and outputs of messages. I was first inspired to go down this path by the Linux command-line | (pipe) operator. Being able to pipe the output of one program into another seemed like it would make an extremely useful Python module. However, the command line is somewhat limited in the complexity of the data flows that can be built. I first built a larger version at work and it was a resounding success. For projects outside of work I want a similar framework of my own.

Get to the code already!

Perhaps the best place to start is the Node class. Every node has a name and a dictionary of child nodes keyed by name. The Node class controls the structure of the pipeline and is used to manage the data flow.

The constructor

class Node(object):
    """docstring for Node"""
    def __init__(self, name):
        super(Node, self).__init__()
        self.name = name

        # children
        self.children = {}

The structural methods

    def addChild(self, child, lazy=False):
        """Adds a child node to this node.
        If a child of the same name already
        exists, an error will be raised unless
        the lazy argument is True. If lazy is
        True, an integer will be added to the child's name.
        """
        if not lazy:
            if child.name in self.children:
                raise Exception("Same name siblings are not allowed. "
                    "Child node, %s, already exists." % child.name)
            self.children[child.name] = child
        else:
            # Append "-<num>" to the name until it is unique.
            name = str(child.name)
            num = 0
            while name in self:
                num += 1
                name = child.name + ("-"+str(num))
            child.name = name
            self.children[name] = child
        return self

    def getChildren(self):
        """Returns an iterator over the children of this Node."""
        return iter(self.children.values())

    def removeChild(self, name):
        """Removes the child with the given name."""
        del self.children[name]
        return self

    def clear(self):
        """Removes all children from this node"""
        self.children = {}

    def __getitem__(self, name):
        """Returns the child Node if it exists, otherwise returns None."""
        return self.children.get(name, None)

    def __contains__(self, name):
        """Returns True if the node has a particular node."""
        return name in self.children

Utility methods

    def __len__(self):
        return len(self.children)

    def __str__(self):
        return "<%s: name=\"%s\">" % (self.__class__.__name__,
                                      self.name)

    def __iter__(self):
        return self.getChildren()
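
As a rough illustration of how the structural methods fit together, the sketch below builds a small tree. It carries its own condensed copy of Node so that it runs by itself. That copy assumes the constructor stores self.name and that addChild keys children by child.name; the node names ("root", "parser", "loader") are made up for the example.

```python
class Node(object):
    """Condensed stand-in for the Node class described above."""
    def __init__(self, name):
        self.name = name
        self.children = {}

    def addChild(self, child, lazy=False):
        if not lazy:
            if child.name in self.children:
                raise Exception("Child node, %s, already exists." % child.name)
        else:
            # Lazy mode: append "-<n>" until the name is unique.
            base, num = child.name, 0
            while child.name in self.children:
                num += 1
                child.name = "%s-%d" % (base, num)
        self.children[child.name] = child
        return self

    def __getitem__(self, name):
        return self.children.get(name, None)

    def __contains__(self, name):
        return name in self.children

    def __len__(self):
        return len(self.children)


root = Node("root")
# addChild returns self, so calls can be chained.
root.addChild(Node("parser")).addChild(Node("parser"), lazy=True)

child_names = sorted(root.children)   # ['parser', 'parser-1']
has_parser = "parser" in root         # True
missing = root["loader"]              # None, since no such child exists

try:
    root.addChild(Node("parser"))     # strict mode rejects duplicates
    duplicate_rejected = False
except Exception:
    duplicate_rejected = True
```

Returning self from addChild is what makes the chained call possible, and the lazy flag is handy when the same kind of node is attached several times and the exact names don't matter.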

Now that the node structure is in place, the pipeline methods have to be written. As a matter of preference, I like to keep the structure and the data processing methods separate. The only real reason is to keep the different methods divided by functionality so the code base stays manageable.

For now the Stream class holds only 6 methods:

class Stream(Node):
    """Streams manage the data processing for
    all the messages in the pipeline."""
    def __init__(self, name):
        super(Stream, self).__init__(name)
        self.initialized = False

    def preProcess(self, msg=None):
        """Handles any initialization
        for the data Stream. The default implementation
        does nothing; subclasses may override it."""
        pass

    def process(self, msg=None):
        """The main processor for each Node."""
        raise Exception("'process' method not implemented")

    def postProcess(self, msg=None):
        """Handles any processing when the Stream closes."""

    def scatter(self, msg=None):
        """Sends the results of this node to its children."""
        for child in self.children.values():
            # Each child runs its own execute cycle on the message.
            child.execute(msg)

    def gather(self):
        """Handles any post processing of the
        child nodes after a message was 'scattered'"""

    def execute(self, msg=None):
        """Handles the message flow through the Stream node. """
        # The preProcess method is called when the first
        # message is received.
        if not self.initialized:
            self.preProcess(msg)

        # Only process non-null messages
        if msg is not None:
            result = self.process(msg)

            # Scatter only if the process result is non-null
            if result is not None:
                self.scatter(result)
                self.gather()
        else:
            # 'Closes' the stream with the postProcess method.
            if self.initialized:
                self.postProcess(msg)

                # By scattering 'None', it signals all the
                # child nodes to exit as well.
                self.scatter(None)

        # Ensures the Stream is initialized on the first message
        if not self.initialized:
            self.initialized = True
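
To make the message flow concrete, here is a minimal sketch of a two-node pipeline. It is self-contained, so it includes condensed stand-ins for Node and Stream; the method bodies in those stand-ins are my assumptions about the simplest behaviour that matches the docstrings (gather is left out), and the Upper and Collect classes and the "text" message key are hypothetical, invented only for this example.

```python
class Node(object):
    """Condensed stand-in for the Node class."""
    def __init__(self, name):
        self.name = name
        self.children = {}

    def addChild(self, child):
        self.children[child.name] = child
        return self


class Stream(Node):
    """Condensed stand-in for Stream; the bodies are assumptions."""
    def __init__(self, name):
        super(Stream, self).__init__(name)
        self.initialized = False

    def preProcess(self, msg=None):
        pass

    def process(self, msg=None):
        raise NotImplementedError("'process' method not implemented")

    def postProcess(self, msg=None):
        pass

    def scatter(self, msg=None):
        for child in self.children.values():
            child.execute(msg)

    def execute(self, msg=None):
        if not self.initialized:
            self.preProcess(msg)
            self.initialized = True
        if msg is not None:
            result = self.process(msg)
            if result is not None:
                self.scatter(result)
        else:
            # A None message closes this node and its children.
            self.postProcess(msg)
            self.scatter(None)


class Upper(Stream):
    """Hypothetical node: upper-cases the 'text' field."""
    def process(self, msg=None):
        # Return a modified copy rather than mutating the input.
        return dict(msg, text=msg["text"].upper())


class Collect(Stream):
    """Hypothetical sink: records what it sees and when it closes."""
    def __init__(self, name):
        super(Collect, self).__init__(name)
        self.seen = []
        self.closed = False

    def process(self, msg=None):
        self.seen.append(msg["text"])
        return None  # nothing to pass on

    def postProcess(self, msg=None):
        self.closed = True


sink = Collect("sink")
pipeline = Upper("upper").addChild(sink)

for line in ["twig", "node"]:
    pipeline.execute({"text": line})
pipeline.execute(None)  # signal end of stream

# sink.seen is now ['TWIG', 'NODE'] and sink.closed is True
```

Only process (and optionally postProcess) had to be written for each new node; the traversal and shutdown logic comes from the base class, which is where the code reuse shows up.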

Now what?

The Node and Stream classes are the core of the new Twig module. We’ve developed both the node structure and the processing flow. Now how do we use it?

The messages are currently simple Python dictionaries. In the past, when dealing with larger amounts of data, the dictionary approach has required too much memory. However, how much memory is used really depends on the use case. If a Node doesn’t cache any messages and passes them along as it receives them, then there’s no problem. But if a Node reads an entire file before sending a list of lines to its child nodes, then you can run into trouble as the files grow. If you know that you may be dealing with large files, keep this in mind. Streaming is always preferred, although sometimes caching is unavoidable.
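
The difference between the two styles can be sketched without any Twig classes at all. The two helper functions below, stream_lines and cache_lines, are hypothetical names used only for illustration, as are the "line" and "text" message keys; an in-memory io.StringIO stands in for a real file.

```python
import io


def stream_lines(handle):
    """Yields one dictionary message per line, one at a time."""
    for number, line in enumerate(handle, 1):
        yield {"line": number, "text": line.rstrip("\n")}


def cache_lines(handle):
    """Builds every message before returning, so memory grows with the file."""
    return [{"line": number, "text": line.rstrip("\n")}
            for number, line in enumerate(handle, 1)]


data = u"a,b\nc,d\ne,f\n"

# Streaming: each message can be handled and discarded before the next is read.
count = 0
for msg in stream_lines(io.StringIO(data)):
    count += 1

# Caching: all three messages are held in a list at once.
cached = cache_lines(io.StringIO(data))
```

With three lines the difference is invisible, but the cached list scales with the size of the input while the generator only ever holds the current message.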

Next time we’ll look at processing simple delimited files using Twig. We’ll also add some functionality to the base Stream class which will allow for Stream modification and filtering.
