First of all, I took the existing cassandra-unit library and tweaked it. Normally, when starting embedded Cassandra via cassandra-unit, you specify a configuration file (cassandra.yaml) and optionally a temporary directory. Cassandra-unit then loads the file as a classpath resource and copies its contents to a new file in the specified temp directory (default target/embeddedCassandra). Hence the file has to be on the classpath and the path has to be relative. I thought it would be much nicer if you could instead pass an absolute path and have the configuration file used directly from where it is located. That way, we can later modify the Cassandra config file from Python the way we want and also put it in its final location. So the first thing you want to do is to clone the tweaked embedded-cassandra-starter code from git and create a jar artifact by running (yes, you need to use Maven):
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mvn clean package
The outcome of this will be a jar file that you can put into your Python project, e.g. under resources/cassandra. The next thing you need is a vanilla Cassandra configuration file (cassandra.yaml) in your project. We have that one checked in along with the jar file, i.e. under resources/cassandra, and we called it cassandra.yaml.template.
We use the .template extension because the file contains two placeholders ({{CASSANDRA_DIR}} and {{CASSANDRA_PORT}}) which will be replaced later. A great co-worker of mine then wrote a Python class called EmbeddedCassandra. In its __init__ method, this class finds an available port and creates a random directory in the system's temporary directory (let's call it the work directory for now). EmbeddedCassandra also has a start and a stop method. The start method copies the configuration template file into the work directory and replaces the two placeholders mentioned above. Finally, it starts a new Java process using the subprocess module. It basically invokes the jar file that we built earlier in the same way as you would from the command line (Example can be found here). The stop method in EmbeddedCassandra brings down the process and does some cleanup.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: UTF-8 | |
import shutil | |
import socket | |
import subprocess | |
from time import sleep | |
import uuid | |
import os, tempfile | |
from logging import getLogger | |
# Directory holding the starter jar and the configuration template, resolved
# relative to this module; adjust the relative path to your project layout.
CASSANDRA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../resources/cassandra'))  # change me
# Jar artifact of the embedded-cassandra-starter project.
JAR = os.path.join(CASSANDRA_DIR, 'embedded-cassandra-starter-1.0-SNAPSHOT.jar')
# cassandra.yaml template; EmbeddedCassandra substitutes its
# {{CASSANDRA_DIR}} and {{CASSANDRA_PORT}} placeholders before starting.
CONFIG_TEMPLATE = os.path.join(CASSANDRA_DIR, 'cassandra.yaml.template')
class EmbeddedCassandra(object):
    """Run a throw-away Cassandra node in a child Java process.

    Each instance owns a private work directory below
    ``<system temp dir>/embcassandra``. ``start()`` renders the configuration
    template into that directory and launches the starter jar; ``stop()``
    terminates the process and removes the directory again.
    """

    # Log output that signals the node is ready to accept Thrift clients.
    STARTED_MARKER = b"Listening for thrift clients"
    # Seconds to sleep between two looks at the log while waiting for startup.
    POLL_INTERVAL = 0.1

    def __init__(self, port=None):
        """
        Initializes an embedded Cassandra server on the specified port.

        port -- an available port in range 1024-65535; when omitted (or
                falsy) a currently free port is picked automatically
        """
        self.process = None
        self.logfile = None
        self.port = port or EmbeddedCassandra.sort_of_find_free_port()
        self.logger = getLogger('embedded cassandra')

        base_dir = os.path.join(tempfile.gettempdir(), 'embcassandra')
        if not os.path.isdir(base_dir):
            try:
                os.makedirs(base_dir)
            except OSError:
                # Another process may have created it in the meantime.
                if not os.path.isdir(base_dir):
                    raise
        # mkdtemp creates the directory atomically, so two instances can
        # never end up sharing (or racing for) the same work directory.
        self.work_dir = tempfile.mkdtemp(prefix='cassandra_', dir=base_dir)
        self.config_file = os.path.join(self.work_dir, 'cassandra.yml')
        self.logger.debug("Cassandra work dir: %s", self.work_dir)

    @staticmethod
    def sort_of_find_free_port():
        """Return a port that was free at the time of the call.

        The port is obtained by binding to port 0 on 127.0.0.1 and closing
        the socket again, so another process may still grab it before
        Cassandra does -- hence "sort of".
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind(('127.0.0.1', 0))
            return s.getsockname()[1]
        finally:
            s.close()

    def _write_yml(self):
        """Render CONFIG_TEMPLATE into ``self.config_file``."""
        with open(CONFIG_TEMPLATE) as template:
            yml = template.read()
        yml = yml.replace('{{CASSANDRA_DIR}}', self.work_dir)
        yml = yml.replace('{{CASSANDRA_PORT}}', str(self.port))
        with open(self.config_file, 'w') as f:
            f.write(yml)

    def start(self, timeout=120.0):
        """Launch Cassandra and block until it accepts Thrift clients.

        timeout -- maximum number of seconds to wait for the startup marker
                   in the log; None waits indefinitely

        Raises RuntimeError if the Java process exits before it is ready or
        the timeout expires. In both cases the process is terminated; the
        work directory (including cassandra.log) is kept until stop().
        """
        self.logger.info("Starting embedded cassandra on port %d", self.port)
        self._write_yml()
        log_path = os.path.join(self.work_dir, 'cassandra.log')
        self.logfile = open(log_path, 'w', buffering=1)
        args = ['java', '-cp', JAR, 'se.javasplitter.CassandraStarter', self.config_file]
        try:
            self.process = subprocess.Popen(
                args,
                stdout=self.logfile,
                stderr=self.logfile,
            )
            self._wait_until_started(log_path, timeout)
        except Exception:
            # Do not leave a half-started JVM or an open log handle behind.
            self._shutdown_process()
            raise
        self.logger.info("Started embedded cassandra on port %d", self.port)

    def _wait_until_started(self, log_path, timeout):
        """Poll the log until STARTED_MARKER appears, the process dies or time runs out."""
        import time  # only `sleep` is imported at module level

        deadline = None if timeout is None else time.time() + timeout
        while True:
            # Read as bytes so a partially written multi-byte character
            # cannot break decoding while the JVM is still writing.
            with open(log_path, 'rb') as log:
                if self.STARTED_MARKER in log.read():
                    return
            exit_code = self.process.poll()
            if exit_code is not None:
                raise RuntimeError(
                    "Embedded cassandra exited with code %s before it was ready, see %s"
                    % (exit_code, log_path))
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    "Embedded cassandra did not start within %s seconds, see %s"
                    % (timeout, log_path))
            sleep(self.POLL_INTERVAL)

    def _shutdown_process(self):
        """Terminate and reap the Java process (if any) and close the log file."""
        if self.process is not None:
            if self.process.poll() is None:
                self.process.kill()
            # Reaping the child guarantees it has released its files in
            # work_dir before they are deleted.
            self.process.wait()
            self.process = None
        if self.logfile is not None:
            self.logfile.close()
            self.logfile = None

    def stop(self):
        """Stop Cassandra and delete the work directory.

        Safe to call on an instance that was never started and safe to call
        more than once.
        """
        self.logger.info("Stopping embedded cassandra on port %d", self.port)
        self._shutdown_process()
        if os.path.isdir(self.work_dir):
            shutil.rmtree(self.work_dir)
All of this is now wrapped into an EmbeddedCassandraTestCase class, which acts as a base class for unit tests that want to test against Cassandra. This class invokes start and stop in its setUp and tearDown methods.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: UTF-8 | |
import logging | |
import unittest | |
import sys | |
from javasplitter.embedded_cassandra import EmbeddedCassandra | |
class EmbeddedCassandraTestCase(unittest.TestCase):
    """Base class for tests that need a running embedded Cassandra.

    A fresh EmbeddedCassandra is started for every test in setUp and
    stopped again afterwards. Subclasses reach the server through
    ``self.cassandra_host`` ("localhost:<port>").
    """

    def __init__(self, methodName='runTest', port=None):
        """
        methodName -- name of the test method to run (see unittest.TestCase)
        port -- port for the embedded server; None lets EmbeddedCassandra
                choose one
        """
        super(EmbeddedCassandraTestCase, self).__init__(methodName)
        self.desired_port = port

    def setUp(self):
        """Start an embedded Cassandra and publish its address."""
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)
        self.embedded_cassandra = EmbeddedCassandra(self.desired_port)
        # unittest skips tearDown when setUp raises (here or in a subclass
        # that extends setUp), but cleanups always run -- register the stop
        # as a cleanup so the Java process is never left behind.
        self._cassandra_stop_pending = True
        self.addCleanup(self._stop_embedded_cassandra)
        self.embedded_cassandra.start()
        self.cassandra_host = 'localhost:%d' % self.embedded_cassandra.port

    def tearDown(self):
        """Stop the embedded Cassandra started in setUp."""
        self._stop_embedded_cassandra()

    def _stop_embedded_cassandra(self):
        """Stop the server once, no matter how many times this is called."""
        if getattr(self, '_cassandra_stop_pending', False):
            self._cassandra_stop_pending = False
            self.embedded_cassandra.stop()
So now you are able to write some nice Python unit tests (or rather, integration tests) against Cassandra, for instance using the great Pycassa library. Here is a simple example.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: UTF-8 | |
import random | |
import string | |
from pycassa import ConnectionPool, SystemManager, ColumnFamily, SIMPLE_STRATEGY, UTF8_TYPE, INT_TYPE | |
from javasplitter.embedded_cassandra_test import EmbeddedCassandraTestCase | |
class RandomWordTest(EmbeddedCassandraTestCase):
    """Example integration test: write random words and read one back."""

    def setUp(self):
        """Create the keyspace and column family used by the test."""
        super(RandomWordTest, self).setUp()
        self.keyspace = "javasplitter"
        self.column_family = "awesome_cf"
        system_manager = SystemManager(server=self.cassandra_host)
        try:
            system_manager.create_keyspace(self.keyspace, SIMPLE_STRATEGY, {'replication_factor': '1'})
            system_manager.create_column_family(
                self.keyspace, self.column_family,
                key_validation_class=INT_TYPE, comparator_type=UTF8_TYPE,
                default_validation_class=UTF8_TYPE)
        finally:
            # Release the connection even when schema creation fails.
            system_manager.close()

    def test_inserting(self):
        """Insert ten rows keyed 0..9 and verify the row with key 3."""
        self.pool = ConnectionPool(self.keyspace, [self.cassandra_host], timeout=5.0)
        try:
            awesome_cf = ColumnFamily(self.pool, self.column_family)
            for key in range(10):
                awesome_cf.insert(key, {'word': self._random_word(10)})
            row = awesome_cf.get(3)
            self.assertTrue(row)
            row = dict(row)
            self.assertIn('word', row)
            self.assertEqual(10, len(row['word']))
        finally:
            # Close the pooled connections before tearDown stops the server.
            self.pool.dispose()

    def _random_word(self, length):
        """Return a string of `length` random lowercase ASCII letters."""
        return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))