Waiting for 9.6 – Support parallel aggregation.

On 21st of March, Robert Haas committed patch:

Support parallel aggregation.
Parallel workers can now partially aggregate the data and pass the
transition values back to the leader, which can combine the partial
results to produce the final answer.
David Rowley, based on earlier work by Haribabu Kommi.  Reviewed by
Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.

This is big.

Some time ago, we got parallel-able sequential scans. Then, two months later parallel joins.

Now, it's aggregates.

How does it work?

Let's create test table, and see what we can do with it:

        i AS id,
        now() - random() * '5 years'::INTERVAL AS ts,
        (random() * 100000000)::int4 AS NUMBER,
        repeat('x', (10 + 40 * random())::int4) AS txt
        generate_series(1, 10000000) i;
SELECT 10000000

First, lets see if the parallelization works, and if yes, how to spot it. Sanity check:

$ EXPLAIN analyze SELECT MIN(ts) FROM test;
                                                       QUERY PLAN
 Aggregate  (cost=226506.30..226506.31 ROWS=1 width=8) (actual TIME=1279.564..1279.565 ROWS=1 loops=1)
   ->  Seq Scan ON test  (cost=0.00..201505.84 ROWS=10000184 width=8) (actual TIME=0.032..664.339 ROWS=10000000 loops=1)
 Planning TIME: 0.624 ms
 Execution TIME: 1279.630 ms
(4 ROWS)
$ SET max_parallel_degree = 2;
$ EXPLAIN analyze SELECT MIN(ts) FROM test;
                                                                 QUERY PLAN
 Finalize Aggregate  (cost=154588.51..154588.52 ROWS=1 width=8) (actual TIME=476.593..476.593 ROWS=1 loops=1)
   ->  Gather  (cost=154588.29..154588.50 ROWS=2 width=8) (actual TIME=476.539..476.588 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=153588.29..153588.30 ROWS=1 width=8) (actual TIME=471.074..471.074 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=8) (actual TIME=0.040..240.595 ROWS=3333333 loops=3)
 Planning TIME: 0.148 ms
 Execution TIME: 477.425 ms
(7 ROWS)

OK. We see now that when parallelization is enable (max_parallel_degree > 0), plan is clearly different, and shows information about workers, gathering, and finalizing.

I tested it for min(ts). Can we also do average? sum?

$ EXPLAIN analyze SELECT avg(NUMBER) FROM test;
                                                       QUERY PLAN                                                        
 Aggregate  (cost=226506.30..226506.31 ROWS=1 width=32) (actual TIME=1241.411..1241.411 ROWS=1 loops=1)
   ->  Seq Scan ON test  (cost=0.00..201505.84 ROWS=10000184 width=4) (actual TIME=0.017..595.584 ROWS=10000000 loops=1)
 Planning TIME: 0.241 ms
 Execution TIME: 1241.460 ms
(4 ROWS)
$ EXPLAIN analyze SELECT SUM(LENGTH(txt)) FROM test;
                                                                 QUERY PLAN                                                                  
 Finalize Aggregate  (cost=165005.37..165005.38 ROWS=1 width=8) (actual TIME=730.439..730.439 ROWS=1 loops=1)
   ->  Gather  (cost=165005.15..165005.36 ROWS=2 width=8) (actual TIME=730.401..730.435 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=164005.15..164005.16 ROWS=1 width=8) (actual TIME=728.536..728.536 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=31) (actual TIME=0.012..217.970 ROWS=3333333 loops=3)
 Planning TIME: 0.177 ms
 Execution TIME: 731.256 ms
(7 ROWS)

OK. Looks like we can't do parallel avg(), but this can be worked around:

$ EXPLAIN analyze SELECT SUM(NUMBER)::float8 / COUNT(NUMBER)::float8 FROM test;
                                                                 QUERY PLAN                                                                 
 Finalize Aggregate  (cost=165005.37..165005.39 ROWS=1 width=8) (actual TIME=481.096..481.096 ROWS=1 loops=1)
   ->  Gather  (cost=165005.15..165005.36 ROWS=2 width=16) (actual TIME=481.047..481.076 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=164005.15..164005.16 ROWS=1 width=16) (actual TIME=479.455..479.455 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=4) (actual TIME=0.023..211.117 ROWS=3333333 loops=3)
 Planning TIME: 0.196 ms
 Execution TIME: 481.936 ms
(7 ROWS)

OK. Now, for the million dollar question: is it fast?

To test it, I will run the “avg" thing, like above, with various max_parallel_degree, 10 times with each, and will pick the best time for every max_parallel_degree. My desktop has four cores, so we should stop seeing progress after max_parallel_degree = 3.


max_parallel_degree best time:
0 976.813 ms
1 507.961 ms
2 352.671 ms
3 284.120 ms
4 274.086 ms
5 281.515 ms
6 280.768 ms
7 279.903 ms
8 279.399 ms
9 279.838 ms

This is pretty good. First of all – speedup is immense. Second of all – it's almost perfect parallelization. This is absolutely amazing.

The only gripe I have, is lack of documentation. While checking this, I found undocumented column (aggcombinefn) in pg_aggregate (not sure if it's related or not), and I didn't find any information on how to make my own aggregate parallelizable. Which would be interesting, as creating own aggregates is very simple in Pg, so it would be really cool if we could make it possible to be defined in such a way that parallelization would work.

Small gripe about docs aside it's a great patch, that will definitely be welcome in many cases. Thanks a lot.

2 thoughts on “Waiting for 9.6 – Support parallel aggregation.”

  1. Hi,

    Thanks for the post!
    Just a couple of notes: If I do 976.813 / 2 / 507.961, I come back to 96.15% efficiency for 1 worker, vs 0 workers. It’s good, but I found it to be 99.13% on some tests I did. Likely the reason is due to my test being longer running which will help drown the parallel worker setup costs a little more. It would be interesting to see if you find that too if you have, say, 10 times more rows.

    Also this percentage will start to tail off with more groups as the Finalize Aggregation stage, which runs only on the lead process, has to combine the groups from each worker. Also more workers likely means yet more partial groups too, which would be another (small) reason why the linear parallelisation won’t remain as high with higher worker counts, although this is really only going to be noticeable when many more groups are involved. Also there’s going to be other factors coming into play there too, CPU cache thrashing etc.

    For the AVG() aggregate, we will hopefully see some pending follow-on patches committed soon which allow many more aggregates to participate in parallelism.


  2. The avg() aggregate parallelization is on the final 9.6 release. Doing the test with the same dataset, I got:

    test=> explain analyze select avg(number) from test;
    Finalize Aggregate (cost=127517.23..127517.24 rows=1 width=32) (actual time=519.046..519.046 rows=1 loops=1)
    -> Gather (cost=127516.70..127517.21 rows=5 width=32) (actual time=517.739..519.034 rows=6 loops=1)
    Workers Planned: 5
    Workers Launched: 5
    -> Partial Aggregate (cost=126516.70..126516.71 rows=1 width=32) (actual time=512.356..512.356 rows=1 loops=6)
    -> Parallel Seq Scan on test (cost=0.00..121516.76 rows=1999976 width=4) (actual time=0.023..343.913 rows=1666667 loops=6)
    Planning time: 0.165 ms
    Execution time: 520.030 ms

Comments are closed.