Picking task from queue – revisit

Some time ago, I wrote a blog post about how to pick a task from a queue without locking.

It was written in 2013, so it couldn't cover everything we now have in PostgreSQL, namely SKIP LOCKED, which was added to PostgreSQL over a year later.

Two people mentioned SKIP LOCKED in the comments, but that was before it had even been committed to the git repo. Now that we have an officially released PostgreSQL version with this mechanism (9.5), let's see what it can do.

I reused the scripts that were created for the old blog post:

generate_random_data.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import string
import random
import time
 
 
def line_generator(use_time):
    size = 50
    chars = string.letters + string.digits
    random_string = ''.join(random.choice(chars) for x in range(size))
    priority = 49 - len(random_string.lstrip(random_string[0]))
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(use_time))
    return '%s\t%s\t%s\n' % (random_string, str(priority), timestamp)
 
start_time = time.time() - 100000000
f = file('/tmp/md5_data.txt', 'w')
for i in xrange(1, 10001):
    f.write(line_generator(start_time))
    start_time += 1
f.close()

and processor-3.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys
 
 
def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()
 
conn = psycopg2.connect("port=5960 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    cur.execute('''
    select *
    from queue
    where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
    order by priority desc, added_on asc limit 1 for update
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
 
cur.close()
conn.close()
print "processed %d rows." % (processed,)

The table was created and populated, as before, with:

$ CREATE TABLE queue (
    job text,
    priority int4,
    added_on timestamptz
);
 
$ \copy queue FROM /tmp/md5_data.txt
 
$ CREATE INDEX queue_idx ON queue (priority DESC, added_on ASC);

Sanity checking:

=$ for i in {1..8}; do ./processor-3.py & done; time wait
[1] 16248
[2] 16249
[3] 16250
[4] 16251
[5] 16252
[6] 16253
[7] 16254
[8] 16255
processed 1249 rows.
processed 1251 rows.
processed 1248 rows.
processed 1251 rows.
processed 1250 rows.
processed 1252 rows.
processed 1249 rows.
processed 1250 rows.
[1]   Done                    ./processor-3.py
[2]   Done                    ./processor-3.py
[3]   Done                    ./processor-3.py
[4]   Done                    ./processor-3.py
[5]   Done                    ./processor-3.py
[6]   Done                    ./processor-3.py
[7]-  Done                    ./processor-3.py
[8]+  Done                    ./processor-3.py
 
real    0m15.792s
user    0m1.056s
sys     0m0.456s

Looks like I processed all 10000 rows, and it took almost 16 seconds.
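As a quick check of that claim, the per-worker counts and the wall-clock time from the run above can be plugged into a few lines of Python. This is only arithmetic on the numbers already printed; the variable names are mine.

```python
# Per-worker counts copied from the "processed N rows." lines above.
counts = [1249, 1251, 1248, 1251, 1250, 1252, 1249, 1250]
elapsed = 15.792  # "real" time reported by `time wait`, in seconds

total = sum(counts)
throughput = total / elapsed  # rows per second across all 8 workers

print("total rows: %d" % total)
print("rows/s overall: %.1f" % throughput)
print("rows/s per worker: %.1f" % (throughput / len(counts)))
```

The total comes out at exactly 10000, so no row was lost or processed twice, and the overall rate is a bit over 630 rows per second.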

So, now let's see how it would work using SKIP LOCKED.

I will need to create a new processor; let's name it “processor-4.py”, with this source:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys
 
 
def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()
 
conn = psycopg2.connect("port=5960 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    cur.execute('''
    select *
    from queue
    order by priority desc, added_on asc limit 1 for update skip locked
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
 
cur.close()
conn.close()
print "processed %d rows." % (processed,)
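To make the behaviour of `for update skip locked` a bit more tangible without a database, here is a toy in-memory model of it. It is only a sketch of the semantics, not of how PostgreSQL implements row locks: `rows`, `row_locks`, `pick_skip_locked` and `worker` are names I made up, and a non-blocking `threading.Lock` stands in for a row lock.

```python
import threading

# Hypothetical in-memory stand-in for the queue table: (priority, added_on, job).
rows = sorted(
    [(p, t, "job-%d-%d" % (p, t)) for p in range(3) for t in range(4)],
    key=lambda r: (-r[0], r[1]),  # priority desc, added_on asc
)
row_locks = {r: threading.Lock() for r in rows}  # one "row lock" per row
done = set()                                     # rows already "deleted"
done_guard = threading.Lock()


def pick_skip_locked():
    """Return the first row in queue order that is neither done nor locked."""
    for r in rows:
        if r in done:
            continue
        # Non-blocking acquire: a busy row is skipped instead of waited for.
        if row_locks[r].acquire(False):
            if r in done:  # finished by another worker in the meantime
                row_locks[r].release()
                continue
            return r
    return None


def worker(processed):
    while True:
        r = pick_skip_locked()
        if r is None:
            break
        processed.append(r)     # "process" the job
        with done_guard:
            done.add(r)         # "delete" the row
        row_locks[r].release()  # "commit" releases the row lock


results = [[] for _ in range(4)]
threads = [threading.Thread(target=worker, args=(res,)) for res in results]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_processed = [r for res in results for r in res]
print("processed %d of %d rows" % (len(all_processed), len(rows)))
```

The point of the model is the non-blocking acquire: a worker never waits on a row somebody else holds, it just moves on to the next one in order, which is what lets several workers drain the queue without stepping on each other.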

Running processor-4.py with 8 worker processes looked like:

=$ for i in {1..8}; do ./processor-4.py & done; time wait
[1] 17107
[2] 17108
[3] 17109
[4] 17110
[5] 17111
[6] 17112
[7] 17113
[8] 17114
processed 1252 rows.
processed 1249 rows.
processed 1248 rows.
processed 1252 rows.
processed 1249 rows.
processed 1249 rows.
processed 1251 rows.
processed 1250 rows.
[1]   Done                    ./processor-4.py
[2]   Done                    ./processor-4.py
[4]   Done                    ./processor-4.py
[6]   Done                    ./processor-4.py
[7]-  Done                    ./processor-4.py
[3]   Done                    ./processor-4.py
[5]-  Done                    ./processor-4.py
[8]+  Done                    ./processor-4.py
 
real    0m15.579s
user    0m1.100s
sys     0m0.404s

The time difference is negligible, but the biggest benefit is that we can leave advisory locks for other purposes. And this is clearly a win.
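For the record, here is how small that difference is, computed from the two "real" times above. Keep in mind that each variant was run only once, so a gap this size could easily be noise; the variable names are mine.

```python
advisory_lock_s = 15.792  # processor-3.py, pg_try_advisory_xact_lock
skip_locked_s = 15.579    # processor-4.py, for update skip locked

diff_s = advisory_lock_s - skip_locked_s
diff_pct = 100.0 * diff_s / advisory_lock_s

print("difference: %.3f s (%.1f%%)" % (diff_s, diff_pct))
```

That is roughly a fifth of a second over 10000 rows, or a little over one percent.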

3 thoughts on “Picking task from queue – revisit”

  1. People may wonder how many TPS PostgreSQL can handle when it’s used as a message queue. Let’s consider the case where the working set fits into memory. Here are some benchmark results from a more or less regular server (12 cores, 24 GB RAM, HDD):

    schema.sql:

    CREATE TABLE test (k INT PRIMARY KEY, v INT);
    INSERT INTO test SELECT generate_series(0, 1000), 1;

    postgresql.conf:

    max_prepared_transactions = 100
    shared_buffers = 3GB
    wal_level = hot_standby
    wal_keep_segments = 128
    max_connections = 600
    listen_addresses = '*'
    # we can lose some of the last enqueued messages but not the entire database
    # thanks to Stas Kelvich for this hint
    synchronous_commit=off
    # don't use autovacuum on a message queue server!
    # or at least do VACUUM on schedule
    autovacuum = off
    

    kv.pgbench:

    \SET maxid 1000
    \setrandom k 1 :maxid
    \setrandom v 1 :maxid
    UPDATE test SET v = :v WHERE k = :k;

    benchmark.sh:

    #!/bin/sh
    pgbench -c 24 -j 12 -T 60 -P 1 -f kv.pgbench -h (ip) (db)

    I got 104 000 TPS which sometimes drops to 90K TPS (when buffers are synced to disk). Without `synchronous_commit=off` I got 1400 TPS, but I doubt that some message queue that honestly fsync’s every enqueued message will show better results.
