Wednesday, 25 November 2009

A simple task queue

I'm working on a little sample framework - really only to keep my sanity and practice my chosen craft - that allows you to string together tasks in a pipeline for processing. To exercise that framework - to flush out the pros and cons of the implementation - I'm writing a sample application.

The basic idea is that each task is interested in an event. That event could be the arrival of a file, the completion of another task, and so on. To string tasks together, I've created a simple database queue system - as a task completes, it writes to the queue, and other tasks that are interested will see that event and begin. A task completes either successfully (by returning no errors) or unsuccessfully (by returning one or more errors).

An important requirement is being able to add (or remove) tasks in the pipeline programmatically, without changing the database schema. So, quite simply, when a task completes, that event is written to the queue - but how do we find which events another task is interested in? And how do we do that in a way that doesn't get slower as the queue grows?
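One way to picture this - a hypothetical sketch, not the framework's actual API - is a registry that maps each task to the (job, result) events it cares about. Adding or removing a task only touches the registry, never the schema:

```python
# Hypothetical sketch: each task subscribes to the (job, result) events it cares about.
# Adding or removing a task only changes this registry - the database schema is untouched.
SUBSCRIPTIONS = {
    "job2": [("job1", "SUCCEEDED")],  # job2 runs after job1 succeeds
}

def interested_tasks(job, result):
    """Return the tasks whose subscriptions match a completed event."""
    return [task for task, events in SUBSCRIPTIONS.items()
            if (job, result) in events]

print(interested_tasks("job1", "SUCCEEDED"))  # ['job2']
print(interested_tasks("job1", "FAILED"))     # []
```

The queue table then only needs generic columns (job, task, result, timestamp); which events trigger which tasks lives entirely in code.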

So, let's assume Job2 is interested in the successful completion of Job1. We need to find all successful Job1 events which haven't already been processed by Job2 (successfully or otherwise):

[sourcecode lang='sql']
SELECT * FROM QUEUE WHERE job='job1' AND result='SUCCEEDED' AND task NOT IN (SELECT task FROM QUEUE WHERE job='job2' AND result IS NOT NULL)
[/sourcecode]

I'm no SQL ninja, so this is my first, simple solution. It works, but it doesn't scale: with a thousand rows it completes pretty quickly, but with tens of thousands of rows it takes far too long (nearly 8 minutes with 40,000 rows running on my Dell Inspiron laptop).
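To see what the query is doing, here's a miniature version using SQLite in place of MySQL - a sketch with made-up task names, but the same columns and the same NOT IN shape:

```python
import sqlite3

# In-memory stand-in for the MySQL QUEUE table (same columns as the post).
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT, task TEXT, result TEXT)")
rows = [
    (1, "job1", "t1", "SUCCEEDED"),  # already processed by job2 (row 2)
    (2, "job2", "t1", "SUCCEEDED"),
    (3, "job1", "t2", "SUCCEEDED"),  # not yet processed by job2
    (4, "job1", "t3", "FAILED"),     # failed, so job2 ignores it
]
db.executemany("INSERT INTO QUEUE (id, job, task, result) VALUES (?,?,?,?)", rows)

# successful job1 events that job2 hasn't touched yet
pending = db.execute(
    "SELECT task FROM QUEUE WHERE job='job1' AND result='SUCCEEDED' "
    "AND task NOT IN (SELECT task FROM QUEUE WHERE job='job2' "
    "AND result IS NOT NULL)").fetchall()
print(pending)  # [('t2',)]
```

Only the task job2 hasn't seen comes back; the failed job1 task is filtered out by the result check.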

Pragmatically, we can restrict the query based on time - we don't need to search the whole table, just the more recent events:

[sourcecode lang='sql']
SELECT * FROM QUEUE WHERE tstamp >= ? AND job='job1' AND result='SUCCEEDED' AND task NOT IN (SELECT task FROM QUEUE WHERE tstamp >= ? AND job='job2' AND result IS NOT NULL)
[/sourcecode]

You could easily just look at the events from the last hour if your tasks are short-running and you get fewer than a thousand events an hour - it'd probably perform fine. To give an indication of performance, I've written a little Groovy script (included below) that shows the basic trend; times are in milliseconds.

paul@paul-laptop:~$ groovy sqltest
2005 rows took: 4
4005 rows took: 5
6005 rows took: 3
8005 rows took: 3
10005 rows took: 2
12005 rows took: 3
14005 rows took: 3
16005 rows took: 4
18005 rows took: 5
20005 rows took: 3


Notice, though, that this is a great example of where an index really helps - without the index, times grow as the row count grows:

paul@paul-laptop:~$ groovy sqltest
2005 rows took: 13
4005 rows took: 34
6005 rows took: 123
8005 rows took: 134
10005 rows took: 229
12005 rows took: 237
14005 rows took: 330
16005 rows took: 342
18005 rows took: 428
20005 rows took: 442


It's a pity I have to worry about the time restriction, but it does make the solution workable and, at least for my intended application, appropriate. I was hoping to use the SQL MINUS operator, but it appears MySQL doesn't support it.
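For what it's worth, MINUS is Oracle's spelling of standard SQL's EXCEPT, and the usual MySQL workaround is a LEFT JOIN ... IS NULL. Here's a small SQLite sketch (made-up task names) showing both forms returning the same answer - SQLite happens to support EXCEPT, so the comparison is easy to run:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE QUEUE (id INTEGER PRIMARY KEY, job TEXT, task TEXT, result TEXT)")
db.executemany("INSERT INTO QUEUE VALUES (?,?,?,?)", [
    (1, "job1", "t1", "SUCCEEDED"), (2, "job2", "t1", "SUCCEEDED"),
    (3, "job1", "t2", "SUCCEEDED"),
])

# EXCEPT - what MINUS does in Oracle; supported by SQLite, but not by MySQL 5.1:
via_except = db.execute(
    "SELECT task FROM QUEUE WHERE job='job1' AND result='SUCCEEDED' "
    "EXCEPT SELECT task FROM QUEUE WHERE job='job2' AND result IS NOT NULL"
).fetchall()

# LEFT JOIN ... IS NULL - the common MySQL substitute for NOT IN/MINUS:
via_join = db.execute(
    "SELECT q1.task FROM QUEUE q1 "
    "LEFT JOIN QUEUE q2 ON q2.job='job2' AND q2.task=q1.task AND q2.result IS NOT NULL "
    "WHERE q1.job='job1' AND q1.result='SUCCEEDED' AND q2.task IS NULL"
).fetchall()

print(via_except, via_join)  # [('t2',)] [('t2',)]
```

Whether the join form actually beats NOT IN on MySQL 5.1 would need measuring; the point is only that the MINUS semantics are reachable without the operator.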

Here's the Groovy script I used to generate the results. The difference between running with and without the index is so dramatic and obvious that it makes a great example.

[sourcecode lang='java']
import groovy.sql.Sql
import groovy.grape.Grape


Grape.grab(group:'mysql', module:'mysql-connector-java', version:'5.1.10', classLoader: this.class.classLoader.rootLoader)

enum STATUS { SUCCEEDED, FAILED }

def go() {
    def sql = Sql.newInstance("jdbc:mysql://localhost:3306/spike", "spike", "password", "com.mysql.jdbc.Driver")
    try {
        sql.execute("drop table QUEUE")
    } catch (Exception e) {
        // ignore - the table may not exist yet
    }

    sql.execute("CREATE TABLE QUEUE (id INTEGER NOT NULL, job varchar(20) NOT NULL, task varchar(20) NOT NULL, result VARCHAR(20) NOT NULL, tstamp timestamp, PRIMARY KEY (id)) ENGINE = MyISAM")
    sql.execute("create index idx1 on QUEUE(tstamp, job, result)")

    (1..10).each() {
        def d = createHistory(sql, 1000 * it)
        findJobsToProcess(sql, d)
    }
}

def findJobsToProcess(sql, d) {
    def rowcount = 0
    sql.eachRow("select count(*) from QUEUE") { row ->
        rowcount = row[0]
    }

    long start, end

    start = System.currentTimeMillis()
    // now find the 'job1' items that haven't been processed by 'job2'
    sql.eachRow("SELECT * FROM QUEUE where tstamp >= ? and job='job1' and result=? and task not in (SELECT task FROM QUEUE where tstamp >= ? and job='job2' and result is not null) order by tstamp asc", [d, STATUS.SUCCEEDED.name(), d]) {
        // println "${it.id} ${it.job} ${it.task} ${it.result} ${it.tstamp}"
    }
    end = System.currentTimeMillis()
    println "${rowcount} rows took: ${end - start}"
}

def createHistory(sql, count) {
    sql.execute("truncate table QUEUE")
    int id = 1
    int task = 1
    // create a "history" of previous jobs
    STATUS.each() { status ->
        (1..count / 2).each() {
            sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job1', task.toString(), status.name(), new Date()])
            sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job2', task.toString(), status.name(), new Date()])
            task++
        }
    }
    sleep(1000)
    def d = new Date()
    // insert some successful 'job1' items which 'job2' hasn't processed
    (1..5).each() {
        sql.execute("insert into QUEUE (id,job,task,result,tstamp) values (?,?,?,?,?)", [id++, 'job1', task.toString(), STATUS.SUCCEEDED.name(), new Date()])
        task++
    }
    return d
}

go()

[/sourcecode]

In running this test, I used:

  • Dell Inspiron 1525, Intel(R) Core(TM)2 Duo Processor T5550, 1.83 GHz, 2MB Cache, 667 MHz FSB, 2GB (2 X 1024MB) 667MHz Dual Channel DDR2 SDRAM, 160GB 7200RPM Performance Hard Drive

  • Ubuntu 9.10 32 bit desktop edition

  • Groovy Version: 1.6.5

  • JVM: 1.6.0_03

  • mysql Ver 14.14 Distrib 5.1.37, for debian-linux-gnu (i486)

Monday, 16 November 2009

Rapache on Ubuntu 9.10

I've just now stumbled across Rapache, a useful GUI tool that makes configuring Apache easy. I found it by accident in the Ubuntu Software Center, but unfortunately it would freeze while trying to add a new domain. I searched the web for answers and found a bug report.

This didn't specifically reference Ubuntu 9.10 (rather, 9.04) and the file that needed to be patched didn't exist in the given location.

I found it easily enough:

paul@paul-laptop:~$ sudo find / -name RapacheGui.py
[sudo] password for paul:
/usr/lib/pymodules/python2.5/RapacheGtk/RapacheGui.py
/usr/lib/pymodules/python2.6/RapacheGtk/RapacheGui.py
/usr/share/pyshared/RapacheGtk/RapacheGui.py
paul@paul-laptop:~$


I edited the last one (/usr/share/pyshared/RapacheGtk/RapacheGui.py) as documented in comment 23 to add the following at line 79:

if not Shell.command.ask_password(): sys.exit(1)


Note: this line MUST be preceded by 8 spaces - indentation is significant in Python.

Now, with the patch applied, rapache would prompt for the system password - and then close!

I've got it working now, by starting it with sudo:


sudo rapache


Everything seems to work. I've added a domain, turned on the include module, and added "AddOutputFilter INCLUDES .html" to the virtual host definition - all very quickly using rapache.

So, server side includes now work on my new virtual host - which is what I started out wanting to do before going off on this tangent.
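For reference, the virtual host ends up looking something like the sketch below - the ServerName and paths are placeholders, not what rapache actually wrote for me:

```apache
<VirtualHost *:80>
    # placeholders - substitute your own domain and document root
    ServerName mysite.local
    DocumentRoot /var/www/mysite
    <Directory /var/www/mysite>
        # enable server side includes for .html files
        Options +Includes
        AddOutputFilter INCLUDES .html
    </Directory>
</VirtualHost>
```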

Saturday, 7 November 2009

Tagging with Appengine DataStore

I've been looking into how to implement a tagging mechanism with Appengine (Python). By using a StringListProperty, you can associate a list of tags with an entity. The model would look something like this:


from google.appengine.ext import db

class Sample(db.Model):
    name = db.StringProperty(required=True)
    tags = db.StringListProperty()


Now, assuming we want to find the entities tagged with a given word, we can use a query like this:


q = db.GqlQuery("SELECT * FROM Sample where tags = :1",'z')


If we want to find all entities that have ANY of these tags (OR) we can query with GqlQuery or filter:


q = db.GqlQuery("SELECT * FROM Sample where tags in :1",['a','z'])

q = domain.Sample.all()
q.filter("tags in ", ['b','d'])


When we want to find entities with ALL of the given tags:


q = db.GqlQuery("SELECT * FROM Sample where tags = :1 and tags = :2",'b','c')
q = domain.Sample.all()
q.filter("tags = ", 'b')
q.filter("tags = ", 'c')



Too easy!
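The OR/AND semantics are easy to mimic in plain Python, which helps when reasoning about what the datastore does with a multi-valued property - this is just a sketch with dicts, not datastore code:

```python
# Each entity's multi-valued property is just a list of tags.
entities = [
    {"name": "t1", "tags": ["a", "b", "c"]},
    {"name": "t2", "tags": ["b", "c", "d"]},
]

def any_of(entities, tags):
    # like 'tags IN [...]': match if the entity has at least one of the tags
    return [e["name"] for e in entities if any(t in e["tags"] for t in tags)]

def all_of(entities, tags):
    # like chained 'tags =' filters: the entity must carry every tag
    return [e["name"] for e in entities if all(t in e["tags"] for t in tags)]

print(any_of(entities, ["a", "z"]))  # ['t1']
print(all_of(entities, ["b", "c"]))  # ['t1', 't2']
```

An equality filter on a list property matches if *any* element equals the value, which is why stacking several equality filters gives you AND.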

If you have an appengine project, you can try this code out by adding the entity (Sample - listed above) to your domain model and run the following in your development console (http://localhost:8080/_ah/admin/interactive):


import domain

domain.Sample(name='t1',tags=['a','b','c']).put()
domain.Sample(name='t2',tags=['b','c','d']).put()
domain.Sample(name='t3',tags=['c','d','e']).put()
domain.Sample(name='t3',tags=['x','Y','z']).put()

def display(results):
    for r in results:
        print r.name + " tags = " + ' '.join(r.tags)


print "FIND WITH THIS TAG"
q = db.GqlQuery("SELECT * FROM Sample where tags = :1",'z')
results = q.fetch(5)
display(results)

print "FIND WITH ANY OF THESE TAGS (OR)"
q = db.GqlQuery("SELECT * FROM Sample where tags in :1",['a','z'])
results = q.fetch(5)
display(results)

print "FIND WITH ANY OF THESE TAGS (OR)"
q = domain.Sample.all()
q.filter("tags in ", ['a','z'])
results = q.fetch(5)
display(results)

print "FIND WITH ALL OF THESE TAGS (AND)"
q = db.GqlQuery("SELECT * FROM Sample where tags = :1 and tags = :2",'b','c')
results = q.fetch(5)
display(results)

print "FIND WITH ALL OF THESE TAGS (AND)"
q = domain.Sample.all()
q.filter("tags = ", 'b')
q.filter("tags = ", 'c')

results = q.fetch(5)
display(results)


This should produce the following result:


FIND WITH THIS TAG
t3 tags = x Y z
FIND WITH ANY OF THESE TAGS (OR)
t1 tags = a b c
t3 tags = x Y z
FIND WITH ANY OF THESE TAGS (OR)
t1 tags = a b c
t3 tags = x Y z
FIND WITH ALL OF THESE TAGS (AND)
t1 tags = a b c
t2 tags = b c d
FIND WITH ALL OF THESE TAGS (AND)
t1 tags = a b c
t2 tags = b c d


So, it's easier than I thought. But I'm not experienced with the DataStore yet, and I don't know if there are inherent limitations with this approach - remember, there are limits on how you can retrieve data.