PostgreSQL支持并行编程

这篇文章是“PostgreSQL Advent Calendar 2016”第9天的文章。

首先

昨天,开发中的PostgreSQL10.0终于引入了专门用于分区的语法,下一个版本也是令人期待的。虽然我原本打算写一篇关于分区的文章,但我希望有其他人能写,所以今天我要介绍的是在最新版本的PostgreSQL 9.6中引入了并行查询和并行化的热门话题,以及如何使用PostgreSQL的并行机制进行并行编程的方法。

我创建了一个名为pg_foobar的扩展作为示例程序,请从GitHub存储库下载。

执行示例

在pg_foobar EXTENSION中,我们准备了pg_foobar()函数,当执行SELECT pg_foobar(2, 3, 4)时,两个并行的工作进程会分别说出’foo’3次,’bar’4次。日志的输出格式是worker <用于识别工作进程> <是foo还是bar> <次数>。以下是一个执行示例,可以看到两个工作进程并行地输出了3次foo和4次bar。

=# CREATE EXTENSION pg_foobar;
=# SELECT pg_foobar(2, 3, 4);
 pg_foobar 
-----------

(1 row)

$ cat /path/to/postgresql.log
LOG:  worker 0 foo 0
LOG:  worker 0 foo 1
LOG:  worker 1 foo 0
LOG:  worker 0 foo 2
LOG:  worker 1 foo 1
LOG:  worker 0 bar 0
LOG:  worker 1 foo 2
LOG:  worker 0 bar 1
LOG:  worker 1 bar 0
LOG:  worker 0 bar 2
LOG:  worker 1 bar 1
LOG:  worker 0 bar 3
LOG:  worker 1 bar 2
LOG:  worker 1 bar 3

进行并行化所需的东西

只需要以下这些内容,就可以编写简单的并行处理程序。

    1. 动态共享内存(DSM)和ToC

这是一种在动态时机上分配共享内存的机制。
它用于在各个工作进程之间或与启动并行工作进程的读取进程之间进行数据交换。

并行工作进程的入口点函数

在每个工作进程启动后,定义将要处理的函数。

概述处理

在领导进程中,主要情况如下。带有★标记的部分是此程序特有的处理,根据所需的并行处理和协作数据进行更改。其他处理基本可以直接使用。这样看来,可以发现简单处理的并行化非常容易。下面是添加了日语注释的代码。

/* ★toc用のキーの設定。重複がなければなんでもOK */
#define FOO_KEY 1000 /* foo用のキー番号 */
#define BAR_KEY 1001 /* bar用のキー番号 */

/* リーダプロセスのメイン処理 */
Datum
pg_foobar(PG_FUNCTION_ARGS)
{
    int nworkers = PG_GETARG_INT32(0);
    int n_foo = PG_GETARG_INT32(1);
    int n_bar = PG_GETARG_INT32(2);
    int size = 0
    int keys = 0;
    int *shm_area;
    ParallelContext *pcxt;

    /* パラレル処理開始時のおまじない */
    EnterParallelMode();

    /* 第一引数にエントリポイントの関数名、第二引数に並列度 */
    pcxt = CreateParallelContext(foobar_worker, nworkers);

    /*
     * ★DSM領域の見積り、確保する。
     * fooの出力回数用、barの出力回数用のためにDSMの領域は2つ。キーの数も2つ。
     */
    size += BUFFERALIGN(sizeof(int));
    keys++;
    size += BUFFERALIGN(sizeof(int));
    keys++;
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, keys);
    InitializeParallelDSM(pcxt);

    /* ★fooの回数(n_foo)をDSM上のメモリ領域に格納 */
    shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
    shm_toc_insert(pcxt->toc, FOO_KEY, shm_area); /* あとでワーカーが探せるようにキー(FOO_KEY)を設定 */
    *shm_area = n_foo;
    /* ★barの回数(n_bar)をDSM上のメモリ領域に格納 */
    shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
    shm_toc_insert(pcxt->toc, BAR_KEY, shm_area);  /* あとでワーカーが探せるようにキー(FOO_KEY)を設定 */
    *shm_area = n_bar;

    /* ワーカーを起動 */
    LaunchParallelWorkers(pcxt);

    /* 全ワーカーが終了するまで待機 */
    WaitForParallelWorkersToFinish(pcxt);

    /* パラレル処理終了時のおまじない */
    DestroyParallelContext(pcxt);
    ExitParallelMode();

    PG_RETURN_NULL();
}

工人只需要简单地从DSM获取数据,并进行自己的处理。

