The secret ingredient in the webscale sauce

(The title, in case you don't know, comes from an excellent "movie" about databases.)

The secret ingredient is of course sharding.

Can we do sharding in PostgreSQL? First, let's define what sharding is:

Wikipedia gives a nice, correct, and absolutely unhelpful answer:

A database shard is a horizontal partition in a database or search engine.

But later on in the text we find:

Horizontal partitioning splits one or more tables by row, usually within a single instance of a schema and a database server.

So, now we have something we can understand – sharding is splitting the data of a table (or tables) across multiple instances, each holding a subset of the data. The subset is defined as a subset of rows, not columns – i.e. all shards have all columns, just not all rows.

The simplest possible example is splitting a table (let's say users) and keeping users with odd ids on server #1 and even ids on server #2. Then we'd need some logic so that we get the data we need regardless of which shard it lives on.
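A minimal sketch of that routing logic, in shell. The helper name server_for_user is made up for this illustration; a real application would do the same check in its own language before picking a connection.

```shell
# Hypothetical helper: map a user id to one of two servers.
# Odd ids live on server #1, even ids on server #2.
server_for_user() {
    if [ $(( $1 % 2 )) -eq 1 ]; then
        echo 1
    else
        echo 2
    fi
}

for id in 1 2 3 4; do
    echo "user $id -> server #$(server_for_user "$id")"
done
```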

Please note that sharding has nothing to do with failover – each shard should have its own failover capabilities (for example using normal master-slave replication).

In its basic form sharding doesn't need anything from the database – you just create two instances, preferably on two different servers, create the same table in both databases, and in your app you decide what goes where.

It will work. But, given that we (I and readers of this blog) are interested in database solutions – is there any way to do it in database itself?

Well – it just so happens that yes. It is possible.

Some time ago Skype released its Skytools. One of the released tools was PL/Proxy. PL/Proxy is (as the name suggests) a procedural language – that is, a language to write stored procedures in.

So how does it help us with sharding? Let's try.

First of all, I will need 5 instances of PostgreSQL – four of them will be shards, and one will be used for routing. What that really means I'll explain in a bit.

To set it up we will need PL/Proxy itself:

=# wget
=# tar xvzf plproxy-2.3.tar.gz
=# cd plproxy-2.3/
=# make
=# make install

Afterwards, we got two new files:

  • /opt/pgsql-9.1.1/lib/ – implementation of the plproxy itself
  • /opt/pgsql-9.1.1/share/contrib/plproxy.sql – installation sql

It is important to note that PL/Proxy has to be installed only on the router machine, not on shards themselves (i.e. it doesn't hurt to have it installed, it's just not needed).

As for users (these will need to be configured) – on the routing machine we'll use user "app", and on the shards we'll have user "plproxy".

Let's create some test instances:

=$ mkdir router shard{1,2,3,4}
=$ for a in router/ shard{1,2,3,4}; do initdb -D $a; done

Since all of them are on the same physical machine, I will need to change some settings so that they will work without breaking each other. Namely – port:

=$ for a in 1 2 3 4
do
    export a
    perl -pi -e 's/\A \s* (?: [#] \s* )? port \s* = \s*.*/port = 600$ENV{a}/x' shard$a/postgresql.conf
done

Thanks to this, I have my router on default port (5910 in my case), and shards on port 6001, 6002, 6003 and 6004.

Now just some quick logging changes so that starting pg will not mess with my terminal:

=$ perl -pi -e '
  s/\A \s* (?: [#] \s* )? listen_addresses \s* = \s*.*/listen_addresses = \047*\047/x;
  s/\A \s* (?: [#] \s* )? log_destination \s* = \s*.*/log_destination = \047stderr\047/x;
  s/\A \s* (?: [#] \s* )? logging_collector \s* = \s*.*/logging_collector = on/x;
  s/\A \s* (?: [#] \s* )? log_min_duration_statement \s* = \s*.*/log_min_duration_statement = 0/x;
  s/\A \s* (?: [#] \s* )? log_line_prefix \s* = \s*.*/log_line_prefix = \047\%m \%u\@\%d \%p \%r \047/x;
  s/\A \s* (?: [#] \s* )? log_temp_files \s* = \s*.*/log_temp_files = 0/x;
  s/\A \s* (?: [#] \s* )? (log_checkpoints|log_connections|log_disconnections|log_lock_waits) \s* = \s*.*/$1 = on/x;
' */postgresql.conf

and I can start them all:

=$ for a in shard* router/; do pg_ctl -D $a start; done

With Pg working, I can set up databases for tests. First the shards:

=$ for a in 600{1,2,3,4}; do psql -p $a -d postgres -c "create user plproxy with password 'whatever'"; done
=$ for a in 600{1,2,3,4}; do psql -p $a -d postgres -c "create database shard_$a with owner plproxy"; done

Now all shards have a database named "shard_PORT" (for example shard_6001), owned by the plproxy user. It could be owned by anyone, but since we want to access it via plproxy, it makes some sense to give the plproxy user all privileges – not very secure, but definitely convenient.

=$ psql -d postgres -c "create user app with password 'whatevs'"
=$ psql -d postgres -c "create database app with owner app"

The above was run against the router instance – so there we have the app user owning the app database.

So far it's merely preparation, but we're quickly getting to usable examples.

For our tests, let's create a "users" table with a very simple structure:

CREATE TABLE users (
    id serial PRIMARY KEY,
    username text UNIQUE,
    last_updated timestamptz
);

and create it on all shards:

=$ for a in 600{1,2,3,4}; do psql -p $a -U plproxy -d shard_$a -f create.users.table.sql; done

One note though – if we inserted into them just like that, we'd end up with users having the same id (in different shards). To avoid the problem, let's modify the sequences:

=$ for a in 1 2 3 4; do psql -p 600$a -U plproxy -d shard_600$a -c "alter sequence users_id_seq increment by 4 restart with $a"; done

This made the sequences provide different numbers:

=$ for shard in 600{1,2,3,4}
do
    echo "$shard:"
    psql -p $shard -U plproxy -d shard_$shard -c "select array( select nextval( 'users_id_seq' ) from generate_series(1,10))" -qAtX
done

As you can see the ids assigned by sequence do not overlap. Of course it is not fault-proof – badly written app can force the id in bad way and cause problems – but the whole idea lies on app being sensible, and not breaking things intentionally.
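The interleaving can also be checked without a database. A sequence that restarts at N and increments by 4 hands out N, N+4, N+8 and so on. The sketch below only does that arithmetic in shell; it does not talk to PostgreSQL, and first_ids is a hypothetical helper name.

```shell
# Pure arithmetic model of "increment by 4 restart with N" on shard N.
# first_ids SHARD COUNT prints the first COUNT ids that shard would hand out.
first_ids() {
    id=$1
    count=$2
    out=""
    i=0
    while [ "$i" -lt "$count" ]; do
        out="$out $id"
        id=$(( id + 4 ))
        i=$(( i + 1 ))
    done
    echo "${out# }"
}

for n in 1 2 3 4; do
    echo "600$n: $(first_ids "$n" 5)"
done
```

Every id has a different remainder modulo 4 depending on the shard that produced it, so two shards can never generate the same value.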

At this moment we have 4 shards, each with users table, where ids are assigned in such a way that they will not conflict between shards.

Now we need some work to be done in the router. Luckily it's not much work:

=$ psql -U postgres -d app -f /opt/pgsql-9.1.1/share/contrib/plproxy.sql

That creates the basic objects. Now we just need to configure it – to let plproxy know where the shards are, and how to log in there:

CREATE SERVER appplproxy FOREIGN DATA WRAPPER plproxy
OPTIONS (connection_lifetime '1800',
         p0 'dbname=shard_6001 host= port=6001',
         p1 'dbname=shard_6002 host= port=6002',
         p2 'dbname=shard_6003 host= port=6003',
         p3 'dbname=shard_6004 host= port=6004'
        );
CREATE USER MAPPING FOR app SERVER appplproxy
        OPTIONS (USER 'plproxy', password 'whatever');

The above should be done using a superuser account.

This basically lists the connection details used when connecting from the router to the shards.

The first thing we want to test is whether we can do anything at all over PL/Proxy – like getting some settings. To do so, I create a function on the router:

CREATE FUNCTION shard_setting(setting_name text) RETURNS SETOF text AS $$
    CLUSTER 'appplproxy';
    RUN ON ALL;
    TARGET pg_catalog.current_setting;
$$ LANGUAGE plproxy;
GRANT EXECUTE ON FUNCTION shard_setting( text ) TO app;

And now, I can connect as app, and do:

$ SELECT * FROM shard_setting('port');
(4 rows)

This is pretty cool – just got port numbers from all shards, via router.

But what exactly does the function mean?

  • CLUSTER 'appplproxy'; – defines which cluster to use. We can have multiple clusters defined, and connect to them all via the same router.
  • RUN ON ALL; – means that the function on shards will be run on all of them, in parallel, and you'll get all of the rows together.
  • TARGET pg_catalog.current_setting; – by default when calling (in router) function x, the same function (i.e. with the same name) will be called on shards. But in our case, we can't make (in router) new version of pg_catalog.current_setting, so instead we change the target function

One thing that could catch your attention is the parallel thing.

Let's test it. I will create simple function, on all shards:

CREATE FUNCTION test_parallel( OUT shard_port int4, OUT sleep_time FLOAT8 ) RETURNS setof record AS $$
BEGIN
    shard_port  := current_setting( 'port' )::int4;
    sleep_time := random() * 5;
    perform pg_sleep( sleep_time );
    RETURN NEXT;
    RETURN;
END;
$$ LANGUAGE plpgsql;

This function, when run directly on a shard, returns:

=$ for a in 600{1,2,3,4}; do time psql -p $a -U plproxy -d shard_$a -qAtX -c "select * from test_parallel()"; done
real    0m1.647s
user    0m0.000s
sys     0m0.000s
real    0m0.402s
user    0m0.000s
sys     0m0.000s
real    0m4.642s
user    0m0.000s
sys     0m0.004s
real    0m0.463s
user    0m0.000s
sys     0m0.000s

With function ready on shards, we can test parallel call via router. For this we'll add another function on router:

CREATE FUNCTION test_parallel( OUT shard_port int4, OUT sleep_time FLOAT8 ) RETURNS setof record AS $$
    CLUSTER 'appplproxy';
    RUN ON ALL;
$$ LANGUAGE plproxy;
GRANT EXECUTE ON FUNCTION test_parallel( ) TO app;

I'll run it with timing information:

$ \timing
Timing is on.
$ SELECT * FROM test_parallel();
 shard_port |    sleep_time
       6001 |  4.79576680576429
       6002 | 0.104651227593422
       6003 |  1.38062899466604
       6004 |  1.76556967897341
(4 rows)
Time: 4803.826 ms

It's great – I got the data, it even looks ordered by shard, and it clearly ran in parallel – if it were sequential, the total time would be about 3 seconds longer.
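To put numbers on that: a parallel run takes roughly as long as the slowest shard, while a sequential one would take about the sum of all the sleeps. The sketch below runs that arithmetic on the four sleep_time values from the output above (rounded to four decimals); it is only a back-of-the-envelope check, not a benchmark.

```shell
# sleep_time values copied (rounded) from the router output above
times="4.7958 0.1047 1.3806 1.7656"

# parallel ~ slowest shard, sequential ~ sum of all shards
summary=$(echo "$times" | tr ' ' '\n' | LC_ALL=C awk '
    { sum += $1; if ($1 > max) max = $1 }
    END { printf "parallel=%.2f sequential=%.2f", max, sum }')
echo "$summary"
```

The measured 4803.826 ms sits right next to the slowest sleep, which is what a parallel run should look like.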

So, let's add a more interesting function, this time actually using the shards for their intended purpose: splitting data across instances.

To run on a single shard instead of all of them, we have to give PL/Proxy a way to choose which shard to use. This is done by providing an integer that is used for choosing the shard.

It works like this: in your PL/Proxy function you tell PL/Proxy how to obtain a single integer value that represents your sharding criteria. Then, based on this integer, PL/Proxy will choose the shard.

PostgreSQL has a simple function – hashtext() – which returns a hash of the given text. It looks like this:

$ SELECT x, hashtext(x) FROM ( VALUES ('depesz'), ('postgresql'), ('plproxy') ) AS q (x);
     x      |  hashtext
 depesz     | 1671716498
 postgresql | 2058439964
 plproxy    |  -47058505
(3 rows)

It just so happens that the hash is integer, so it's perfect for our task. And that's how we can use it. First, of course, we need worker function on shards:

CREATE FUNCTION make_new_user( IN p_username TEXT, OUT created BOOL, OUT id INT4 ) RETURNS setof record AS $$
DECLARE
    o_created ALIAS FOR created;
    o_id ALIAS FOR id;
BEGIN
    LOOP
        BEGIN
            -- users.id is qualified so it cannot be confused with the OUT parameter "id"
            INSERT INTO users (username, last_updated) VALUES (p_username, now()) RETURNING users.id INTO o_id;
            created := TRUE;
            RETURN NEXT;
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            SELECT users.id INTO o_id FROM users WHERE username = p_username;
            IF FOUND THEN
                created := FALSE;
                RETURN NEXT;
                RETURN;
            END IF;
        END;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

With this created on all shards, we make another one on the router:

CREATE FUNCTION make_new_user( IN p_username TEXT, OUT created BOOL, OUT id INT4 ) RETURNS setof record AS $$
    CLUSTER 'appplproxy';
    RUN ON hashtext( p_username );
$$ LANGUAGE plproxy;
GRANT EXECUTE ON FUNCTION make_new_user( text ) TO app;

All looks to be in place, so we can now add some users:

=$ psql -U app -p 5910 -c "select * from make_new_user('depesz')"
 created | id
 t       | 43
(1 row)
=$ psql -U app -p 5910 -c "select * from make_new_user('whatever')"
 created | id
 t       | 42
(1 row)
=$ psql -U app -p 5910 -c "select * from make_new_user('whatevs')"
 created | id
 t       | 46
(1 row)
=$ psql -U app -p 5910 -c "select * from make_new_user('postgresql')"
 created | id
 t       | 41
(1 row)
=$ psql -U app -p 5910 -c "select * from make_new_user('plproxy')"
 created | id
 t       | 44
(1 row)
=$ psql -U app -p 5910 -c "select * from make_new_user('magic')"
 created | id
 t       | 55
(1 row)

Of course it also handles cases where the user already exists:

=$ psql -U app -p 5910 -c "select * from make_new_user('depesz')"
 created | id
 f       | 43
(1 row)

You may ask yourself – OK, but how can I get user data by ID? I cannot use "RUN ON hashtext( user_id )", because the generated hash would usually make PL/Proxy use the wrong shard.

The solution is very simple – just make it RUN ON ALL – 3 shards will not return any data, and one will.

Shard function:

CREATE OR REPLACE FUNCTION get_user_by_id( IN p_user_id INT4, OUT username TEXT, OUT last_updated TIMESTAMPTZ ) RETURNS setof record AS $$
DECLARE
    temprec RECORD;
BEGIN
    FOR temprec IN SELECT * FROM users WHERE id = p_user_id LOOP
        username := temprec.username;
        last_updated := temprec.last_updated;
        RETURN NEXT;
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql;

Router function:

CREATE FUNCTION get_user_by_id( IN p_user_id INT4, OUT username TEXT, OUT last_updated TIMESTAMPTZ ) RETURNS setof record AS $$
    CLUSTER 'appplproxy';
    RUN ON ALL;
$$ LANGUAGE plproxy;

And now, I can:

$ SELECT * FROM get_user_by_id( 44 );
 username |         last_updated
 plproxy  | 2011-12-02 13:33:40.710641+01
(1 row)

Of course it's not an ideal solution, but it will work. If I used natural keys everywhere, and sharded on them, it wouldn't be a problem, as the getter function would also use "RUN ON hashtext(…)".

Now you might wonder whether we can somehow tell which hash will cause data to go to a given partition. And yes, we can. It's trivial.

Let's take user 'plproxy'. hashtext of 'plproxy' is -47058505. We have 4 partitions, so we take the integer returned by hashtext and do a bitwise AND with 3 (number of partitions minus 1).

$ SELECT hashtext('plproxy') & 3;
(1 row)

This means that the data is on the 3rd partition (counting from 0, so it's actually the 4th). Based on the FOREIGN SERVER configuration we know that the 4th shard is the one on port 6004, so we can:

=$ psql -U plproxy -p 6004 -d shard_6004 -c "select * from users where username = 'plproxy'"
 id | username |         last_updated
 44 | plproxy  | 2011-12-02 13:33:40.710641+01
(1 row)

And here it is.

This information (how to convert integer to shard number) will be important if/when you'll want to add new shards.
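The same mapping can be sketched outside the database. Assuming a power-of-two number of partitions, as in the four-shard setup here, the partition index is the hash value ANDed with (partitions - 1). The hash values below are the ones printed by hashtext() earlier in this post; shard_for_hash is a hypothetical helper, not part of PL/Proxy.

```shell
# shard_for_hash HASH PARTITIONS -> partition index (0-based)
# PARTITIONS is assumed to be a power of two.
shard_for_hash() {
    echo $(( $1 & ($2 - 1) ))
}

# hash values as printed by hashtext() earlier in the post
echo "depesz     -> p$(shard_for_hash 1671716498 4)"
echo "postgresql -> p$(shard_for_hash 2058439964 4)"
echo "plproxy    -> p$(shard_for_hash -47058505 4)"
```

This agrees with the ids handed out earlier: 'depesz' got id 43 (the sequence on port 6003, i.e. p2), 'postgresql' got 41 (port 6001, p0) and 'plproxy' got 44 (port 6004, p3).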

PL/Proxy itself doesn't support migrations, so you'll have to do it on your own, manually. This is complicated for a number of reasons (there can be concurrent writes from the app while you're migrating, and it will require changes to the cluster configuration), so the developers suggest (and I think that it's an excellent idea) to simply create more shards than you'll need.

If you have 3 servers – create 32 shards, and just make 10 of them go to server #1, another eleven to #2, and the final eleven to #3 – each shard can simply be a different database in the same PostgreSQL instance. And when you need more hardware power, you just copy whole shard database(s) to another machine and change the config – no need for complicated merging/swapping of rows between shards.
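A rough sketch of what such a layout could look like, generated in shell. The host names db1..db3 and the database names shard_NN are made up for this illustration; only the 10 + 11 + 11 split comes from the text above.

```shell
# Print PL/Proxy-style partition entries for 32 shard databases spread
# over 3 servers (10 + 11 + 11). Host names db1..db3 and database names
# shard_NN are hypothetical.
server_for_shard() {
    if [ "$1" -lt 10 ]; then
        echo db1
    elif [ "$1" -lt 21 ]; then
        echo db2
    else
        echo db3
    fi
}

i=0
while [ "$i" -lt 32 ]; do
    printf "p%d 'dbname=shard_%02d host=%s'\n" "$i" "$i" "$(server_for_shard "$i")"
    i=$(( i + 1 ))
done
```

Moving a shard to new hardware then means copying one database and changing the host in one of these lines; the hash-to-partition mapping stays untouched.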

Thanks to sharded approach even operations that require full table scans are faster.

For example – let's consider getting count of rows.

Normally you'd need something like: select count(*) from table;. With PL/Proxy you can do:

  • on every shard you install a function that does the count(*) on this shard's partition of the table
  • on the router you create a PL/Proxy function that will RUN ON ALL to get the counts from the shards
  • on the router you create an SQL or pl/PgSQL function that will run the PL/Proxy function and aggregate the results

In this way, the application can call the SQL (or pl/PgSQL) function on the router, which in turn will call PL/Proxy, which in turn will run the counts on all shards in parallel. Does it remind you of anything? <cough>map & reduce</cough>?
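As a toy model of that pattern (not PL/Proxy itself), here is the same fan-out and aggregate done with shell background jobs. The per-shard row counts are made-up numbers standing in for the count(*) functions on the shards, and count_on_shard is a hypothetical name.

```shell
# Toy model of the pattern: each "shard" counts its own rows in parallel,
# then a single step adds the partial counts together.
# The row counts are made-up numbers standing in for count(*) on each shard.
count_on_shard() {
    case $1 in
        0) echo 120 ;;
        1) echo 95 ;;
        2) echo 130 ;;
        3) echo 101 ;;
    esac
}

tmpdir=$(mktemp -d)
for shard in 0 1 2 3; do
    count_on_shard "$shard" > "$tmpdir/$shard" &   # "map": runs in background
done
wait                                               # all shards have answered

# "reduce": add up the partial counts
total=$(cat "$tmpdir"/* | awk '{ sum += $1 } END { print sum }')
rm -rf "$tmpdir"
echo "total rows: $total"
```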

The other thing is – RUN ON function() can run any function. Usually you will want some function that gives a more or less equal distribution across the shards. But what if you want a really equal one? Or some specific one?

Consider: RUN ON nextval('some_sequence'); – the first call will run on partition 1, the next on 2, the next on 3, the next on 0, and so on.

Or consider: RUN ON extract(epoch from now())::int4 / 86400 (this will need to be wrapped in a function, but it's just an idea) – this would change the shard daily.
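Both ideas boil down to feeding a different integer into the same "AND with 3" mapping used for four partitions. A small shell sketch of the arithmetic, with hypothetical helper names partition_for and day_number:

```shell
# Which of 4 partitions a given integer maps to (value & 3).
partition_for() {
    echo $(( $1 & 3 ))
}

# Round-robin: successive sequence values 1..6
for n in 1 2 3 4 5 6; do
    echo "nextval=$n -> partition $(partition_for "$n")"
done

# Daily rotation: number of whole days since the epoch
day_number() {
    echo $(( $1 / 86400 ))
}
now=$(date +%s)
echo "today -> partition $(partition_for "$(day_number "$now")")"
```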

The cool thing is that, with some tinkering, I think it would be possible to use many shards within the same database, and use parallel "RUN ON ALL" to speed up time-consuming jobs by running them in smaller packets. For example:

CREATE SERVER testit FOREIGN DATA WRAPPER plproxy
OPTIONS (connection_lifetime '1800',
         p0 'dbname=depesz host= port=5910 application_name=s1',
         p1 'dbname=depesz host= port=5910 application_name=s2',
         p2 'dbname=depesz host= port=5910 application_name=s3',
         p3 'dbname=depesz host= port=5910 application_name=s4',
         p4 'dbname=depesz host= port=5910 application_name=s5',
         p5 'dbname=depesz host= port=5910 application_name=s6',
         p6 'dbname=depesz host= port=5910 application_name=s7',
         p7 'dbname=depesz host= port=5910 application_name=s8'
        );
-- the role name in the next line was lost from the listing; depesz is assumed
CREATE USER MAPPING FOR depesz SERVER testit
        OPTIONS (USER 'depesz', password 'whatever');

CREATE OR REPLACE FUNCTION shard_get_length( OUT shard text, OUT len int4 ) RETURNS setof record AS $$
BEGIN
    shard := current_setting( 'application_name' );
    IF shard = 's1' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id < 1250000;
    elsif shard = 's2' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 1250000 AND id < 2500000;
    elsif shard = 's3' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 2500000 AND id < 3750000;
    elsif shard = 's4' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 3750000 AND id < 5000000;
    elsif shard = 's5' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 5000000 AND id < 6250000;
    elsif shard = 's6' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 6250000 AND id < 7500000;
    elsif shard = 's7' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 7500000 AND id < 8750000;
    elsif shard = 's8' THEN
        SELECT SUM(LENGTH(payload)) INTO len FROM testit WHERE id >= 8750000;
    ELSE
        shard := 'WHAT?!';
        len := 0;
    END IF;
    RETURN NEXT;
    RETURN;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION get_length_helper(OUT shard text, OUT len int4) RETURNS SETOF record AS $$
    CLUSTER 'testit';
    RUN ON ALL;
    TARGET public.shard_get_length;
$$ LANGUAGE plproxy;

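-- wrapper reconstructed from context: the get_length() call below needs it
CREATE FUNCTION get_length() RETURNS int8 AS $$
    SELECT SUM( len ) FROM get_length_helper();
$$ LANGUAGE sql;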

And now I can either get the data directly:

$ SELECT SUM(LENGTH(payload)) FROM testit;
(1 row)
Time: 6240.375 ms

Or via the PL/Proxy multiplier:

$ SELECT get_length();
(1 row)
Time: 3008.539 ms

(both of the times are with filled caches).

Of course the approach shown above is just a proof of concept – in reality the ranges would be configurable in a nicer way, but it shows what is possible to achieve with PL/Proxy.
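One way the ranges could be made less hard-coded is to compute them from the number of slices. The sketch below does that in shell; the total of 10,000,000 ids is an assumption inferred from the 1,250,000-wide ranges above, and slice_bounds is a hypothetical helper.

```shell
# Print "low high" id bounds for slice K (1-based) out of N equal slices
# covering ids below TOTAL.
slice_bounds() {
    k=$1; n=$2; total=$3
    step=$(( total / n ))
    echo "$(( (k - 1) * step )) $(( k * step ))"
}

# TOTAL = 10000000 is assumed, not taken from the original table
for k in 1 2 3 4 5 6 7 8; do
    set -- $(slice_bounds "$k" 8 10000000)
    echo "s$k: id >= $1 AND id < $2"
done
```

Unlike the hand-written version, this sketch bounds the first and last slice on both sides, so ids outside 0..TOTAL would need separate handling.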

The final thing to mention is the drawbacks.

Well – PL/Proxy on its own has only one drawback – it's a new thing for most people, and it takes some getting used to before you understand it well.

It does require writing everything using stored procedures, but this approach is (in my not so humble opinion) very good anyway.

Of course, like every other approach to sharding, it has one important drawback – you can't have constraints reaching across shards. So there is no way to declare some field unique globally, or add a FOREIGN KEY that would make sure that some row in some table in another shard exists.

Still – this is a pretty good idea for a number of use cases, and I feel kind of bad that it's not better known. Maybe now it will be.

Just in case you wonder what will happen if we have so many queries that the router can't handle them all – after all, all queries go through the router. The solution is simple: add more router servers, with the same configuration and functions, and make the application choose randomly (or via some configuration) which router to connect to. Simple as that.
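A minimal sketch of the random choice, in shell. The router host names are made up, and the final echo stands in for whatever the application would do to open the connection.

```shell
# Hypothetical router host names; each runs the same PL/Proxy functions.
routers="router1 router2 router3"

# pick_router INDEX -> the INDEX-th (0-based) name from $routers
pick_router() {
    idx=$1
    for r in $routers; do
        if [ "$idx" -eq 0 ]; then
            echo "$r"
            return
        fi
        idx=$(( idx - 1 ))
    done
}

count=$(( $(echo $routers | wc -w) ))
# random index in 0..count-1
idx=$(awk -v n="$count" 'BEGIN { srand(); print int(rand() * n) }')
echo "connecting via $(pick_router "$idx")"
```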

17 thoughts on “The secret ingredient in the webscale sauce”

  1. If you’re interested in sharding – you may also want to try giving ScaleBase a chance – they build a transparent sharding solution for relational databases.

  2. @Liran:
    i’m very tempted to assume it’s a spam comment, but I’ll give you benefit of the doubt. scalebase is not relevant, because they don’t support postgresql.

  3. @DEPESZ – Not intended as spam – just as a another resource. PostgreSQL is something we’re looking into for Scalebase. Basically, platform support depends on market share and a customer desire to utilize sharding – so interest in the space in general is encouraging.

  4. @Gj:
    you can, and it makes sense, to partition within the same database. Shards are more or less the same thing, but on separate servers. At the very least that’s how I see the difference.

  5. You wrote
    “Of course it is not fault-proof – badly written app can force the id in bad way and cause problems – but the whole idea lies on app being sensible, and not breaking things intentionally.” We can do some CHECKS on tables like

    CREATE TABLE test (test INTEGER CHECK (test % 3 = 0));
    CREATE TABLE test (test INTEGER CHECK (test % 3 = 1));

    It will be slower on inserts, but nothing will break in this table.

  6. @Orcus:
    Sure. It’s possible. But I’m not sure if adding such checks makes sense in long term.

  7. Hi,

    Very very nice article. I’ve been kind of at a crossroads with this for a while. One thing that was *worrying* me was that the number of “slave” shards required had to be a power of 2 (I am desperately hoping I read that wrong and that it is not true). Is there a way to get around that?


  8. @Depesz: Oo, I know this one: Because you never have ANY idea when you design the database how many orders of magnitude you’re going to have to scale.

    When we rebuilt the AOL mail system in 1994, 128 shards seemed plenty for our 75,000 simultaneous users. But once you have 16 big-iron servers cranking at over 50% utilization, you can’t actually pull one shard off to another server; the overall speed of the system limits demand, and if you make one server slightly faster, and one slightly slower, the increased demand (aka “Field of Dreams effect”: if you build it, they will come) for the faster server will bring the rest of the system to its knees. The only way to scale is in actual powers-of-2 physical servers.

    Sadly, your network only supports 16 machines. You get the manufacturer to build you a special network interface that supports 32, raid their test lab for the hardware, and swear that next time, you will not make this mistake, because you will have 4096 shards.

    You then migrate your shards to the new system one-by-one, trying very very hard to deal with the fact that (a) old hash buckets don’t map onto new hash buckets because your old hashing algorithm didn’t produce keys evenly enough for 4096 buckets, and (b) your servers are already at 0% idle and can’t really afford a batch job on top of that, and (c) you’re global and there is no “off-peak”.

    I imagine those 4096 shards told a very similar story by now, and this year they migrated off to a newer system yet. I don’t know how many shards, but luckily for AOL, they probably don’t have to worry about growth anymore.

    TL;DR: You might need about two orders of magnitude more shards than you think you will EVER need in your WILDEST dreams, shards tend to turn into physical hardware whether you like it or not, and if you didn’t pick the right number up front, you are in for a very painful few years.

  9. Very informative article, especially with the examples, considering there is very limited info on the web around pl/proxy usage.

    Though I have one question, for which I couldn’t find any answer anywhere. When PLProxy makes a connection to the real DB (where the storage is, or call them shards), does it make a connection every time? Or is there an inbuilt connection pool within PLProxy for all DBs that it connects to?
    I do understand the concept of PGBouncer sitting in between PL/Proxy and the real DBs to avoid a situation where all plproxy nodes make connections to all shard/DB nodes, but PGBouncer runs as a separate process outside the PLProxy node, which means that from PLProxy, for every outgoing connection, a connection to PGBouncer is still made on demand (which can be improved by having a precreated set of connections picked from a pool)

  10. If I issue a query ( select ) with appropriate where clause will it be redirected to appropriate shard ? or I have to RUN ON ALL ?

  11. @Anonymous:
    PL/Proxy is language for functions. Normal queries (like: select … from table) do not pass through pl/proxy, so it’s not routed.

    Long story short – you have to encapsulate your query in a function, and when doing it you will specify whether it’s run on all or something else.

  12. So, if i did not specify RUN ON ALL or shard id, will the pl/proxy select appropriate shard ?

    Ex: Procedure
    CREATE OR REPLACE FUNCTION dynamic_query(q text)
    RETURNS SETOF record AS $$
    CLUSTER 'mycluster';
    $$ LANGUAGE plproxy;

    Query :
    SELECT * FROM dynamic_query('SELECT id, username FROM sometable') AS (id integer, username text);


    My intention is to do the CRUD operations where pl/proxy do the shard selection ? Is this possible ? If not does postgres has similar tools to do automatic sharding ?

  13. To be honest, I’m not sure what the default is, but I know that plproxy will *not* dig into your text-supplied query to find out whether there is anything in it that could be used for limiting the number of shards.
