PostgreSQL – How to parallelise remote queries in Postgres

dblink postgresql

I am querying N tables remotely using dblink.
These queries represent a sort of "sharding" of the data.
Each remote query takes roughly the same time (~ 0.5 sec).

However, when I gather all the data in one query for a simple count(*), using either a join, UNION ALL or a CTE (WITH), the total time becomes "linear", i.e. roughly 0.5*N seconds.

That of course kills a lot of the purpose for the "sharding" process.

Is there any way to force Postgres (11) to gather the data in parallel instead of sequentialising it?

UPDATE

The remote query I am running on each "shard" is a function that wraps a single spatial query with an aggregation; it uses indexes and partitions:

CREATE OR REPLACE FUNCTION my_func(pt character(1), bd smallint, polyg geometry, tf date, tt date) RETURNS
TABLE(import_date date, bedrooms smallint, property_type character(1),
      .....,r_rent_paid float[]) AS $$
BEGIN
    RETURN query
    select
          coalesce(s.import_date, r.import_date) as import_date,
          coalesce(s.bedrooms, r.bedrooms) as bedrooms,
          coalesce(s.property_type, r.property_type) as property_type,
          s.s_size,
          .....,
          r.rent_paid
    from
    (
        select

           sc.import_date,
           sc.bedrooms,
           sc.property_type,

           ......multiple percentile_cont......,

           percentile_cont(array[0.25,0.5,0.75,0.9]) within group (order by sc.discount) filter(where sc.discount > 0) as discount

           from node.sales sc
           where sc.property_type = pt and  sc.bedrooms = bd and st_Intersects(polyg,sc.geom)
           and sc.import_date between tf and tt
           group by sc.import_date, sc.bedrooms, sc.property_type
    ) s
    full join
    (
        select
           rc.import_date,
           rc.bedrooms,
           rc.property_type,

           ......multiple percentile_cont......,


           percentile_cont(array[0.25,0.5,0.75,0.9]) within group (order by rc.asking_rent_sqft) filter(where rc.asking_rent_sqft > 0) as rent_paid_sqft

           from node.rents rc
           where rc.property_type = pt and  rc.bedrooms = bd and st_Intersects(polyg,rc.geom)
           and rc.import_date between tf and tt
           group by rc.import_date, rc.bedrooms, rc.property_type
    ) r
    on r.import_date = s.import_date and r.bedrooms = s.bedrooms and r.property_type = s.property_type;
END $$ language plpgsql;

I have tried running this function through postgres_fdw, creating the function only on the "master" and calling it like this:

select import_date, bedrooms, property_type, count(*)
from (
SELECT * FROM my_func(…point import foreign schema shard-1…)
union all
SELECT * FROM my_func(…point import foreign schema shard-2…)
union all
…. 6 more
) K
GROUP BY import_date, bedrooms, property_type;

However, each SELECT blocks until it has finished, so the calls run one after another, which defeats the purpose.

As an alternative, I have so far tried dblink, since it can send queries without blocking (async):

SELECT dblink_connect('dtest1', 'host=xxxx ....');
SELECT dblink_connect('dtest2', 'host=xxxx ....');
.....
SELECT dblink_connect('dtest8', 'host=xxxx ....');

SELECT dblink_send_query('dtest1', 'select * from my_func(...)');
SELECT dblink_send_query('dtest2', 'select * from my_func(...)');
.....
SELECT dblink_send_query('dtest8', 'select * from my_func(...)');

and finally:

select import_date, bedrooms, property_type, count(*)
from (
    SELECT * FROM dblink_get_result('dtest1') AS t1(....)
    union all
    SELECT * FROM dblink_get_result('dtest2') AS t2(....)
    union all
    .....
    union all
    SELECT * FROM dblink_get_result('dtest8') AS t8(....)
) K
GROUP BY import_date, bedrooms, property_type;

This still runs in "linear" time, and if I run the query soon after the first run it may return no results (unless I wait a few seconds after the previous run).

UPDATE 2

What looks like a working solution so far is what @Richard Huxton suggested: using PL/Proxy, which allows remote functions to be run in parallel (when you use CLUSTER with RUN ON ALL). I have also tried Citus, but I could not get the queries to run in parallel there: percentile_cont is not supported, and in any case it would not change the concept of having a single machine with a partitioned table for that specific case, as you still have to pass through the query planner.

Best Answer

Regarding your dblink method being linear, one possibility is that the bottleneck is simply in transferring the data. That is going to be a linear step: the data is only transferred one connection at a time.
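One way to separate transfer cost from execution cost is to time a remote query that returns almost no data. The sketch below is only an illustration, not part of the asker's setup: the connection names c1 and c2 are hypothetical, the loopback connection strings are an assumption (they may need host, user or password settings in your environment), and pg_sleep(0.5) stands in for the 0.5 sec shard query.

```sql
-- Requires the dblink extension.
CREATE EXTENSION IF NOT EXISTS dblink;

-- Hypothetical loopback connections back to the current database.
SELECT dblink_connect('c1', 'dbname=' || current_database());
SELECT dblink_connect('c2', 'dbname=' || current_database());

DO $$
DECLARE
    t0 timestamptz := clock_timestamp();
    n  bigint;
BEGIN
    -- Both sends return immediately; the sleeps start on the remote side.
    PERFORM dblink_send_query('c1', 'SELECT 1 AS shard FROM pg_sleep(0.5)');
    PERFORM dblink_send_query('c2', 'SELECT 2 AS shard FROM pg_sleep(0.5)');

    -- Collect one tiny row from each connection.
    SELECT count(*) INTO n
    FROM (
        SELECT * FROM dblink_get_result('c1') AS t1(shard int)
        UNION ALL
        SELECT * FROM dblink_get_result('c2') AS t2(shard int)
    ) AS k;

    -- Wall-clock time from the first send to the last row received.
    RAISE NOTICE 'rows = %, elapsed = %', n, clock_timestamp() - t0;
END
$$;

SELECT dblink_disconnect('c1');
SELECT dblink_disconnect('c2');
```

If the reported time is close to 0.5 sec rather than about 1 sec, the two queries overlapped, and any remaining linear cost in the real query is more likely to come from moving or producing the rows.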

It is also possible that the query you have encapsulated in the function does a lot of intermediate aggregate calculations which are only being started once you call for another row. So each query runs only so far before it stops, and then resumes once dblink_get_result is called on that query. Since dblink_get_result runs to completion once called, it would result in a large part of the work being serial. I can't tell if the query you have is likely to be that type of query or not. Seeing an EXPLAIN plan of the query might provide some insight there.

If the above theory is correct, then wrapping the query in an ORDER BY (which cannot be satisfied by any index) might help things, by forcing each foreign server to run the query to completion and buffer up all the results before returning the first row.
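A minimal sketch of that wrapping, assuming the connection dtest1 from the question is already open. The inner generate_series query is a hypothetical stand-in for the real my_func call, whose arguments are not shown in the question; the column names id and payload are made up for illustration.

```sql
-- Stand-in remote query; in the question the inner SELECT would be the my_func call.
-- No index can supply the order of q.payload, so the remote server has to
-- finish the inner query and sort it before it can send the first row.
SELECT dblink_send_query(
    'dtest1',
    $q$
    SELECT *
    FROM (
        SELECT g AS id, md5(g::text) AS payload
        FROM generate_series(1, 1000) AS g
    ) AS q
    ORDER BY q.payload
    $q$
);

-- Fetch the rows; the column list must match the remote query's output.
SELECT * FROM dblink_get_result('dtest1') AS t1(id int, payload text);
```

Whether this helps depends on the theory above being correct for the query in question; comparing timings with and without the ORDER BY is the practical test.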

"if I run the query soon after the first run it may not return results (unless I wait a few seconds from the previous)."

I think you have ignored the note "It [dblink_get_result] must be called once for each query sent, and one additional time to obtain an empty set result, before the connection can be used again."

...although if you have ignored that, then I don't understand how waiting a few seconds could fix the issue.
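For reference, the sequence that note describes might look like the sketch below. It assumes the connection dtest1 from the question is open; the queries and the column list (n int) are hypothetical placeholders.

```sql
SELECT dblink_send_query('dtest1', 'SELECT 1 AS n');

-- First call: returns the rows of the query that was sent.
SELECT * FROM dblink_get_result('dtest1') AS t1(n int);

-- Second call: returns an empty set and frees the connection for reuse.
SELECT * FROM dblink_get_result('dtest1') AS t1(n int);

-- Only now is it safe to send the next query on the same connection.
SELECT dblink_send_query('dtest1', 'SELECT 2 AS n');
SELECT * FROM dblink_get_result('dtest1') AS t1(n int);
SELECT * FROM dblink_get_result('dtest1') AS t1(n int);
```

With eight connections, the same extra call would be needed on each of dtest1 through dtest8 before the next round of dblink_send_query.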