static void
parallel_worker(dsm_segment *seg, shm_toc *toc)
{
    int n_foo;
    int n_bar;
    int *shm_area;
    int i;

    /* FOO_KEYを使ってtocからデータを取得 */
    shm_area = (int *) shm_toc_lookup(toc, FOO_KEY);
    n_foo = *shm_area;

    /* BAR_KEYを使ってtocからデータを取得 */
    shm_area = (int *) shm_toc_lookup(toc, BAR_KEY);
    n_bar = *shm_area;

    /* 以下はワーカー独自の処理 */

    /* foo */
    for (i = 0; i < n_foo; i++)
        elog(LOG, "[%d] worker %d foo", i, ParallelWorkerNumber);

    /* bar */
    for (i = 0; i < n_bar; i++)
        elog(LOG, "[%d] worker %d bar", i, ParallelWorkerNumber);
}

实施要点1:DSM和ToC

生成工作进程是由postmaster进程执行的,因此工作进程无法访问领导进程之前的处理内容和变量。因此,如果想要从领导进程传递数据,就需要通过DSM进行传递。同样地,如果要将工作进程的处理结果传递给领导进程也是如此。

PostgreSQL的并行机制中提供了Shmem ToC(Table of Contents,目录),通过DSM(Distributed Shared Memory,分布式共享内存)可相对容易地实现进程间的数据共享。

步骤1:确保DSM

为了使用ToC,我们预先估计并确保DSM领域。在示例代码的开头,以下部分适用。在pg_foobar中,为了将’foo’说的次数(int)和’bar’说的次数(int)这两个数据与每个工作者共享,我们需要两个int区域和一个键。

:
    int size = 0;
    int keys = 0;
:
    size += BUFFERALIGN(sizeof(int));
    keys++;
    size += BUFFERALIGN(sizeof(int));
    keys++;
    shm_toc_estimate_chunk(&pcxt->estimator, size); /* sizeバイト分のDSMを確保 */
    shm_toc_estimate_keys(&pcxt->estimator, keys); /* keys個のキーを確保 */
    InitializeParallelDSM(pcxt);
:

第二步:创建目录

由于只完成了第一步,在该步骤中预留了DSM领域,因此在第二步中将创建ToC。具体操作是从事先预留的DSM中划分区域,并使用标识符作为键来注册到ToC中。在pg_foobar中,为了区分保留的两个区域,已准备了FOO_KEY和BAR_KEY。以下是代码示例中相关的部分。

:
    /* ★fooの回数を格納 */
    shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
    shm_toc_insert(pcxt->toc, FOO_KEY, shm_area); /* あとでワーカープロセスが探せるようにキー(FOO_KEY)を設定 */
    *shm_area = n_foo;
    /* ★barの回数を格納 */
    shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
    shm_toc_insert(pcxt->toc, BAR_KEY, shm_area);  /* あとでワーカープロセスが探せるようにキー(FOO_KEY)を設定 */
    *shm_area = n_bar;
:

创建ToC主要有以下两个步骤:
1. 使用shm_toc_allocate()分配并切割DSM区域。→ 返回指向切割区域的指针。
2. 使用shm_toc_insert()将切割的区域与键一起注册到ToC中。
然后,只需将数据存储在通过shm_toc_allocate()分配的区域中,即可在后续工作人员中引用。

实施重点2:从ToC中提取数据

由于已经完成了ToC的设置,工作人员在启动后会从ToC中自行获取数据。在开头的代码示例中,以下部分适用:工作人员的入口函数参数应为(dsm_segment *seg,shm_toc *toc),使用shm_toc_lookup()和键来从toc变量中找到所需的数据。

/* ワーカーのエントリポイントとなる関数 */
static void
parallel_worker(dsm_segment *seg, shm_toc *toc)
{
:
    /* FOO_KEYを使ってtocからデータを取得 */
    shm_area = (int *) shm_toc_lookup(toc, FOO_KEY);
    n_foo = *shm_area;

    /* BAR_KEYを使ってtocからデータを取得 */
    shm_area = (int *) shm_toc_lookup(toc, BAR_KEY);
    n_bar = *shm_area;
:

其他:平行工作者和领导过程

当在每个场合上需要进行自己的并行处理时,可以让每个工作者进行不同的处理,或者在工作者进程和领导进程间分配处理。

    • ParallelWorkerNumber変数

ワーカー毎にユニークにつけられたID
リーダープロセスは-1、ワーカープロセスは0から順番につけられる。

IsParallelWorker()マクロ

自分がリーダーか、ワーカーかを判別する。

总结

本文采用PostgreSQL的并行机制来对简单处理进行并行化的方法进行了介绍。更详细的信息请参阅PostgreSQL的README.parallel文件。
pg_foobar拥有用于自定义并行处理所需的最基本的代码,因此可以作为创建自定义并行处理的参考。从那里开始,根据使用方法,无限的可能性将展开。您可以让主进程在等待期间执行其他处理操作,也可以通过DSM共享每个工作进程的处理结果,由主进程在最后进行聚合。您还可以准备像bgworker这样的常驻进程,并从那里启动更多的并行工作者。请尝试创建自己的并行处理吧!

bannerAds