One of the key improvements in Postgres 9.6 is the ability to distribute a query to multiple worker processes. Yet, with a few tricks, this has been feasible almost forever, at least for the really heavy stuff.

I prefer to keep the database-specific stuff in the database and the application-related logic in the application. For me that translates to having almost all database access implemented as database functions. That allows for later optimization without touching the application logic. It does not always work out, but more often than not it does. So, for me it's worthwhile.

That said, let's assume you have a query that takes 3 seconds and you know it can be split into 3 parts where every part takes about 1 second. Here is an example:

WITH num(i) AS (
    SELECT generate_series(1,3000)
)
, div(i) AS (
    SELECT generate_series(1,3000)
)
SELECT num.i
  FROM num
 CROSS JOIN div
 GROUP BY 1
HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
 ORDER BY 1

This is a pretty inefficient way to find the prime numbers between 1 and 3000. We build the cross join of num and div. That means the result contains, for each element of num, all elements of div. Now, for each of these 9,000,000 rows, num.i is divided by div.i and the remainder is tested for zero. If it is zero, num.i is divisible by div.i. For a prime number that happens exactly twice: when div.i = 1 and when div.i = num.i.
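To see the counting logic for a single number, here is a minimal check for num.i = 7 (any prime would do):

```sql
-- 7 % i = 0 holds only for i = 1 and i = 7,
-- so the count is 2 and 7 passes the prime test.
SELECT count(*)
  FROM generate_series(1,3000) div(i)
 WHERE 7 % i = 0;   -- returns 2
```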

Since the actual prime numbers do not matter, they are sent to /dev/null:

postgres=# \timing on
Timing is on.
postgres=# \o /dev/null
postgres=# WITH num(i) AS (
postgres(#     SELECT generate_series(1,3000)
postgres(# )
postgres-# , div(i) AS (
postgres(#     SELECT generate_series(1,3000)
postgres(# )
postgres-# SELECT num.i
postgres-#   FROM num
postgres-#  CROSS JOIN div
postgres-#  GROUP BY 1
postgres-# HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
postgres-#  ORDER BY 1
postgres-# ;
Time: 3233.566 ms (00:03.234)

The query takes roughly 3 seconds.

The beauty of this query is that it can be divided into separate jobs testing the numbers from 1 to 1000, from 1001 to 2000, and from 2001 to 3000. Here, for instance, is the middle one:

postgres=# WITH num(i) AS (
postgres(#     SELECT generate_series(1001,2000)
postgres(# )
postgres-# , div(i) AS (
postgres(#     SELECT generate_series(1,3000)
postgres(# )
postgres-# SELECT num.i
postgres-#   FROM num
postgres-#  CROSS JOIN div
postgres-#  GROUP BY 1
postgres-# HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
postgres-#  ORDER BY 1
postgres-# ;
Time: 1107.758 ms (00:01.108)

Now, the goal is to have Postgres execute the 3 jobs simultaneously.

DBLINK

The key ingredient to get there is the dblink extension:

CREATE EXTENSION IF NOT EXISTS dblink;

It provides a couple of functions that allow a backend to connect to another Postgres database. The most prominent of these functions is simply called dblink. It takes 2 parameters: a connection string specifying which database to connect to, and the SQL query to execute.

Example:

SELECT *
  FROM dblink('dbname=postgres port=5434', $$
           SELECT setting
             FROM pg_settings
            WHERE name='port'
       $$) t(port int)

This could be the result:

 port
------
 5434
(1 row)

The connection string I am using in this example connects to the same database where the dblink function is called.

Note that dblink returns a set of RECORDs. That means you have to describe the result type each time it is called (the t(port int) part).

Nowadays, dblink may be a little frowned upon for being old-fashioned. For a couple of years now, Postgres has shipped with foreign data wrappers that allow you to use tables and views residing in external databases in local queries just like local tables and views. That is true, and foreign data wrappers are an important feature. Yet, I think dblink still has its niche. It gives you better control over what exactly the query on the remote machine looks like, and it can give you much better performance in more complex situations, although the Postgres FDW has improved a lot in recent versions.

Named connections

Apart from the dblink function, the extension provides more low-level functions. The following query, for instance, opens 3 connections to the same database, naming them cn1, cn2 and cn3:

SELECT dblink_connect(cn, 'dbname=postgres port=5434')
  FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn);

These 3 additional backends can be seen now in the pg_stat_activity view:

postgres=# select pid, state, query from pg_stat_activity order by backend_start;
  pid  | state  |                                 query
-------+--------+------------------------------------------------------------------------
 32544 | active | select pid, state, query from pg_stat_activity order by backend_start;
  1084 | idle   | 
  1085 | idle   | 
  1086 | idle   | 
(4 rows)

Or you can use dblink_get_connections() to return a list of connections used in the current session.

postgres=# select dblink_get_connections();
 dblink_get_connections
------------------------
 {cn3,cn2,cn1}
(1 row)

Here is how these connections can be used:

SELECT *
  FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
 CROSS JOIN dblink(conn.cn, $$
           SELECT pg_backend_pid()
       $$) t(pid int)

Here is the result:

 cn  | pid
-----+------
 cn1 | 1084
 cn2 | 1085
 cn3 | 1086
(3 rows)

As expected, we get the same PIDs as from pg_stat_activity.

Note that a named connection like the 3 above is closed only when the current backend terminates or when it is explicitly closed by dblink_disconnect. It is not automatically closed at the end of a transaction or statement. Also, attempting to open a named connection whose name is already in use results in an error.
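Closing the connections explicitly works with the same VALUES trick; a minimal sketch:

```sql
-- Explicitly close the three named connections from above;
-- without this they live until the session ends.
SELECT dblink_disconnect(cn)
  FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn);
```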

Sending asynchronous queries

The next functions I want to introduce are dblink_send_query and dblink_get_result. The former sends a query on a named connection to the backend and returns immediately, without waiting for the backend to finish. The latter can then be used to wait for and read the result. dblink has a few more functions in this area, but these two are enough to reach our goal.

SELECT *
  FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
 CROSS JOIN dblink_send_query(conn.cn, $$
           SELECT pg_backend_pid()
       $$) t;

Well, that looks pretty similar to the simple dblink call above. Only, dblink_send_query returns 1 or 0 indicating whether or not the query has been sent:

 cn  | t 
-----+---
 cn1 | 1
 cn2 | 1
 cn3 | 1
(3 rows)

Fetching the result

Now, dblink_get_result can be used to fetch the result. Note that this function also returns a set of RECORDs, so the expected set of columns must be described here as well.

SELECT *
  FROM (VALUES ('cn1'), ('cn2'), ('cn3')) conn(cn)
 CROSS JOIN dblink_get_result(conn.cn) t(pid int);

 cn  | pid
-----+------
 cn1 | 1084
 cn2 | 1085
 cn3 | 1086
(3 rows)

Putting it all together

CREATE OR REPLACE FUNCTION primes (p_limit BIGINT, p_step BIGINT)
RETURNS TABLE (n BIGINT) AS $def$
DECLARE
    v_q RECORD;
BEGIN
    FOR v_q IN
        WITH RECURSIVE intv AS (
            SELECT 1::BIGINT AS s, least($2, $1) AS e
             UNION ALL
            SELECT e + 1, least(e + $2, $1)
              FROM intv
             WHERE e < $1
        )
        , jobs AS (
            SELECT 'cn' || row_number() OVER () AS cn,
                   intv.s, intv.e
              FROM intv
             ORDER BY intv.s
        )
        , conn AS (
            SELECT *,
                   1/(dblink_connect(cn, 'dbname=postgres port=5434')='OK')::INT AS connstatus
              FROM jobs
        )
        SELECT conn.*, 1/q.status AS sendstatus
          FROM conn
         CROSS JOIN LATERAL dblink_send_query(conn.cn,
               $$
                   WITH num(i) AS (
                       SELECT generate_series($$ || conn.s || $$,
                                              $$ || conn.e || $$)
                   )
                   , div(i) AS (
                       SELECT generate_series(1,$$ || $1 || $$)
                   )
                   SELECT num.i
                     FROM num
                    CROSS JOIN div
                    GROUP BY 1
                   HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
                    ORDER BY 1
               $$) q(status)
    LOOP
        RETURN QUERY 
        SELECT tb.i
          FROM dblink_get_result(v_q.cn) tb(
                   i BIGINT
               );
        PERFORM dblink_disconnect(v_q.cn);
    END LOOP;
END
$def$ LANGUAGE plpgsql;

The first part, intv, is a recursive CTE that computes a table like

  s   |  e
------+------
    1 | 1000
 1001 | 2000
 2001 | 3000
(3 rows)

This could have been done more easily with generate_series(). But in more complex situations I have found the CTE approach more flexible. The jobs part then adds cn1, cn2, etc. to the table.
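For reference, a sketch of the generate_series() variant with p_limit = 3000 and p_step = 1000 hard-coded:

```sql
-- Each row covers the interval [s, e], stepping by 1000, capped at 3000:
-- yields (1,1000), (1001,2000), (2001,3000).
SELECT gs.s, least(gs.s + 1000 - 1, 3000) AS e
  FROM generate_series(1, 3000, 1000) gs(s);
```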

Now, conn opens the connections. dblink_connect() returns the string OK on success. If a connection cannot be established, the whole query should fail. Hence, the returned string is compared with OK and the resulting BOOLEAN is cast to INT. That gives 0 for FALSE or 1 for TRUE. A division by zero then throws an exception, making the entire query fail.
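The trick in isolation: the comparison yields a BOOLEAN, the cast turns it into 0 or 1, and dividing by it fails loudly when the check does not hold:

```sql
SELECT 1/('OK' = 'OK')::INT;   -- fine, returns 1
SELECT 1/('no' = 'OK')::INT;   -- ERROR: division by zero
```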

The main query then uses dblink_send_query to send the query to the backend. The same division by zero approach is used to make sure it has been sent successfully.

When this query is done, we have a result set with several named dblink connections, and each of the backends is processing part of the problem. A PL/pgSQL FOR loop is then used to iterate over this set and fetch the results from the backends. It also closes the connections along the way.

Does it work?

First, we have to check whether the result matches that of the original query. To do that I usually work with temp tables.

CREATE TEMP TABLE orig(i) AS
WITH num(i) AS (
    SELECT generate_series(1,3000)
)
, div(i) AS (
    SELECT generate_series(1,3000)
)
SELECT num.i
  FROM num
 CROSS JOIN div
 GROUP BY 1
HAVING count(CASE WHEN num.i % div.i = 0 THEN 1 END) = 2
 ORDER BY 1;

and the function result for various step sizes:

postgres=# CREATE TEMP TABLE a(i) AS select * from primes(3000,3000);
SELECT 430
Time: 3113.399 ms (00:03.113)
postgres=# CREATE TEMP TABLE b(i) AS select * from primes(3000,1500);
SELECT 430
Time: 1599.998 ms (00:01.600)
postgres=# CREATE TEMP TABLE d(i) AS select * from primes(3000,1000);
SELECT 430
Time: 1711.761 ms (00:01.712)
postgres=# CREATE TEMP TABLE c(i) AS select * from primes(3000,800);
SELECT 430
Time: 1695.348 ms (00:01.695)
postgres=# select * from orig except all select * from a;
 i 
---
(0 rows)

Each of the function invocations returns 430 rows, just like the original query. Since all the temp tables have the same number of rows, it's enough to check that all rows from orig minus all rows from a (or b or c or d) yields the empty set.

Now, what about the timing?

If we want to find all primes up to 3000 and use only one backend (step=3000), the query takes roughly the same time as the original. If the work is distributed over 2 processes, the time is roughly cut in half. However, introducing more jobs does not make it any better. The reason might be that the machine is a relatively cheap AWS instance. Although it shows 4 CPUs, this is probably a lie, kind of.

One step further

So far so good. But isn't it possible to write a single SQL statement that does all of this and also collects the results and closes the connections? That way, no separate function would be needed.

A CTE is an optimization barrier. So, each part of a CTE can in principle be treated like a separate statement. At least, so I thought.

WITH v_q AS (
    WITH jobs(cn) AS (
        VALUES ('c1'), ('c2')
    )
    , conn AS (
        SELECT *, 1/(dblink_connect(cn, 'dbname=postgres port=5434')='OK')::INT AS connstatus
          FROM jobs
    )
    SELECT conn.*, 1/q.status AS sendstatus
      FROM conn
     CROSS JOIN LATERAL dblink_send_query(conn.cn,
           $$
               select now(), pg_sleep(3), clock_timestamp()
           $$) q(status)
)
SELECT tb.tx_time, tb.end_time
  FROM v_q
 CROSS JOIN dblink_get_result(v_q.cn) tb(
           tx_time TIMESTAMP,
           dummy TEXT, 
           end_time TIMESTAMP
       );

Here, v_q represents the set that the function iterated through in the FOR loop. The main query then loops through that set, collecting the results.

Unfortunately, the 2nd dblink_send_query is started only after the result of the first one has arrived:

          tx_time           |          end_time
----------------------------+----------------------------
 2017-12-07 23:06:19.687594 | 2017-12-07 23:06:22.688422
 2017-12-07 23:06:22.690816 | 2017-12-07 23:06:25.692413
(2 rows)

Since that does not work, I didn't try to add the connection closing either.