How We Built the Fastest Postgres DB for Analytics 🎁

Joe Sciarrino, CEO

2022-12-13

Hydra is a fully-managed open source data warehouse built on Postgres. It’s easy to use and designed to scale analytics (OLAP) & hybrid transactional workloads. Our team is excited to announce that Hydra is the fastest Postgres database for analytics. Learn how we enabled columnar storage, vectorization and query parallelization to deliver 23x faster performance than Postgres!

Schedule time with our team to get your 14-day free trial of cloud-managed Hydra! 🐘🎄💨

Postgres OLAP is here

Hydra scales analytic performance with:

  • Columnar storage
  • Parallelized query execution
  • Vectorization

Benchmarks

We chose to use the ClickBench benchmarks due to their relevance and usefulness to analytic workloads. ClickBench methodology is as follows:

This benchmark represents typical workload in the following areas: clickstream and traffic analysis, web analytics, machine-generated data, structured logs, and events data. It covers the typical queries in ad-hoc analytics and real-time dashboards. The dataset from this benchmark was obtained from the actual traffic recording of one of the world's largest web analytics platforms.[1]

Query Speed Benchmarks (via ClickBench)

Jump to the full ClickBench report.

Columnar Storage

Columnar storage is a key part of the data warehouse, but why is that? Let’s review what columnar storage is and why it’s important for scalable analytics.

By default, data in Postgres is stored in a heap table. Heap tables are arranged row by row, similar to how you would arrange data in a large spreadsheet. Columnar tables are transposed relative to row tables: instead of rows being added one after another, rows are inserted in bulk into a stripe, and within each stripe the values of each column are stored next to one another. Imagine rows of a table containing:

| a | b | c | d |
| a | b | c | d |
| a | b | c | d |

This data would be stored as follows in columnar:

| a | a | a |
| b | b | b |
| c | c | c |
| d | d | d |

Heap tables are organized into pages of 8 kB each, and each page holds pointers to the start of every row it contains. In columnar, you can think of each stripe as a row of metadata that also holds up to 150,000 rows of data. Within each stripe, data is divided into chunks of 1,000 rows, and within each chunk there is a “row” for each column of data stored in it. Additionally, columnar stores the minimum, maximum, and count for each column in each chunk.
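To make the layout concrete, here is a minimal C sketch of how a batch of rows could be transposed into one column-major chunk while recording per-column minimum, maximum, and count. It assumes a table of four integer columns and the 1,000-row chunk size described above; the names `ColumnChunk` and `build_chunk` are hypothetical and do not reflect Hydra's actual on-disk format.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

#define NUM_COLUMNS 4     /* columns a, b, c, d from the example above */
#define CHUNK_ROWS  1000  /* rows per chunk, as described in the text */

/* Hypothetical in-memory chunk: one contiguous array per column. */
typedef struct ColumnChunk {
    size_t nrows;
    int values[NUM_COLUMNS][CHUNK_ROWS]; /* column-major: values[col][row] */
    int min[NUM_COLUMNS];                /* per-column chunk metadata */
    int max[NUM_COLUMNS];
    size_t count[NUM_COLUMNS];
} ColumnChunk;

/* Transpose row-major input rows[r][c] into column-major storage and
 * record min/max/count for every column of the chunk. */
void build_chunk(ColumnChunk *chunk, const int rows[][NUM_COLUMNS], size_t nrows)
{
    assert(nrows <= CHUNK_ROWS);
    chunk->nrows = nrows;
    for (int c = 0; c < NUM_COLUMNS; c++) {
        chunk->min[c] = INT_MAX;
        chunk->max[c] = INT_MIN;
        chunk->count[c] = nrows;
        for (size_t r = 0; r < nrows; r++) {
            int v = rows[r][c];
            chunk->values[c][r] = v; /* all values of column c end up adjacent */
            if (v < chunk->min[c]) chunk->min[c] = v;
            if (v > chunk->max[c]) chunk->max[c] = v;
        }
    }
}
```

Reading column `b` now means scanning one contiguous array, `chunk->values[1]`, instead of stepping over `a`, `c`, and `d` in every row.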

Advantages to columnar

Columnar is optimized for table scans; in fact, it doesn’t use indexes at all. With columnar, it’s much quicker to read all the data for a particular column, because the database doesn’t need to read columns you are not currently interested in. It also uses the per-chunk metadata (the minimum and maximum values) to skip chunks that cannot match a query’s filter. This is a form of “auto-indexing” the data.
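As a rough illustration of this “auto-indexing”, the C sketch below checks a filter of the form `column < constant` against each chunk's stored minimum and counts only the chunks that could contain a match; the rest can be skipped without reading their values. The `ChunkStats` struct and the function names are hypothetical, chosen for this example only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-chunk metadata for one column. */
typedef struct ChunkStats {
    int min;
    int max;
} ChunkStats;

/* A chunk can hold rows satisfying "col < constant" only if its
 * minimum is below the constant; otherwise it can be skipped. */
static bool chunk_may_match_lt(const ChunkStats *stats, int constant)
{
    return stats->min < constant;
}

/* Count how many chunks must actually be read for "col < constant". */
size_t chunks_to_read_lt(const ChunkStats *stats, size_t nchunks, int constant)
{
    size_t needed = 0;
    for (size_t i = 0; i < nchunks; i++) {
        if (chunk_may_match_lt(&stats[i], constant))
            needed++; /* values must be read and filtered row by row */
        /* otherwise the chunk is skipped entirely: no I/O for it */
    }
    return needed;
}
```

With three chunks whose values span 0 to 9, 10 to 19, and 20 to 29, the filter `col < 15` needs to read only the first two chunks.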

In addition, very high data compression is possible because similar data is stored next to each other. Compression is an important benefit because columnar is often used to store huge amounts of data. Compressed data can be read from disk more quickly, which both reduces I/O and increases effective fetch speed. It also makes better use of disk caching, since data is cached in its compressed form. Lastly, it greatly reduces your storage costs.
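To see why adjacency matters, consider run-length encoding, one of the simplest compression schemes. This C sketch is only an illustration of the principle, not the codec Hydra's columnar engine uses: the same eight values collapse into two runs when stored column by column, but not at all when they are interleaved with values from another column, as in a row layout.

```c
#include <assert.h>
#include <stddef.h>

/* One run: a value and how many times it repeats consecutively. */
typedef struct Run {
    int value;
    size_t length;
} Run;

/* Run-length encode n values into out (which must have room for n runs).
 * Adjacent equal values collapse into a single run, so data that stores
 * similar values next to each other shrinks dramatically.
 * Returns the number of runs written. */
size_t rle_encode(const int *values, size_t n, Run *out)
{
    size_t nruns = 0;
    for (size_t i = 0; i < n; i++) {
        if (nruns > 0 && out[nruns - 1].value == values[i]) {
            out[nruns - 1].length++;        /* extend the current run */
        } else {
            out[nruns].value = values[i];   /* start a new run */
            out[nruns].length = 1;
            nruns++;
        }
    }
    return nruns;
}
```

For example, encoding `{7, 7, 7, 7, 3, 3, 3, 3}` yields two runs, while the interleaved `{7, 3, 7, 3, 7, 3, 7, 3}` yields eight.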

Our approach to columnar

Our approach is to use a Postgres extension. Extensions allow us to maintain 100% compatibility between Hydra and the community Postgres versions & ecosystem. We looked into existing open source columnar storage engines and chose the columnar access method developed at Citus[2]. In testing Citus columnar, we found that its performance for analytics was better than that of row-based tables, but could be improved with vectorization and parallel query execution.

Parallelization + Vectorization

It was clear that columnar storage was missing features essential to boosting performance. The obvious first step was to parallelize queries: starting from single-process columnar execution, we enabled parallel execution of SELECT queries. This provided an impressive performance boost.

The next step was to enable vectorization when processing queries. We started by enabling vectorized execution of WHERE clauses. Work will continue as we invest in vectorized execution of aggregate functions, explicit SIMD execution, support for more data types, and vectorized execution in general.

Parallelization

PostgreSQL introduced parallel query in version 9.6; before that, a single process did all the work for a query. Parallelism in PostgreSQL was implemented across several features, covering sequential scans, aggregates, and joins.

Query parallelization works by dividing the work of a query into smaller tasks, which are then executed simultaneously on multiple CPU cores or machines. This allows the database to take advantage of modern multi-core processors and can significantly improve the performance of queries that involve large amounts of data.

To support parallel execution, a custom scan must implement the following CustomExecMethods callbacks:

  Size (*EstimateDSMCustomScan) (CustomScanState *node,
                                 ParallelContext *pcxt);
  void (*InitializeDSMCustomScan) (CustomScanState *node,
                                   ParallelContext *pcxt,
                                   void *coordinate);
  void (*ReInitializeDSMCustomScan) (CustomScanState *node,
                                     ParallelContext *pcxt,
                                     void *coordinate);
  void (*InitializeWorkerCustomScan) (CustomScanState *node,
                                      shm_toc *toc,
                                      void *coordinate);

PostgreSQL achieves parallelism with multiple processes rather than threads, and those processes communicate through shared memory.

Each query starts with a single process, called the leader, which reports the size of the custom scan's shared memory through EstimateDSMCustomScan. The leader then initializes that shared memory in InitializeDSMCustomScan. The shared structure is visible to all workers, and each worker sets up its own local custom scan structures in the InitializeWorkerCustomScan callback. ReInitializeDSMCustomScan is called when the scan must be re-initialized, meaning that the workers are shut down and then launched again, e.g. when a nested-loop join rescans its inner side.
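The callback lifecycle above can be modeled in plain C, outside PostgreSQL. In this minimal sketch, `malloc()` stands in for the dynamic shared memory segment, and the struct and function names (`ScanShared`, `WorkerScanState`, `estimate_shared`, and so on) are hypothetical; each function only mirrors the role of the callback named in its comment and is not Hydra's code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical state shared by the leader and all workers. In PostgreSQL
 * this would live in the shared memory segment; here malloc() stands in. */
typedef struct ScanShared {
    uint64_t total_stripes;            /* stripes in the table */
    atomic_uint_fast64_t next_stripe;  /* next stripe to hand out */
} ScanShared;

/* Hypothetical per-worker (process-local) state. */
typedef struct WorkerScanState {
    ScanShared *shared;                /* points into the shared segment */
    uint64_t stripes_scanned;
} WorkerScanState;

/* Role of EstimateDSMCustomScan: report how much shared memory is needed. */
size_t estimate_shared(void)
{
    return sizeof(ScanShared);
}

/* Role of InitializeDSMCustomScan: the leader fills in the shared segment. */
void initialize_shared(ScanShared *shared, uint64_t total_stripes)
{
    shared->total_stripes = total_stripes;
    atomic_init(&shared->next_stripe, 0);
}

/* Role of ReInitializeDSMCustomScan: reset so a rescan starts from stripe 0. */
void reinitialize_shared(ScanShared *shared)
{
    atomic_store(&shared->next_stripe, 0);
}

/* Role of InitializeWorkerCustomScan: each worker sets up local state
 * that refers to the shared segment. */
void initialize_worker(WorkerScanState *state, ScanShared *shared)
{
    state->shared = shared;
    state->stripes_scanned = 0;
}
```

The design point is that only the small `ScanShared` struct is shared; everything a worker needs for its own scan stays in process-local memory.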

One question we had to answer for the parallel custom scan was how to divide data between workers so that each worker scans and processes unique tuples. Luckily, columnar's internal storage structure helped, and the solution was straightforward: each worker works on a different stripe. Stripes provide a logical and physical boundary that guarantees no data is processed by more than one worker.

A shared-memory variable tracks the allocation of the next stripe. The variable is atomic, so a fetch-and-add CPU instruction guarantees that concurrent workers never receive the same stripe.
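The stripe assignment scheme can be sketched with C11 atomics. Threads stand in for PostgreSQL's worker processes only to keep the example self-contained; inside a real extension the counter would live in shared memory and use PostgreSQL's own atomics (e.g. `pg_atomic_fetch_add_u64`). The names `claim_next_stripe` and `run_parallel_scan` and the stripe and worker counts are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

#define NUM_STRIPES 64
#define NUM_WORKERS 4

/* Stand-in for the shared-memory counter: next stripe to assign. */
static atomic_uint_fast64_t next_stripe;
/* How many times each stripe was processed (should end up exactly 1). */
static atomic_int claims[NUM_STRIPES];

/* Claim the next unscanned stripe, or return -1 when none are left.
 * atomic_fetch_add returns the old value and increments in one indivisible
 * step, so no two workers can ever receive the same stripe number. */
static int64_t claim_next_stripe(void)
{
    uint64_t stripe = atomic_fetch_add(&next_stripe, 1);
    return stripe < NUM_STRIPES ? (int64_t) stripe : -1;
}

/* Worker loop: keep claiming stripes until the table is exhausted. */
static void *worker_main(void *arg)
{
    (void) arg;
    int64_t stripe;
    while ((stripe = claim_next_stripe()) >= 0)
        atomic_fetch_add(&claims[stripe], 1); /* "scan" the stripe */
    return NULL;
}

/* Run the workers over all stripes. Returns 1 if every stripe was
 * processed exactly once, 0 otherwise. */
int run_parallel_scan(void)
{
    pthread_t workers[NUM_WORKERS];
    int started = 0;

    atomic_store(&next_stripe, 0);
    for (int i = 0; i < NUM_STRIPES; i++)
        atomic_store(&claims[i], 0);

    for (; started < NUM_WORKERS; started++)
        if (pthread_create(&workers[started], NULL, worker_main, NULL) != 0)
            break;
    for (int i = 0; i < started; i++)
        pthread_join(workers[i], NULL);

    for (int i = 0; i < NUM_STRIPES; i++)
        if (atomic_load(&claims[i]) != 1)
            return 0;
    return 1;
}
```

Because each worker keeps claiming until the counter passes the last stripe, the load balances itself: a worker that finishes a small stripe early simply claims another.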

Vectorization

Vectorized execution is a technique for improving the performance of database queries by applying each operation to many values at once. This is in contrast to traditional tuple-at-a-time execution, where each operation processes one value at a time.

Vectorized execution works by dividing data into small batches, called vectors, and applying each operation to an entire vector at once. Processing values in tight loops reduces per-tuple overhead and lets modern processors, which are designed to perform multiple operations simultaneously, work efficiently; this can significantly improve the performance of queries that involve large amounts of data.
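For contrast with tuple-at-a-time evaluation, this C sketch evaluates `value < constant` over a whole vector of `int` values in one tight loop and writes the results to a selection array. The names `vec_lt_int_const` and `VECTOR_SIZE` are hypothetical; the sketch shows the general technique, not Hydra's implementation, and relies on the compiler rather than explicit SIMD intrinsics.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define VECTOR_SIZE 1024 /* hypothetical batch size */

/* Evaluate "values[i] < constant" for an entire vector at once.
 * The loop body has no function calls or data-dependent branches,
 * which makes it a good candidate for compiler auto-vectorization.
 * Returns the number of matching rows. */
size_t vec_lt_int_const(const int *values, size_t n, int constant, bool *selected)
{
    size_t matches = 0;
    assert(n <= VECTOR_SIZE);
    for (size_t i = 0; i < n; i++) {
        selected[i] = values[i] < constant;
        matches += selected[i];
    }
    return matches;
}
```

For the values `{0, 14, 15, 16, 3, 19}` and the filter `< 15`, three rows are selected.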

Vectorization in Hydra is currently limited to WHERE clauses. These clauses are processed directly in the custom scan node, so we have full control over how they are executed.

Vectorized execution is used only if a vectorized function exists for the clause's argument types and the operator between them. Currently, we support comparison operators between basic types. Supported types are CHAR, SMALLINT, INT, BIGINT, DATE, and TIME.

Another limitation of the current implementation is that it only works with clauses that compare a single variable (column) with a constant value.

WHERE clauses are stored in a list, and the top-level clauses are combined with AND and evaluated sequentially. More complex clauses form tree-like structures.

Hydra's vectorization expects every clause in a single tree to be vectorizable; if that is not the case, vectorized execution is not used for that branch.
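These rules can be sketched as a recursive check over a simplified clause tree: a leaf qualifies only if it compares a single column with a constant of a supported type, an OR (or nested AND) qualifies only if all of its children do, and each top-level clause is judged on its own. The node layout, the `ValueType` enum, and the function names are hypothetical simplifications, not PostgreSQL's expression nodes or Hydra's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { NODE_AND, NODE_OR, NODE_LEAF } NodeKind;

/* Hypothetical operand types: the supported ones plus TEXT as an example
 * of an unsupported type. */
typedef enum {
    TYPE_CHAR, TYPE_SMALLINT, TYPE_INT, TYPE_BIGINT, TYPE_DATE, TYPE_TIME,
    TYPE_TEXT
} ValueType;

typedef struct Clause {
    NodeKind kind;
    bool column_vs_constant;          /* leaf: "column <op> constant"? */
    ValueType type;                   /* leaf: operand type */
    const struct Clause **children;   /* AND/OR: child clauses */
    size_t nchildren;
} Clause;

static bool type_supported(ValueType t)
{
    return t != TYPE_TEXT;
}

/* A subtree is vectorizable only if every leaf in it is. */
bool clause_vectorizable(const Clause *c)
{
    if (c->kind == NODE_LEAF)
        return c->column_vs_constant && type_supported(c->type);
    for (size_t i = 0; i < c->nchildren; i++)
        if (!clause_vectorizable(c->children[i]))
            return false;
    return true;
}

/* Top-level clauses are ANDed and evaluated independently, so each one is
 * checked separately. Marks which ones can use the vectorized path and
 * returns how many can. */
size_t mark_vectorized(const Clause **top, size_t n, bool *vectorized)
{
    size_t count = 0;
    for (size_t i = 0; i < n; i++) {
        vectorized[i] = clause_vectorizable(top[i]);
        count += vectorized[i];
    }
    return count;
}
```

For `WHERE b < 15 AND (c <> '' OR a < 40)`, only the first top-level clause would be marked vectorizable, which matches the EXPLAIN output for that query later in this section.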

CREATE TABLE t(a INT, b INT, c TEXT) USING columnar;
INSERT INTO t SELECT g, g % 20, 'random' || g FROM generate_series(0, 1000000 - 1) g;
postgres=# set columnar.enable_vectorization to false;
SET
postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=2592.51..2592.52 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..92.51 rows=1000000 width=4)
         Filter: (b < 15)
         Columnar Projected Columns: a, b
         Columnar Chunk Group Filters: (b < 15)

postgres=# set columnar.enable_vectorization to default;
SET
postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=2592.51..2592.52 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..92.51 rows=1000000 width=4)
         Columnar Projected Columns: a, b
         Columnar Chunk Group Filters: (b < 15)
         Columnar Vectorized Filter: (b < 15)

In this first example, a single WHERE clause is used and it can be vectorized, so vectorized execution is triggered for this query. The clause list is evaluated as:

  • AND( b < 15 )

postgres=# set columnar.enable_vectorization to false;
SET
postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15 AND c <> '';
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Aggregate  (cost=2638.77..2638.78 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..138.77 rows=1000000 width=4)
         Filter: ((b < 15) AND (c <> ''::text))
         Columnar Projected Columns: a, b, c
         Columnar Chunk Group Filters: (b < 15)

postgres=# set columnar.enable_vectorization to default;
SET
postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15 AND c <> '';
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Aggregate  (cost=2638.77..2638.78 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..138.77 rows=1000000 width=4)
         Filter: (c <> ''::text)
         Columnar Projected Columns: a, b, c
         Columnar Chunk Group Filters: (b < 15)
         Columnar Vectorized Filter: (b < 15)

This example shows that vectorization is still used even if not all WHERE clauses support it. This is possible because top-level clauses are independent and can be processed sequentially. The clauses in this example are evaluated as:

  • AND( b < 15, c <> '' )

postgres=# set columnar.enable_vectorization to false;
SET

postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15 OR c <> '';
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Aggregate  (cost=2916.30..2916.31 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..416.30 rows=1000000 width=4)
         Filter: ((b < 15) OR (c <> ''::text))
         Columnar Projected Columns: a, b, c

postgres=# set columnar.enable_vectorization to default;
SET
postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15 OR c <> '';
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Aggregate  (cost=2916.30..2916.31 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..416.30 rows=1000000 width=4)
         Filter: ((b < 15) OR (c <> ''::text))
         Columnar Projected Columns: a, b, c

In this example, vectorization is not used. The OR operator combines two clauses, only one of which can be vectorized. The clauses are evaluated as:

  • AND( OR( b < 15, c <> '' ) )

Since not all leaves of the OR branch can be vectorized, vectorization is not used for this query.

postgres=# EXPLAIN SELECT sum(a) FROM t WHERE b < 15 AND (c <> '' OR a < 40);
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Aggregate  (cost=2638.77..2638.78 rows=1 width=8)
   ->  Custom Scan (ColumnarScan) on t  (cost=0.00..138.77 rows=1000000 width=4)
         Filter: ((c <> ''::text) OR (a < 40))
         Columnar Projected Columns: a, b, c
         Columnar Chunk Group Filters: (b < 15)
         Columnar Vectorized Filter: (b < 15)

The last example shows vectorized execution being used for one of the clauses but not for the other.

The WHERE clause is evaluated as:

  • AND( b < 15, OR( c <> '', a < 40 ) )

Vectorization can be enabled or disabled with the columnar.enable_vectorization GUC variable, which is set to true by default.

Vectorization is there to speed up execution, but if non-vectorized execution is already fast enough, vectorization can even add overhead. We expect this feature to improve performance in OLAP cases that involve complex queries and large amounts of data.

Optimizing Postgres Settings

Getting the most performance out of PostgreSQL isn’t complete without tuning. Postgres’s default parameters don’t take full advantage of your hardware unless you tell Postgres how much of it to use. If you are running Hydra on a dedicated machine, these are the settings we recommend for Hydra. You may wish to reduce work_mem and hash_mem_multiplier depending on your available memory and concurrency load.

max_worker_processes = 2 * num CPUs
max_parallel_workers = num CPUs
columnar.min_parallel_processes = num CPUs
shared_buffers = 25% of your total available memory
effective_cache_size = 75% of your total available memory
effective_io_concurrency = 100
hash_mem_multiplier = 8
work_mem = 64MB
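The CPU- and memory-dependent recommendations above are simple arithmetic. The hypothetical C helper below encodes only the rules in this list, taking total memory in megabytes, and prints them as postgresql.conf lines; the fixed values are copied from the list rather than tuned, and the names `Recommended`, `recommend`, and `print_conf` are illustrative only.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical container for the settings recommended above. */
typedef struct Recommended {
    int max_worker_processes;            /* 2 * num CPUs */
    int max_parallel_workers;            /* num CPUs */
    int columnar_min_parallel_processes; /* num CPUs */
    long shared_buffers_mb;              /* 25% of total memory */
    long effective_cache_size_mb;        /* 75% of total memory */
} Recommended;

Recommended recommend(int num_cpus, long total_memory_mb)
{
    Recommended r;
    r.max_worker_processes = 2 * num_cpus;
    r.max_parallel_workers = num_cpus;
    r.columnar_min_parallel_processes = num_cpus;
    r.shared_buffers_mb = total_memory_mb / 4;
    r.effective_cache_size_mb = total_memory_mb * 3 / 4;
    return r;
}

/* Print the settings in postgresql.conf syntax. */
void print_conf(FILE *out, const Recommended *r)
{
    fprintf(out, "max_worker_processes = %d\n", r->max_worker_processes);
    fprintf(out, "max_parallel_workers = %d\n", r->max_parallel_workers);
    fprintf(out, "columnar.min_parallel_processes = %d\n",
            r->columnar_min_parallel_processes);
    fprintf(out, "shared_buffers = %ldMB\n", r->shared_buffers_mb);
    fprintf(out, "effective_cache_size = %ldMB\n", r->effective_cache_size_mb);
    fprintf(out, "effective_io_concurrency = 100\n");
    fprintf(out, "hash_mem_multiplier = 8\n");
    fprintf(out, "work_mem = 64MB\n");
}
```

For a dedicated machine with 16 CPUs and 64 GB of RAM, `recommend(16, 65536)` yields max_worker_processes = 32, max_parallel_workers = 16, shared_buffers = 16384MB, and effective_cache_size = 49152MB.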

ClickBench Report

You can see the results for yourself to review the queries, methodology, and data size.

Screenshot of ClickBench results

Conclusion and Future Work

With Hydra, it’s never been easier to scale analytics and eliminate data silos with Postgres. This is just the start: the Hydra team will keep investing our resources to drive performance faster. Hydra is open source and 100% compatible with community Postgres. You can test this out for yourself using hydra docker-compose!

Starting today, we’re excited to extend 14-day free trials of Hydra’s managed cloud databases! Schedule time with our team to unlock your free trial.

We’d love your support, GH stars, and contributions to Hydra open source. Join our Discord channel to share your test results and ideas for future enhancements. Together, we can take Hydra to the next level of performance!

Meanwhile, our team is already hard at work to bring the next level of scale and speed to the Hydra data warehouse! Follow us on Twitter, LinkedIn, or Discord to be the first to hear.

References
