Pick a task to work on

There is a new blog post on this subject!

There are cases where a system stores a list of things to do, and some worker processes check the list, pick something to work on, do it, and remove it from the list.

The proper solution is to use some kind of queuing system. There is even PgQ, which works within PostgreSQL, but some people are not happy with it, as it requires compilation and installation. So they just use plain selects.

Will that work OK?

Let's imagine that our queue contains 10 thousand strings, and we want to calculate MD5 checksums of them.

Since I want my test to be at least a bit realistic, I will also need priority. Some tasks are more important than others. Let me assume that priority is based on the number of repetitions of the first character. So the string "ddwhatever" should be processed before "abc" (because of the repeated "d").
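To make the priority rule concrete, here is a minimal sketch of it as a standalone function. The name `priority` is just for illustration; it counts how many extra times the first character repeats at the start of the string, which matches the sample data further down.

```python
def priority(job):
    """Count how many times the first character is repeated right after itself."""
    first = job[0]
    stripped = job.lstrip(first)
    # Leading run length minus one: "ddwhatever" has a run of 2, so priority 1.
    return len(job) - len(stripped) - 1


print(priority("ddwhatever"))
print(priority("abc"))
```

With this rule most random strings get priority 0, and only the occasional string with a repeated first character jumps the queue.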

Additionally, I will need some timestamp, to simulate the fact that some rows have been inserted earlier, and some later. For now, it will be pretty simple: I will choose some arbitrary timestamp, and then increment it by 1 second for every line.

Since I will be redoing the test multiple times, I'll pre-generate the list of strings with a simple Python script:

#!/usr/bin/env python3
import random
import string
import time


def line_generator(use_time):
    # One tab-separated line: random 50-character string, priority, timestamp.
    size = 50
    chars = string.ascii_letters + string.digits
    random_string = ''.join(random.choice(chars) for _ in range(size))
    # Priority: how many times the first character repeats right after itself.
    priority = size - 1 - len(random_string.lstrip(random_string[0]))
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(use_time))
    return '%s\t%d\t%s\n' % (random_string, priority, timestamp)


# Arbitrary starting point in the past; each line is one second later.
start_time = time.time() - 100000000
with open('/tmp/md5_data.txt', 'w') as f:
    for i in range(10000):
        f.write(line_generator(start_time))
        start_time += 1

The data looks like this:

1hEBJdCz8vpbiBv3oQWZzksmPyROyFRnqCHcxaQIOnf9Bt69DX	0	2010-06-30 03:21:15
3UeHKPjmzKhPOptZtE59XYpAbNcuryUhDW6lrqtNwoGL53kpg6	0	2010-06-30 03:21:16
VTboIg1uaYirybCDFwgBglOkdAV9QD20cafPkQso9vLsggU0WQ	0	2010-06-30 03:21:17
lC2R2dWlCQlEvhNuS991mnsmATeXlAwxvCE3lqmr64J4Eumd81	0	2010-06-30 03:21:18
PPuMHqGcQfxMlPJMBYlYXI4DwMWYqiuyjTeQCobBiDpTQp9kAv	1	2010-06-30 03:21:19
hMJYfwpgu29rR2fTAeGW5cIArEoQdI9kgzXYts4Ca294bCV96H	0	2010-06-30 03:21:20
oepH8Tq4ZrnbI957fnK1ElI6cEuIVZHVicUeHDtVB1dSUKu0iK	0	2010-06-30 03:21:21
gEsvbKdW27jUecvE8mPpwKfs7CMuP2GRxEbTPb8cUz4udIpz3q	0	2010-06-30 03:21:22
EbHEyj6WfV9YxLfWD5UBYiXvFfnHY2aOpX1YqOHQhyyMpNWjWR	0	2010-06-30 03:21:23
9wwIbPy3y1Ec2TlwgQPOXZQCrDzEnJfPZoAciZ2YsXOyMh7x73	0	2010-06-30 03:21:24

So, now, let's make the queue table:

CREATE TABLE queue (
    job text,
    priority int4,
    added_on timestamptz
);
\COPY queue FROM /tmp/md5_data.txt

With this in place, I'll add the index that will let me quickly find rows to process:

CREATE INDEX queue_idx ON queue (priority DESC, added_on ASC);

OK. So the basic query to get 1st row to process would be:

SELECT * FROM queue ORDER BY priority DESC, added_on ASC LIMIT 1;
                        job                         | priority |        added_on        
----------------------------------------------------+----------+------------------------
 66664o28k4haYpPdiLRB7uvh17kYPZA9zg2WIiYv2ka6TxqYAj |        3 | 2010-06-30 12:14:46+02
(1 row)
 
EXPLAIN analyze
SELECT * FROM queue ORDER BY priority DESC, added_on ASC LIMIT 1;
                                                          QUERY PLAN                                                          
------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.29..0.36 rows=1 width=44) (actual time=0.036..0.036 rows=1 loops=1)
   ->  Index Scan using queue_idx on queue  (cost=0.29..770.28 rows=10000 width=44) (actual time=0.035..0.035 rows=1 loops=1)
 Total runtime: 0.079 ms
(3 rows)

OK. It works, and it clearly uses the index. So we're good.

Now. The first solution to queue processing is to write a simple script that will read the data from the queue, do the work on every row, and delete it from the queue after the work is done.

Like this one:

#!/usr/bin/env python3
import hashlib

import psycopg2


def process_item(row):
    # The "work": MD5 checksum of the job string (first column).
    h = hashlib.md5()
    h.update(row[0].encode('utf-8'))
    return h.hexdigest()


conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    # Fetch the most important, oldest row.
    cur.execute('select * from queue order by priority desc, added_on asc limit 1')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed += 1
    # Remove the finished job, then commit.
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()

cur.close()
conn.close()
print("processed %d rows." % (processed,))

I ran the script three times. Each of course processed 10k rows (between runs I refilled the table, with the same data). Average time was 63.822 seconds.

Now, we can parallelize it. What will happen if I run 2 processing scripts? Let's see:

=$ ./processor-1.py & ./processor-1.py & time wait
[1] 14231
[2] 14232
processed 10000 rows.
processed 10000 rows.
[1]-  Done                    ./processor-1.py

real    1m4.833s
user    0m4.304s
sys     0m2.940s

Whoa. This looks bad. Total time is more or less the same, but both of the scripts did process 10k rows? So each row was processed twice. That's not good.

The reason is that while I do "select", the row is still fully available to the other processor.

So maybe I'll add "select for update" there? Make the cur.execute line:
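The double processing can be reproduced without a database. The sketch below is a toy model, not the real scripts: it assumes the worst-case interleaving where every worker runs its select before any of them runs its delete, and the function name `run_lockstep` is made up for the illustration.

```python
def run_lockstep(jobs, workers=2):
    """Simulate workers that all SELECT the top row before any of them DELETEs it."""
    queue = sorted(jobs)          # stand-in for ORDER BY priority DESC, added_on ASC
    processed = [0] * workers
    while queue:
        # Every worker runs its SELECT first and sees the same head of the queue.
        seen = [queue[0] for _ in range(workers)]
        for w, row in enumerate(seen):
            processed[w] += 1     # each worker "processes" the row it saw
            if row in queue:      # the DELETE removes the row for the first worker only
                queue.remove(row)
    return processed


print(run_lockstep(range(10)))
```

Real workers will not interleave this neatly, but the test run above shows the same outcome: every row is counted once per worker.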

    cur.execute('select * from queue order by priority desc, added_on asc limit 1 for update')

Result:

=$ ./processor-1.py & ./processor-1.py & time wait
[1] 14409
[2] 14410
processed 5000 rows.
processed 5000 rows.
[1]-  Done                    ./processor-1.py
[2]+  Done                    ./processor-1.py

real    1m3.487s
user    0m2.336s
sys     0m1.792s

Rows were split correctly now, but the total time didn't decrease. This is because when the first processor does the select for update, it locks the row, and the second processor can't skip the row, so it has to wait.

There is a "FOR UPDATE NOWAIT" version of the query, but the problem is that it raises an exception, which forces the application to roll back and retry. Not good.
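To show what "roll back and retry" means for the application, here is a rough sketch of such a loop. Everything in it is an assumption for illustration: the helper name `fetch_locked_row`, the retry count and the pause are made up, and it only relies on the connection being DB-API style and on the error carrying PostgreSQL's SQLSTATE in a `pgcode` attribute, as psycopg2 errors do.

```python
import time

LOCK_NOT_AVAILABLE = '55P03'  # SQLSTATE PostgreSQL reports when NOWAIT cannot lock

QUERY = ('select * from queue order by priority desc, added_on asc '
         'limit 1 for update nowait')


def fetch_locked_row(conn, attempts=5, pause=0.01):
    """Hypothetical helper: lock the top row, rolling back and retrying on failure."""
    for _ in range(attempts):
        cur = conn.cursor()
        try:
            cur.execute(QUERY)
            return cur.fetchone()      # locked row, or None if the queue is empty
        except Exception as exc:
            # Only handle "lock not available"; re-raise anything else.
            if getattr(exc, 'pgcode', None) != LOCK_NOT_AVAILABLE:
                raise
            conn.rollback()            # the failed transaction has to be rolled back
            time.sleep(pause)          # give the other processor time to finish
        finally:
            cur.close()
    raise TimeoutError('top row is still locked after %d attempts' % attempts)
```

Note that the retry still targets the same top row, so this only replaces waiting inside the database with waiting inside the application.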

Of course, I could add some random offset, so that usually not the first row would be selected, and chances of interlocking would be small, but then it doesn't sound like a sane solution, since we want the rows to be processed in order.

Now. Another solution would be to add a column, "in_process", which would be updated to true, and that change would be committed. Then, after processing is done, we can remove the row.

This would work, but it has one huge disadvantage. What will happen if the processing application dies without removing the row and without changing back the "in_process" flag?

The row would stay forever as "in_process".

Of course, we could add some maintenance, and make "in_process" not a boolean but a timestamp. This way, if something is in process for too long (5 minutes?), we assume it has to be redone.

Not really cool. But there is another approach. Let's store not a timestamp but an integer, which would mark which backend is processing the row. If the backend is no longer there, the row has to be redone.
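The timestamp rule boils down to a single check. The sketch below is only an illustration of that check in Python; the names `STALE_AFTER` and `is_claimable` are made up, and in a real setup the same condition would presumably live in the select's WHERE clause.

```python
from datetime import datetime, timedelta

STALE_AFTER = timedelta(minutes=5)   # the "5 minutes?" guess from the text


def is_claimable(in_process, now):
    """A row can be taken if nobody claimed it, or the claim is older than STALE_AFTER."""
    return in_process is None or now - in_process > STALE_AFTER


now = datetime(2013, 1, 1, 12, 0, 0)
print(is_claimable(None, now))
print(is_claimable(now - timedelta(minutes=1), now))
print(is_claimable(now - timedelta(minutes=6), now))
```

The weak spot is the timeout itself: set it too low and a slow but healthy worker gets its row taken over, set it too high and rows of dead workers wait needlessly.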

Let's modify the table:

ALTER TABLE queue ADD COLUMN processed_by INT4;

Now, the script that processes rows is:

#!/usr/bin/env python3
import hashlib

import psycopg2


def process_item(row):
    # The "work": MD5 checksum of the job string (first column).
    h = hashlib.md5()
    h.update(row[0].encode('utf-8'))
    return h.hexdigest()


conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    # Take the top row that is unclaimed, or claimed by a backend that no longer exists.
    cur.execute('''
        select *
        from queue
        where processed_by is null
            or not exists (select * from pg_stat_get_activity( processed_by ) )
        order by priority desc, added_on asc limit 1 for update
        ''')
    row = cur.fetchone()
    if row is None:
        break
    # Claim the row for this backend and commit, so the claim is visible to others.
    cur.execute('update queue set processed_by = pg_backend_pid() where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()

    ignore = process_item(row)
    processed += 1

    # Work is done: remove the row we claimed.
    cur.execute('delete from queue where priority = %s and added_on = %s and processed_by = pg_backend_pid()', (row[1], row[2]))
    conn.commit()

cur.close()
conn.close()
print("processed %d rows." % (processed,))

Test run:

=$ ./processor-2.py & ./processor-2.py & time wait
[1] 16959
[2] 16960
processed 5000 rows.
[1]-  Done                    ./processor-2.py
processed 5000 rows.
[2]+  Done                    ./processor-2.py
 
real    2m6.649s
user    0m3.772s
sys     0m3.564s

What?

I checked the Pg logs. Apparently the summed time of all queries previously (select for update/delete) was ~114 seconds. Now the time skyrocketed to 240s.

First thing – what will happen with more backends?

=$ for i in 1 2 3 4; do ./processor-2.py & done; time wait
[1] 17346
[2] 17347
[3] 17348
[4] 17349
processed 2087 rows.
processed 2676 rows.
processed 2859 rows.
processed 2378 rows.
[1]   Done                    ./processor-2.py
[3]-  Done                    ./processor-2.py
[4]+  Done                    ./processor-2.py
[2]+  Done                    ./processor-2.py
 
real    2m3.399s
user    0m3.064s
sys     0m2.444s

Why is it so slow? I looked at the Pg query log from the run with two processors, and found out that 167 seconds of the time (out of a total of 268) were taken by COMMIT, 39 seconds by "select for update", and 31 by "delete".

My guess is that by doing the extra update and commit we are simply forcing more IO (at the very least the xlog has to be fsynced).

I could have tested with unlogged tables, but I think that the queue table should be logged – so that it wouldn't vanish in case of sudden system restart.

So it looks like we're stuck: processing in parallel takes the same time due to locks, and if we add information about the lock that can be bypassed by other processors, it's even slower due to more writes.

Or are we?

I wrote a couple of times about a magnificent thing called advisory locks. Let's see if they can help us now.

#!/usr/bin/env python3
import hashlib

import psycopg2


def process_item(row):
    # The "work": MD5 checksum of the job string (first column).
    h = hashlib.md5()
    h.update(row[0].encode('utf-8'))
    return h.hexdigest()


conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    # Take the top row whose advisory lock we can get without waiting;
    # rows locked by other processors are filtered out.
    cur.execute('''
    select *
    from queue
    where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
    order by priority desc, added_on asc limit 1 for update
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed += 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    # Commit ends the transaction, which also releases the advisory lock.
    conn.commit()

cur.close()
conn.close()
print("processed %d rows." % (processed,))
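What makes this query different from plain "select for update" is that a processor never waits: a row whose lock is taken is simply filtered out and the next one is tried. Below is a toy in-memory model of that behaviour. It is a sketch under assumptions, not how PostgreSQL is implemented: `pick_row` and `held_locks` are made-up names, and it assumes the rows are checked one by one in index order and the scan stops at the first row that can be locked.

```python
def pick_row(queue, held_locks):
    """Return the first row, in queue order, whose lock key is free, and take the lock."""
    for row in queue:
        key = (row['priority'], row['added_on'])   # stand-in for hashtext(...)
        if key in held_locks:
            continue          # the try-lock would return false: skip, don't wait
        held_locks.add(key)   # lock acquired; held until the transaction ends
        return row
    return None


queue = [
    {'job': 'a', 'priority': 3, 'added_on': 1},
    {'job': 'b', 'priority': 1, 'added_on': 2},
    {'job': 'c', 'priority': 0, 'added_on': 3},
]
held = set()
first = pick_row(queue, held)    # processor 1 takes the top row
second = pick_row(queue, held)   # processor 2 skips it and takes the next one
print(first['job'], second['job'])
```

In the real script the lock disappears at commit, together with the deleted row, so there is no flag to clean up if a processor dies.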

First test:

=$ for i in 1 2; do ./processor-3.py & done; time wait
[1] 20100
[2] 20101
processed 5000 rows.
[1]-  Done                    ./processor-3.py
processed 5000 rows.
[2]+  Done                    ./processor-3.py
 
real    1m2.254s
user    0m2.656s
sys     0m2.144s

It's actually not bad. A bit faster than serial processing. And what if I add more processors?

=$ for i in 1 2 3 4; do ./processor-3.py & done; time wait
[1] 20203
[2] 20204
[3] 20205
[4] 20206
processed 2500 rows.
processed 2499 rows.
processed 2501 rows.
processed 2500 rows.
[1]   Done                    ./processor-3.py
[2]   Done                    ./processor-3.py
[4]+  Done                    ./processor-3.py
 
real    0m30.911s
user    0m1.936s
sys     0m1.508s

Now this is nice. Please note the slightly uneven distribution: one of the processors did 2499 rows, two did 2500 and one did 2501. Which is great.

Finally, let's up the count one more time, to see how it will work out:

=$ for i in {1..8}; do ./processor-3.py & done; time wait
...
processed 1249 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1251 rows.
processed 1250 rows.
...
real    0m14.704s
user    0m2.148s
sys     0m1.112s

It's all good.
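For reference, the wall-clock times of the advisory-lock runs can be turned into speedups over the single-processor average of 63.822 seconds. This is plain arithmetic on the numbers reported above, nothing more; the variable names are mine.

```python
SERIAL_SECONDS = 63.822   # average single-processor time reported above

# Wall-clock times of the advisory-lock runs, keyed by number of processors.
parallel_seconds = {2: 62.254, 4: 30.911, 8: 14.704}

speedups = {n: SERIAL_SECONDS / t for n, t in parallel_seconds.items()}
for n, s in sorted(speedups.items()):
    print("%d processors: %.2fx faster than serial" % (n, s))
```

Going from 4 to 8 processors roughly halves the time again, so at least up to 8 processors the locking itself doesn't seem to get in the way.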

Now, if you can, please do use a proper solution (PgQ or something else with "mq" in the name).

But if you can't – you can actually get pretty good results with some advisory locks and simple sql queries. Without even plpgsql.

8 thoughts on “Pick a task to work on”

  1. Hi,

    could the for update in the last version be dropped? If I understand correctly, the advisory lock predicate is sufficient to hold back any other process from getting at that row during your transaction, and at the end of your transaction the row does not exist any more.
    If I am not wrong, a select for update causes I/O to occur, so dropping the for update may improve performance.

    Thanks for your valuable posts.

  2. thx depesz for good use case for advisory locks. btw, pg_try_advisory_xact_lock is just another reason to upgrade from 9.0 🙂

    @Luca, I think the “for update” locks the rows in case any misconfigured worker tries to delete / update them while the current worker is processing.

  3. @Luca:
    Filip is right. The purpose of “for update” is to prevent other clients from causing problems. This could be rogue (or sloppy) dba, previous version of script, or anything similar.

  4. A way to skip locked rows, SELECTing the first that’s not locked FOR UPDATE would be ideal.

    A “SELECT … FOR UPDATE SKIP LOCKED” has been discussed and described several times. I’ve tested a prototype patch I was given and it certainly makes all this a great deal easier. I’m glad to see others are interested in the problem too.

    Do you know of a good simple and widely used queuing system that might be easily adapted to use advisory locking and/or for key skip locked?

  5. @Craig Ringer, for SKIP LOCKED DATA, I guess you are referring to the work-in-progress patch I sent to the list in Feb 2012. Recently, I was contacted off-list by a PG contributor who has kindly volunteered to help test and review the patch. I have rejuvenated it for recent PG (it clashed with the recent lock strength changes). It doesn’t yet work correctly in certain scenarios (like when used from plpgsql), and we need to understand the cleanup scenarios better. We plan to submit this patch to a commit fest at some stage.

    Here is the latest work-in-progress patch: https://github.com/macdice/postgres/commit/f4c83e0f2aeba9eca877da7a4200f02ba852f5f8

    Working on this branch: https://github.com/macdice/postgres/tree/skip-locked-data

  6. Also with PGQ infrastructure we have possibility to check the second attempt of perform process_item() in destination DB deployed in another cluster if disaster happens while the main script executes. And PGQ has graceful cooperative consumer too.

  7. In PostgreSQL 9.3 and newer you could lock the rows using FOR NO KEY UPDATE, instead of FOR UPDATE, which causes less contention when you have foreign keys pointing to the queue table.
