Developing a nose-test plugin to time python tests

by Mahmoud on February 28, 2011

Nose is a fantastic testing framework. What surprises me though, is that there’s no out of the box plugin to time tests to see which tests are the slowest, and most likely, problematic. After all, unit tests are supposed to be wicked fast. I googled, but nothing really came up except an insightful google-groups post.

I figured, what the hey, might as well just write a simple nose plugin to time the tests. I modeled it slightly off the xunit nose plugin. With the xunit plugin as a guiding example, I thought it was pretty trivial to write a nose plugin, definitely a testament to the great design chosen by nose.

Here’s the plugin below:

"""This plugin provides test timings to identify which tests might be
taking the most. From this information, it might be useful to couple
individual tests nose's `--with-profile` option to profile problematic
tests.

This plugin is heavily influenced by nose's `xunit` plugin.

Add this command to the way you execute nose::

--with-test-timer

After all tests are executed, they will be sorted in ascending order.

(c) 2011 - Mahmoud Abdelkader (http://github.com/mahmoudimus)

LICENSE:
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004

Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>

Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.

DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

0. You just DO WHAT THE FUCK YOU WANT TO.

"""

import operator
from time import time

import nose
from nose.plugins.base import Plugin


class TestTimer(Plugin):
    """This plugin provides test timings

"""

    name = 'test-timer'
    score = 1

    def _timeTaken(self):
        if hasattr(self, '_timer'):
            taken = time() - self._timer
        else:
            # test died before it ran (probably error in setup())
            # or success/failure added before test started probably
            # due to custom TestResult munging
            taken = 0.0
        return taken

    def options(self, parser, env):
        """Sets additional command line options."""
        super(TestTimer, self).options(parser, env)

    def configure(self, options, config):
        """Configures the test timer plugin."""
        super(TestTimer, self).configure(options, config)
        self.config = config
        self._timed_tests = {}

    def startTest(self, test):
        """Initializes a timer before starting a test."""
        self._timer = time()

    def report(self, stream):
        """Report the test times"""
        if not self.enabled:
            return
        d = sorted(self._timed_tests.iteritems(), key=operator.itemgetter(1))
        for test, time_taken in d:
            stream.writeln("%s: %0.4f" % (test, time_taken))

    def _register_time(self, test):
        self._timed_tests[test.id()] = self._timeTaken()

    def addError(self, test, err, capt=None):
        self._register_time(test)

    def addFailure(self, test, err, capt=None, tb_info=None):
        self._register_time(test)

    def addSuccess(self, test, capt=None):
        self._register_time(test)


if __name__ == '__main__':
    nose.main(addplugins=[TestTimer()])

Something cool about nose was that you don’t have to install a plugin package-wide using setuptools, but you can actually just dynamically add it during run-time. All you have to do is:

import nose
from yourplugin import YourPlugin

if __name__ == '__main__':
    nose.main(addplugins=[YourPlugin()])

This way, you can just execute your tests as normal, like so:

python nose-testtimers.py --with-test-timer -sv --debug=sqlalchemy.engine

and you get a nice little output of test times :)

{ 5 comments }

Arbitrary Stack Trace in Python

by Mahmoud on February 11, 2011

I had a need to trace a function’s call stack to identify call chain paths in some difficult-to-follow Python code that was laced with lots of magic and abstractions.

I tried stepping through ipdb, but it took forever. So, I thought to myself, why can’t I just take a stack trace and I’ll be able to get the call stack. It didn’t help that this function was apparently called multiple times from different places so when I tried raising an unhandled exception, it only helped display the stack trace for the first call to the function, not the successive ones. I also tried throwing and catching the exception, assuming that a stack trace can share some information about its call chain. That didn’t work too well because the exception masked the stack[1].

So I wrote some simple code that will do exactly that, take a stack trace and format it as if it looked like an exception traceback. Without further ado, here is, as far as I know, the only way to get an arbitrary stack trace in Python from any line of code.

    import inspect
    import traceback

    # get the currently frames' stack
    # this returns the frameobject, the filename,
    # the line number of the current line, the
    # function name, a list of lines of context from
    # the source code, and the index of the current
    # line within that list.
    stack = inspect.stack()
    # reverse the stack trace so the most recent is at the bottom of the stack
    stack.reverse()
    try:
        stack_list = []
        for s in stack:
            _, filename, line_no, func_name, code_list, index_in_code_list = s
            stack_list.append(
                (filename, line_no, func_name, code_list[index_in_code_list]))
        print ''.join(traceback.format_list(stack_list))
    finally:
        # avoid memory leak issues
        del stack

Hope this helps in your debugging adventures.

I’d love to hear if there are any other ways of doing the same thing.

[1] I should probably revisit this later to see why it didn’t work. In theory it should do what the stack trace snippet above does.

EDIT:

