Maintenance of a Multi-Database Citus Cluster

This guide is intended for database administrators (DBAs) who manage an on-premises cluster of PostgreSQL nodes with Citus, a PostgreSQL extension for horizontal scalability and columnar storage.

Every DBA eventually reaches the point where manual maintenance of a cluster becomes an arduous chore and the need for an automated solution becomes apparent. This guide discusses an example of such a solution.

Setup

We use the following Citus cluster for our analytics:

Every database in the cluster contains its own unique set of tables and views (including materialized views), permissions, and configuration parameters.

Philosophy 

While automating, we arrived at the following set of traits that the solution should possess:

Implementation

Foundation

In order to address those principles, we decided to employ the Flyway database migration tool. It allows incremental evolution of database schema via versioned migration scripts. 

Migration scripts are stored in a Git repository with a CD mechanism configured to run the migration on every commit. This way, to apply a change to a cluster, a DBA creates a commit with one or more migration SQL scripts and pushes it.

This setup is quite common and suitable even for vanilla PostgreSQL, but with Citus there are nuances: some commands must be executed on all nodes in the cluster, and others only on a specific node. Luckily, PostgreSQL and Citus provide enough mechanisms to cover almost every use case.

Single Point of Maintenance

To maintain the databases in a cluster, it is preferable to create a dedicated database. In the examples that follow, it is referred to as maintenance. It is a convenient place for maintenance-related extensions and functions, but most importantly, it holds Flyway's history table, which then reflects the evolution of all databases in the cluster, instead of each database having a separate history table.
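
As an illustration of the first point, the initial migration could install the extensions that the later examples rely on. The following is only a sketch: the actual contents of V1__init.sql (listed in the directory layout later in this guide) are not shown here, so treating it as the place for these statements is an assumption.

```sql
-- Hypothetical V1__init.sql, executed against the maintenance database.
-- dblink is used to run SQL in other databases and on other nodes.
CREATE EXTENSION IF NOT EXISTS dblink;
-- Citus provides run_command_on_workers() and citus_get_active_worker_nodes().
CREATE EXTENSION IF NOT EXISTS citus;
```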

Migration scripts executed on the maintenance database must be able to create other databases and execute SQL in them. That's where the dblink extension comes in: it allows you to connect to any other PostgreSQL server, including localhost, and execute arbitrary SQL there. With that in mind, this is what a migration script that creates a database with the Citus extension looks like:

SQL
 
CREATE DATABASE new_citus_database;
SELECT DBLINK_EXEC('dbname=new_citus_database user=postgres', $remote$
CREATE EXTENSION citus;
$remote$);


Note that CREATE DATABASE cannot run inside a transaction block, so the transaction must be disabled for this migration via a script configuration file.
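
A script configuration file sits next to the migration and carries the same name with a .conf suffix. A minimal sketch, assuming the file name used in the directory layout later in this guide, could contain a single setting:

```properties
# V2.0__create_new_citus_database.sql.conf
# Run this migration outside of a transaction, because CREATE DATABASE
# cannot be executed inside a transaction block.
executeInTransaction=false
```

The setting applies only to the migration it is paired with; other migrations keep Flyway's default transactional behavior.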

It is not enough to create the Citus extension on the fresh database. In order for Citus to work, according to the docs, it is necessary to:

  1. Create databases with the same name on worker nodes.
  2. Create the Citus extension on those databases.
  3. Invoke citus_add_node() with the address of every worker node in the cluster.

This is cumbersome, since it requires connecting to the workers manually or maintaining a dedicated Ansible playbook. Luckily, the maintenance database already contains all the tools necessary to do this from an SQL script:

SQL
 
-- Create database on every worker
SELECT run_command_on_workers($cmd$CREATE DATABASE new_citus_database;$cmd$);

-- Connect to the fresh database on worker nodes and create the Citus extension
WITH citus_workers AS (SELECT node_name FROM citus_get_active_worker_nodes())
SELECT DBLINK_EXEC(FORMAT('host=%s dbname=new_citus_database user=postgres', node_name), $remote$
CREATE EXTENSION citus;
$remote$)
FROM citus_workers;

-- Add workers to the fresh database on the coordinator
WITH citus_workers AS (SELECT node_name FROM citus_get_active_worker_nodes() ORDER BY node_name)
SELECT DBLINK_EXEC('dbname=new_citus_database user=postgres', format($remote$
    START TRANSACTION;
    SELECT citus_add_node('%s', 5432);
    COMMIT;
$remote$, node_name))
FROM citus_workers;


Here, DBLINK_EXEC is used to connect to all worker nodes in the cluster, as well as to the coordinator. DBLINK_EXEC does not support statements that return rows, such as SELECT; the workaround is to wrap them in START TRANSACTION; ... COMMIT; statements.
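
When the rows themselves are of interest, the dblink function can be used instead of DBLINK_EXEC. As a sketch, the following query could be run manually from the maintenance database to check which workers ended up registered in the new database; the column definition list reflects the columns returned by citus_get_active_worker_nodes(), and the connection string is the same assumption as in the scripts above.

```sql
-- List the worker nodes registered in new_citus_database on the coordinator.
-- Unlike dblink_exec(), dblink() returns rows, so a column definition list is required.
SELECT *
FROM dblink('dbname=new_citus_database user=postgres',
            $remote$SELECT node_name, node_port FROM citus_get_active_worker_nodes()$remote$)
         AS t(node_name TEXT, node_port BIGINT);
```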

Configuration of the newly created database can be performed in a similar fashion:

SQL
 
ALTER DATABASE new_citus_database SET WORK_MEM = '256MB';
SELECT run_command_on_workers($cmd$
ALTER DATABASE new_citus_database SET WORK_MEM = '256MB';
$cmd$);


Roles, permission grants, and any other statements can be handled in the same way.

Maintenance of Multiple Databases

A similar approach can be used to manage several databases within one script. For example, let's assume that we have created another database, another_citus_database, and need to create the same table and view in both. This is easily achieved using a CTE:

SQL
 
WITH databases AS (SELECT *
                   FROM (VALUES ('new_citus_database'),
                                ('another_citus_database')) AS t(db_name))
SELECT DBLINK_EXEC(FORMAT('dbname=%I user=postgres', db_name), $remote$
START TRANSACTION;
CREATE TABLE test_table
    (user_id TEXT, data jsonb);
SELECT create_distributed_table('test_table', 'user_id');
CREATE VIEW test_table_view AS SELECT * FROM test_table;
COMMIT;
$remote$)
FROM databases;


In practice, the creation of the view should be extracted into a separate repeatable migration script.
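
A repeatable migration for the view might look like the sketch below. It assumes the file is named R__test_table_view.sql, as in the directory layout shown later, and that its body mirrors the CTE above; both are assumptions. Flyway re-applies a repeatable migration whenever its checksum changes, so CREATE OR REPLACE VIEW is used to let the script run more than once.

```sql
-- Hypothetical R__test_table_view.sql
-- Re-creates the view in every listed database each time this file changes.
WITH databases AS (SELECT *
                   FROM (VALUES ('new_citus_database'),
                                ('another_citus_database')) AS t(db_name))
SELECT DBLINK_EXEC(FORMAT('dbname=%s user=postgres', db_name), $remote$
CREATE OR REPLACE VIEW test_table_view AS SELECT * FROM test_table;
$remote$)
FROM databases;
```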

Instead of CTEs, it is possible, and preferable, to create utility PL/pgSQL routines. For example, when the same instance hosts databases with and without the Citus extension installed, it is convenient to have a procedure that runs SQL statements only on the databases with Citus. Such a procedure can look like this:

SQL
 
CREATE OR REPLACE PROCEDURE public.execute_on_databases_with_citus(statement TEXT)
    LANGUAGE plpgsql AS
$$
DECLARE
    db_name TEXT;
BEGIN
    -- Iterate over connectable, non-template databases that have Citus installed.
    -- A FOR loop over the query simply does nothing when no database matches.
    FOR db_name IN
        SELECT datname
        FROM pg_database
        WHERE datallowconn
          AND NOT datistemplate
          AND EXISTS(SELECT *
                     FROM DBLINK(FORMAT('dbname=%s', datname),
                                 $cmd$SELECT TRUE FROM pg_extension WHERE extname = 'citus'$cmd$) AS t(citus_installed BOOLEAN))
        LOOP
            RAISE NOTICE 'EXECUTING ON %', db_name;
            -- Pass the statement as a plain argument; no dynamic SQL is needed
            PERFORM dblink_exec(FORMAT('dbname=%s', db_name), statement);
        END LOOP;
END
$$;


With such a procedure in place, it is easy to run ALTER EXTENSION citus UPDATE; on every database that has Citus installed. For example:

SQL
 
CALL execute_on_databases_with_citus($cmd$ALTER EXTENSION CITUS UPDATE$cmd$);


The described way of administration is very flexible and allows a DBA to implement every piece of logic needed to achieve a smooth administration experience.

Things To Note

Depending on your setup, it may be necessary to configure the .pgpass file to be able to connect to worker nodes via dblink. Historically, this was done as part of the Citus security configuration, but that changed with the release of Citus 11.

Putting It All Together

Let's put all the steps described earlier into migration scripts. On disk, the sequence of migration scripts could look like this:

Shell
 
.
└── db
    └── migration
        ├── R__test_table_view.sql
        ├── V1__init.sql
        ├── V2.0__create_new_citus_database.sql
        ├── V2.0__create_new_citus_database.sql.conf
        ├── V2.1__new_citus_database_configuration.sql
        ├── V3__another_citus_database.sql
        ├── V3__another_citus_database.sql.conf
        ├── V4__no_citus_database.sql
        ├── V5__common_table.sql
        └── V6__update_citus_extension.sql


With such a structure in place, you can invoke flyway migrate if you are using the CLI tool, or ./gradlew flywayMigrate -i if you prefer the Gradle plugin, as we do. Push the scripts to a Git repository and configure your favorite CI/CD tool, such as GitLab CI/CD or GitHub Actions, and you will have a solution with the desired traits.

Drawbacks

The described approach has one serious limitation: due to the nature of DBLINK_EXEC, multi-database statements are non-transactional. This requires migration scripts to be idempotent in some way: either via IF NOT EXISTS clauses in Data Definition Language (DDL) statements or by recreating objects via DROP. This can be a little trickier to implement with Citus objects, but a workaround almost always exists. For example, the creation of a table could be made idempotent like this:

SQL
 
WITH databases AS (SELECT *
                   FROM (VALUES ('new_citus_database'),
                                ('another_citus_database')) AS t(db_name))
SELECT DBLINK_EXEC(FORMAT('dbname=%I user=postgres', db_name), $remote$
START TRANSACTION;
CREATE TABLE IF NOT EXISTS test_table (user_id TEXT, data jsonb);
DO $$
BEGIN
    EXECUTE $cmd$SELECT create_distributed_table('test_table', 'user_id');$cmd$;
EXCEPTION
    -- 42P16 (invalid_table_definition): treated here as "table is already distributed"
    WHEN SQLSTATE '42P16' THEN
        RETURN;
END;$$;
COMMIT;
$remote$)
FROM databases;
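
The other option mentioned above, recreating objects via DROP, could be sketched for the view as follows. Within a single database the statements still run in one transaction; only the set of databases as a whole is non-transactional, so a failure on the second database leaves the first one already updated, and a re-run simply recreates the view there.

```sql
-- Sketch: idempotent view creation by dropping and recreating the object.
WITH databases AS (SELECT *
                   FROM (VALUES ('new_citus_database'),
                                ('another_citus_database')) AS t(db_name))
SELECT DBLINK_EXEC(FORMAT('dbname=%s user=postgres', db_name), $remote$
START TRANSACTION;
DROP VIEW IF EXISTS test_table_view;
CREATE VIEW test_table_view AS SELECT * FROM test_table;
COMMIT;
$remote$)
FROM databases;
```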


Summary

This guide has demonstrated basic principles and tools for streamlining the administration of Citus clusters. The capabilities of Flyway, combined with those provided by Citus, dblink, and PL/pgSQL, allow a DBA to manage clusters of any scale with ease.
An executable example of the setup described in this post can be found here.
