Postgres-XC Aggregation: How It Works and How to Write Aggregate Functions

Posted by LinetteWIHS 8 years ago | 15K views | PostgreSQL, database server

Source: https://yq.aliyun.com/articles/5060

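Aggregation in Postgres-XC differs somewhat from aggregation in PostgreSQL, because Postgres-XC stores data on datanodes, and the rows being aggregated may be spread across several of them.

Postgres-XC supports the traditional aggregation method: all rows are shipped from the datanodes to the coordinator, and the coordinator performs the aggregation. This method becomes noticeably inefficient as the data volume grows.

Postgres-XC also supports a second method: each datanode aggregates its own rows, and the per-datanode results are then sent to the coordinator, which aggregates them once more.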


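Differences between cluster aggregation and single-node aggregation

Compared with PostgreSQL, an aggregate in Postgres-XC gains a collection function, cfunc: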

sfunc( internal-state, next-data-values ) ---> next-internal-state

This step runs on the datanodes. Its input is every row stored on that datanode, and it is called once per row.

cfunc( internal-state, internal-state ) ---> next-internal-state

This step runs on the coordinator. Its input is the final result from each datanode.

ffunc( internal-state ) ---> aggregate-value

This step runs on the coordinator. Its input is the result of cfunc.
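The division of work between sfunc, cfunc, and ffunc can be sketched outside the database. The following Python model is only an illustration: the row placement on three datanodes is hypothetical, and the three functions mimic a sum aggregate rather than reproduce any Postgres-XC internals.

```python
from functools import reduce

def sfunc(state, value):
    """Transition function: fold one input row into the running state."""
    return state + value

def cfunc(state, partial):
    """Collection function: fold one datanode result into the collection state."""
    return state + partial

def ffunc(state):
    """Final function: turn the collection state into the aggregate value."""
    return state

# Hypothetical placement of the values 1..10 on three datanodes.
datanodes = [[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]]

# Phase 1 runs on each datanode, one row at a time, starting from state 0.
partials = [reduce(sfunc, rows, 0) for rows in datanodes]

# Phases 2 and 3 run on the coordinator, over one value per datanode.
result = ffunc(reduce(cfunc, partials, 0))
print(partials, result)
```

Only one value per datanode crosses the network here, which is the point of the distributed method.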

Syntax for a cluster aggregate

CREATE AGGREGATE name ( input_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , CFUNC = cfunc ]  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , INITCOLLECT = initial_collection_condition ]  
    [ , SORTOP = sort_operator ]  
)  

Syntax for a single-node aggregate

CREATE AGGREGATE name (  
    BASETYPE = base_type,  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  

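sfunc and stype are required. If cfunc is not defined, the aggregate supports only the traditional PostgreSQL aggregation method. If cfunc is defined, the aggregate supports both the traditional method and the distributed method.

Example

Create the sfunc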

digoal=# create or replace function d_sum(int,int) returns int as $$  
select $1+$2;  
$$ language sql strict;  

Create the aggregate

digoal=# create AGGREGATE d_sum(int)          
(  
sfunc=d_sum,  
stype=int,  
cfunc=d_sum,  
initcond='0',  
initcollect='0'  
);  

Create a test table

digoal=# create table t1(id int, info text) distribute by (id) to group gp0;  
digoal=# insert into t1 select generate_series(1,10),'test';  
digoal=# insert into t1 values (null,'test');  

Compare the results of d_sum and sum

digoal=# select d_sum(id) from t1;
 d_sum
-------
    55
(1 row)
digoal=# select sum(id) from t1;
 sum
-----
  55
(1 row)

Compare the execution plans of sum and d_sum

digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.657..1.657 rows=1 loops=1)
   Output: d_sum((d_sum(t1.id)))
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.703..0.743 rows=5 loops=1)
         Output: (d_sum(t1.id))
         ->  Data Node Scan on "REMOTE_GROUP_QUERY"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.702..0.740 rows=5 loops=1)
               Output: d_sum(t1.id)
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
               Remote query: SELECT d_sum(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
 Total runtime: 1.689 ms
(9 rows)

Both d_sum and sum use distributed aggregation. The plan for sum has the same shape:

digoal=# explain (analyze,verbose,buffers) select sum(id) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=2.50..2.51 rows=1 width=4) (actual time=0.674..0.674 rows=1 loops=1)
   Output: pg_catalog.sum((sum(t1.id)))
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.507..0.659 rows=5 loops=1)
         Output: (sum(t1.id))
         ->  Data Node Scan on "REMOTE_GROUP_QUERY"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.507..0.655 rows=5 loops=1)
               Output: sum(t1.id)
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
               Remote query: SELECT sum(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
 Total runtime: 0.705 ms
(9 rows)

Without a cfunc, all rows must be gathered from the datanodes onto the coordinator, which then runs sfunc over them, as the following session shows:

digoal=# drop AGGREGATE d_sum(int);
DROP AGGREGATE
digoal=# create AGGREGATE d_sum(int)
(
sfunc=d_sum,
stype=int,
initcond='0'
);
CREATE AGGREGATE
digoal=# select d_sum(id) from t1;
 d_sum
-------
    55
(1 row)
digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.981..1.981 rows=1 loops=1)
   Output: d_sum(id)
   ->  Data Node Scan on t1  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.567..0.610 rows=11 loops=1)
         Output: id, info
         Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
         Remote query: SELECT id, info FROM ONLY t1 WHERE true
 Total runtime: 2.015 ms
(7 rows)

Overview of the aggregation data flow

-1. Data flow of the traditional gather-then-aggregate method:

Two phased aggregation - is used when the entire aggregation takes place on the Coordinator node.
In first phase called transition phase, Postgres-XC creates a temporary variable of data type stype to hold the current internal state of the aggregate.
Create the initial variable: tmpvar(stype)

At each input row, the aggregate argument value(s) are calculated and the state transition function is invoked with the current state value and the new argument value(s) to calculate a new internal state value.
Update tmpvar once per row: sfunc(tmpvar, argument value(s)) -> tmpvar

After all the rows have been processed, in the second phase or finalization phase the final function is invoked once to calculate the aggregate's return value.
If there is no final function then the ending state value is returned as-is.
Final function: ffunc(tmpvar) -> result (the aggregate's output type)

-2. Data flow of the distributed aggregation method:

Three phased aggregation - is used when the process of aggregation is divided between Coordinator and Datanodes.   
In this mode, each Postgres-XC Datanode involved in the query carries out the first phase named transition phase.   
This phase is similar to the first phase in the two phased aggregation mode discussed above, except that, every Datanode applies this phase on the rows available at the Datanode.   
The result of transition phase is then transferred to the Coordinator node. Second phase called collection phase takes place on the Coordinator.   
Postgres-XC Coordinator node creates a temporary variable of data type stype to hold the current internal state of the collection phase.   
For every input from the Datanode (result of transition phase on that node), the collection function is invoked with the current collection state value and the new transition value (obtained from the Datanode) to calculate a new internal collection state value.   
After all the transition values from data nodes have been processed, in the third or finalization phase the final function is invoked once to calculate the aggregate's return value.   
If there is no final function then the ending collection state value is returned as-is.  
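To see where a final function fits, the two flows can be modeled for an average-like aggregate whose state is a (sum, count) pair. This is a sketch under assumptions: the names avg_sfunc, avg_cfunc, and avg_ffunc and the row placement are hypothetical, and the tuples stand in for the stype, initcond, and initcollect values.

```python
from functools import reduce

def avg_sfunc(state, value):
    """Transition: add the row to the running (sum, count) pair."""
    total, count = state
    return (total + value, count + 1)

def avg_cfunc(state, partial):
    """Collection: merge one datanode's (sum, count) into the coordinator state."""
    return (state[0] + partial[0], state[1] + partial[1])

def avg_ffunc(state):
    """Final: derive the average, or None (null) when there were no rows."""
    total, count = state
    return total / count if count else None

def two_phase(rows, initcond=(0, 0)):
    # Transition and finalization both happen on the coordinator.
    return avg_ffunc(reduce(avg_sfunc, rows, initcond))

def three_phase(datanodes, initcond=(0, 0), initcollect=(0, 0)):
    # Transition on each datanode, then collection and finalization on the coordinator.
    partials = [reduce(avg_sfunc, rows, initcond) for rows in datanodes]
    return avg_ffunc(reduce(avg_cfunc, partials, initcollect))

datanodes = [[1, 2, 3, 4], [5, 6], [7, 8, 9, 10]]
all_rows = [value for rows in datanodes for value in rows]
print(two_phase(all_rows), three_phase(datanodes))
```

Averaging the per-datanode averages would give a different answer on unevenly filled datanodes, which is why the state carries the count and the division is deferred to the final function.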

Data types involved in an aggregate

sfunc( internal-state, next-data-values ) ---> next-internal-state  
cfunc( internal-state, internal-state ) ---> next-internal-state  
ffunc( internal-state ) ---> aggregate-value  

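input_data_type in the aggregate syntax corresponds to next-data-values.

state_data_type in the aggregate syntax corresponds to internal-state.

The aggregate's output type is one of two things, aggregate-value or internal-state:

-1. If a finalfunc is defined, it is the return type of finalfunc.

-2. If no finalfunc is defined, it is the return type of cfunc or sfunc, that is, the internal-state type, which is stype.

Following the data flow, the first argument and the return type of sfunc must both match the type declared as stype.

The second argument of sfunc must match the aggregate's input type.

Both arguments and the return type of cfunc must match stype.

The argument of ffunc must match stype.

So an aggregate involves three data types in total:

-1. the aggregate's input type;

-2. stype (initcond and initcollect are also of type stype);

-3. the return type of finalfunc.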


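How aggregates handle null values

This depends on whether sfunc, cfunc, and finalfunc are declared strict. A strict function is not called on null input, so null values are ignored. A non-strict function must handle null values itself, so take particular care.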

An aggregate function can provide an initial condition, that is, an initial value for the internal transition or collection state value. This is specified and stored in the database as a value of type text, but it must be a valid external representation of a constant of the state value data type. If it is not supplied then the state value starts out null.

If the collection function is declared "strict", then it cannot be called with null inputs. With such a collection function, aggregate execution behaves as follows. Null state transition results are ignored (the function is not called and the previous collection state value is retained). If the initial state value is null, then the first non-null state transition result replaces the collection state value, and the collection function is invoked at subsequent rows with all-nonnull transition values. This is handy for implementing aggregates like max.

If the state transition function is declared "strict", then it cannot be called with null inputs. With such a transition function, aggregate execution behaves as follows. Rows with any null input values are ignored (the function is not called and the previous state value is retained). If the initial state value is null, then at the first row with all-nonnull input values, the first argument value replaces the state value, and the transition function is invoked at subsequent rows with all-nonnull input values. This is handy for implementing aggregates like max. Note that this behavior is only available when state_data_type is the same as the first input_data_type. When these types are different, you must supply a nonnull initial condition or use a nonstrict transition function.

If the state transition and/or collection function is not strict, then it will be called unconditionally at each input row, and must deal with null inputs and null transition/collection values for itself. This allows the aggregate author to have full control over the aggregate's handling of null values.

If the final function is declared "strict", then it will not be called when the ending state value is null; instead a null result will be returned automatically. (Of course this is just the normal behavior of strict functions.) In any case the final function has the option of returning a null value. For example, the final function for avg returns null when it sees there were zero input rows.

For example, count, where one row has a null id:

digoal=# select count(id) from t1;
 count
-------
    10
(1 row)
digoal=# select count(*) from t1;
 count
-------
    11
(1 row)
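The rules quoted above for a strict state transition function can be modeled in a few lines of Python. This is a simplified sketch, not the executor's code: None stands in for SQL null, and run_strict and larger are hypothetical names.

```python
def run_strict(sfunc, rows, initcond=None):
    """Drive a strict transition function over rows, with None modeling null."""
    state = initcond
    awaiting_first = initcond is None  # null initial state: wait for the first non-null input
    for value in rows:
        if value is None:
            continue  # strict: the function is not called, the state is retained
        if awaiting_first:
            state = value  # the first non-null input replaces the null state
            awaiting_first = False
        elif state is not None:
            state = sfunc(state, value)
        # otherwise the state has become null; a strict function cannot be called with it
    return state

def larger(state, value):
    """Transition function of a max-like aggregate."""
    return value if value > state else state

print(run_strict(larger, [None, 3, None, 7, 5]))
```

With no initial condition the state and input types must be the same, because the first input is copied straight into the state, which matches the restriction stated in the quoted text.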

Suppose d_sum(int,int) is changed to called on null input. The function is then called even when the row's value is null. Because null + val = null, the final result is null.

digoal=# alter function d_sum(int,int) called on null input;
ALTER FUNCTION
digoal=# select d_sum(id) from t1;
 d_sum
-------

(1 row)
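The null result follows directly from how a non-strict function is driven. The sketch below is a single-node model in Python with None standing in for null; sql_add, run_nonstrict, and null_safe_add are hypothetical names, and null_safe_add only illustrates the kind of check a non-strict function would need to perform itself.

```python
def sql_add(a, b):
    """SQL-style addition: the result is null (None) if either operand is null."""
    if a is None or b is None:
        return None
    return a + b

def run_nonstrict(sfunc, rows, initcond):
    """A non-strict transition function is called for every row, nulls included."""
    state = initcond
    for value in rows:
        state = sfunc(state, value)
    return state

def null_safe_add(state, value):
    """A non-strict transition function that skips null inputs on its own."""
    return state if value is None else sql_add(state, value)

ids = list(range(1, 11)) + [None]  # same contents as t1.id
print(run_nonstrict(sql_add, ids, 0))
print(run_nonstrict(null_safe_add, ids, 0))
```

Once the state becomes None it stays None, because every later addition has a null operand.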

Optimizing aggregates

Use a B-tree index.

sort_operator
The associated sort operator for a MIN- or MAX-like aggregate. This is just an operator name (possibly schema-qualified). The operator is assumed to have the same input data types as the aggregate (which must be a single-argument aggregate).

Aggregates that behave like MIN or MAX can sometimes be optimized by looking into an index instead of scanning every input row. If this aggregate can be so optimized, indicate it by specifying a sort operator. The basic requirement is that the aggregate must yield the first element in the sort ordering induced by the operator; in other words:

SELECT agg(col) FROM tab;
must be equivalent to:
SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;

Further assumptions are that the aggregate ignores null inputs, and that it delivers a null result if and only if there were no non-null inputs. Ordinarily, a data type's < operator is the proper sort operator for MIN, and > is the proper sort operator for MAX. Note that the optimization will never actually take effect unless the specified operator is the "less than" or "greater than" strategy member of a B-tree index operator class.
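The equivalence that a sort operator promises can be checked on a small column. This Python sketch is an illustration only: None models null, the helper names are hypothetical, and nulls are filtered out before sorting to reflect the stated assumption that the aggregate ignores null inputs.

```python
def agg_min(values):
    """A MIN-like aggregate: ignores nulls, returns null only if nothing is non-null."""
    result = None
    for value in values:
        if value is not None and (result is None or value < result):
            result = value
    return result

def first_in_sort_order(values):
    """First non-null element in the ordering induced by the '<' operator."""
    ordered = sorted(value for value in values if value is not None)
    return ordered[0] if ordered else None

col = [7, None, 3, 9, None, 5]
print(agg_min(col), first_in_sort_order(col))
```

When the two functions agree for every possible column, an index ordered by the same operator can answer the aggregate by reading one entry instead of scanning every row.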

Defining a zero-argument aggregate like count(*)

digoal=# select * from pg_aggregate where aggfnoid::text ~ 'count';
     aggfnoid     | aggtransfn  |   aggcollectfn   | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect
------------------+-------------+------------------+------------+-----------+--------------+------------+----------------
 pg_catalog.count | int8inc_any | int8_sum_to_int8 | -          |         0 |           20 | 0          | 0
 pg_catalog.count | int8inc     | int8_sum_to_int8 | -          |         0 |           20 | 0          | 0
digoal=# \df+ int8inc_any
                                                              List of functions
   Schema   |    Name     | Result data type | Argument data types |  Type  | Volatility |  Owner   | Language | Source code |            Description
------------+-------------+------------------+---------------------+--------+------------+----------+----------+-------------+------------------------------------
 pg_catalog | int8inc_any | bigint           | bigint, "any"       | normal | immutable  | postgres | internal | int8inc_any | increment, ignores second argument
(1 row)
digoal=# \df+ int8inc
                                                    List of functions
   Schema   |  Name   | Result data type | Argument data types |  Type  | Volatility |  Owner   | Language | Source code | Description
------------+---------+------------------+---------------------+--------+------------+----------+----------+-------------+-------------
 pg_catalog | int8inc | bigint           | bigint              | normal | immutable  | postgres | internal | int8inc     | increment
(1 row)
digoal=# select proisstrict from pg_proc where proname='int8inc';
 proisstrict
-------------
 t
(1 row)
digoal=# select proisstrict from pg_proc where proname='int8inc_any';
 proisstrict
-------------
 t
(1 row)

The count aggregate above actually comes in two forms, count(column) and count(*). int8inc_any is used for count(column), and int8inc is used for count(*). An example:

-- Create the sfunc
digoal=# create or replace function d_count(int8) returns int8 as $$
select $1+1;
$$ language sql strict;
-- Create the cfunc
digoal=# create or replace function d_count(int8,int8) returns int8 as $$
select $1+$2;
$$ language sql strict;
-- Create the aggregate
digoal=# create aggregate d_count(*)
(
sfunc=d_count,
stype=int8,
cfunc=d_count,
initcond='0',
initcollect='0'
);
digoal=# select d_count(*) from t1;
 d_count
---------
      11
(1 row)
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=0) (actual time=1.308..1.308 rows=1 loops=1)
   Output: sj.d_count(*)
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.712..0.725 rows=5 loops=1)
         Output: (d_count(*))
         ->  Data Node Scan on "REMOTE_GROUP_QUERY"  (cost=0.00..0.00 rows=1000 width=0) (actual time=0.712..0.721 rows=5 loops=1)
               Output: d_count(*)
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
               Remote query: SELECT d_count(*) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
 Total runtime: 1.339 ms
(9 rows)

Note: why does d_count(id) not return 10? Because I defined only the zero-argument aggregate d_count(*), not d_count("any"):

digoal=# select d_count(id) from t1;
 d_count
---------
       2
       3
       4
       9
      10
       5
       6
       7
       8
      11
(11 rows)

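Because d_count here is an ordinary function, not the aggregate.

The call actually resolves to the function d_count(int8).

This becomes more obvious after renaming the sfunc and cfunc.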


digoal=# alter function d_count(int8) rename to s_d_count; 
ALTER FUNCTION
Time: 18.591 ms
digoal=# alter function d_count(int8,int8) rename to c_d_count;
ALTER FUNCTION
Time: 4.049 ms
digoal=# create aggregate d_count(*) (
sfunc=s_d_count,
stype=int8,
cfunc=c_d_count,
initcond='0',
initcollect='0'
);
CREATE AGGREGATE
Time: 21.752 ms
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=0) (actual time=1.732..1.732 rows=1 loops=1)
   Output: sj.d_count(*)
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.986..1.008 rows=5 loops=1)
         Output: (d_count(*))
         ->  Data Node Scan on "REMOTE_GROUP_QUERY"  (cost=0.00..0.00 rows=1000 width=0) (actual time=0.985..1.002 rows=5 loops=1)
               Output: d_count(*)
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
               Remote query: SELECT d_count(*) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
 Total runtime: 1.766 ms
(9 rows)

Time: 3.088 ms
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;
ERROR:  function d_count(integer) does not exist
LINE 1: explain (analyze,verbose,buffers) select d_count(id) from t1...
                                                 ^
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.

Having defined a count(*)-like aggregate, how is the count(column) variant defined?

digoal=# create or replace function s_d_count(int8,"anyelement") returns int8 as $$
select $1+1;
$$ language sql strict;
CREATE FUNCTION
Time: 4.715 ms
digoal=# create aggregate d_count("anyelement")
(
sfunc=s_d_count,
stype=int8,
cfunc=c_d_count,
initcond='0',
initcollect='0'
);
CREATE AGGREGATE
Time: 69.536 ms
digoal=# select d_count(id) from t1;
 d_count
---------
      10
(1 row)

Time: 3.694 ms
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.913..1.913 rows=1 loops=1)
   Output: d_count((d_count(t1.id)))
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.893..0.923 rows=5 loops=1)
         Output: (d_count(t1.id))
         ->  Data Node Scan on "REMOTE_GROUP_QUERY"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.892..0.917 rows=5 loops=1)
               Output: d_count(t1.id)
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
               Remote query: SELECT d_count(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
 Total runtime: 1.947 ms
(9 rows)
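The difference between the two count variants comes down to which rows reach the strict transition function. The following Python sketch models both variants over five datanodes; the row placement is hypothetical, None stands in for the null id, and the function names merely echo the SQL ones.

```python
from functools import reduce

def s_count_star(state):
    """Transition for the zero-argument variant: one increment per row."""
    return state + 1

def s_count_col(state, value):
    """Transition for the one-argument variant: increments, ignoring the value itself."""
    return state + 1

def c_count(state, partial):
    """Collection function shared by both variants: add up the partial counts."""
    return state + partial

def count_star_on_node(rows):
    state = 0
    for _ in rows:
        state = s_count_star(state)  # no argument can be null, so every row counts
    return state

def count_col_on_node(rows):
    state = 0
    for value in rows:
        if value is not None:  # strict: rows with a null argument are skipped
            state = s_count_col(state, value)
    return state

# Hypothetical placement of t1 on five datanodes; one row has a null id.
datanodes = [[1, 6], [2, 7, None], [3, 8], [4, 9], [5, 10]]
star = reduce(c_count, map(count_star_on_node, datanodes), 0)
col = reduce(c_count, map(count_col_on_node, datanodes), 0)
print(star, col)
```

Both variants can share one collection function because a partial count is just a number to be added, whatever rule produced it.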

The second d_count aggregate reuses the c_d_count(int8,int8) function defined earlier.

digoal=# select * from pg_aggregate where aggfnoid::text ~ 'd_count';
  aggfnoid  |  aggtransfn  | aggcollectfn | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect
------------+--------------+--------------+------------+-----------+--------------+------------+----------------
 sj.d_count | sj.s_d_count | c_d_count    | -          |         0 |           20 | 0          | 0
 sj.d_count | sj.s_d_count | c_d_count    | -          |         0 |           20 | 0          | 0
(2 rows)

Time: 2.811 ms

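[Notes]

-1. If an aggregate supports both distributed aggregation and the traditional gather-then-aggregate method, Postgres-XC chooses between them based on cost. You must therefore make sure that both methods produce the same result; otherwise queries may return inconsistent answers.

-2. Avoid giving an aggregate the same name as an ordinary function; otherwise problems become hard to diagnose.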
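One way to gain confidence in the first note is to run both plans over the same data before creating the aggregate. The Python sketch below is a hypothetical test harness, not a Postgres-XC facility: it compares the two methods for a count-like aggregate with a correct collection function and with a deliberately wrong one.

```python
from functools import reduce

def gather_then_aggregate(sfunc, datanodes, initcond):
    """Traditional method: ship every row to the coordinator and run sfunc there."""
    rows = [value for node in datanodes for value in node]
    return reduce(sfunc, rows, initcond)

def distributed(sfunc, cfunc, datanodes, initcond, initcollect):
    """Distributed method: sfunc on each datanode, cfunc on the coordinator."""
    partials = [reduce(sfunc, node, initcond) for node in datanodes]
    return reduce(cfunc, partials, initcollect)

def plans_agree(sfunc, cfunc, datanodes, initcond, initcollect):
    """True if both methods return the same value for this data placement."""
    return (gather_then_aggregate(sfunc, datanodes, initcond)
            == distributed(sfunc, cfunc, datanodes, initcond, initcollect))

datanodes = [[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]]

def count_sfunc(state, value):
    return state + 1

def good_cfunc(state, partial):
    return state + partial  # adds the partial counts

def bad_cfunc(state, partial):
    return state + 1  # counts datanodes instead of rows

print(plans_agree(count_sfunc, good_cfunc, datanodes, 0, 0))
print(plans_agree(count_sfunc, bad_cfunc, datanodes, 0, 0))
```

The same harness also exposes initial-condition mistakes: a nonzero initcond for a sum is applied once per datanode in the distributed plan but only once in the traditional plan.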