My original assumption was right. Using a stack trace to get the call chain is one way of doing it.

Turns out Python already does this in the traceback.print_stack function.

Thanks to @teepark for the comment!

{ 3 comments }

I’ve recently had to do some work that required sorting a very large CSV file, containing fields with embedded newlines, quickly. As it turns out, Linux comes with a sort implementation that has a “–zero-terminated” option, which sorts on null-terminated delimited strings instead of the default newline separator.

Writing null-terminated CSV files

Since I was writing a process to generate these CSV files, I figured I can just use Python’s CSV module, which has support for different types of dialects. Inheriting from csv.Dialect, we can write a simple dialect that will allow us to terminate all lines with a null byte.

import csv
import struct

class null_terminated(csv.excel):
    lineterminator = struct.pack('B', 0)

csv.register_dialect("null-terminated", null_terminated)

Essentially, we’ve registered a global csv dialect called "null-terminated" that inherits from the excel dialect, which has sensible standard defaults.

Here’s a simple snippet that shows the usage of the new "null-terminated" dialect that I created above.

from csv import DictWriter

with open("/tmp/file.csv", "w") as f:
	dwriter = DictWriter(f, fieldnames=["id","field"], dialect="null-terminated")

	for i, field in enumerate(("foo", "bar", "baz", "bif")):
    		dwriter.writerow({"id": i, "field": field})

Now, /tmp/file.csv will contain a file with four rows that are separated by a null-terminator. As you can see, it’s pretty easy to write a null-terminated CSV file, but unfortunately, it’s a bit tricky to read a null-terminated csv file due to some inflexible hardcoded defaults.

Reading null-terminated CSV files

The CSV module’s unintuitive restriction for Dialect.lineterminator is hard-coded to recognize '\r' or '\n' as the end of line terminator, which unfortunately, means we will need to handle null-termination and implement reading ourselves.

There are many ways of writing a procedure to read null-terminated strings, but I figured the simplest algorithm is to read character-by-character, concatenating everything into a string until we reach a null byte, then we can just return the string. I’d figure an implementation might go something like this:

def read(fobj):
    current_string = ""
    while True:
        char = fobj.read(1)
        if char and char != nullbyte:
            current_string += char
        elif char == nullbyte:
            yield current_string
            current_string = ""
        elif not char:
            if current_string:
                yield current_string
            raise StopIteration

Looks awesome, but, how can we integrate this into the CSV module? We would want to just plug and play with the existing CSV module. A simple solution is to wrap the function above to iterate over each line, like so:

 # we use StringIO since cStringIO has poor unicode support
from StringIO import StringIO
from csv import reader

class NullTerminatedDelimiterReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file 'f',
    which are line terminated by a null byte

    """

    def __init__(self, f,  dialect, *args, **kwds):
        # satisfying DictReader instance
        self._line_num = 0
        self.fobj = f
        self.dialect = dialect
        self.reader = self._read()
        self.string_io = StringIO()

    def _properly_parse_row(self, current_string):
        self.string_io.write(current_string)
        # seek to the first byte
        self.string_io.seek(0)
        # we instantiate a reader here to properly parse the row
        # taking into account escaping, and various edge cases
        return next(reader(self.string_io, dialect=self.dialect))

    def _read(self):
        current_string = ""
        while True:
            char = self.fobj.read(1)  # read one byte
            if char and char != null_byte:
                # keep appending to the current string
                current_string += char
            elif char == null_byte:
                yield self._properly_parse_row(current_string)
                # increment instrumentation
                self._line_num += 1
                # clear internal reading buffer
                self.string_io.seek(0)
                self.string_io.truncate()
                # clear row
                current_string = ""
            elif not char:
                if current_string:
                    yield self._properly_parse_row(current_string)
                raise StopIteration

    @property
    def line_num(self):
        return self._line_num

    def next(self):
        return next(self.reader)

    def __iter__(self):
        return self

To use the DictReader class, we’ll inherit from the DictReader class and override the reader object. It’s the cleanest and simplest way of doing it.

class NullByteDictReader(csv.DictReader):
    def __init__(self, f, *args, **kwds):
        csv.DictReader.__init__(self, f, *args, **kwds)
        self.reader = NullTerminatedDelimiterReader(f, *args, **kwds)

with open("/tmp/file.csv", "r") as f:
    for line in NullByteDictReader(f, dialect="null-terminated"):
        print line["id"], line["field"]

Voila :)

Conclusions and Future Work

Something that might be interesting to pursue further is the possibility of writing, or wrapping a python interface around, a C library as a substitute for the current CSV module. It should be able to support different line terminators, multi-byte delimiters, and have unicode detection outside the box, which happen to be my main three gripes with the CSV module.

You can find working source code and implementation of this here and you should follow me on twitter here.

{ 6 comments }