Executing Citus Across Nodes for PostgreSQL

Welcome back to our Citus blog series! So far in this series, we have installed and configured the Citus extension for PostgreSQL. Now that we are up and running, it is time to distribute some data.

Let’s begin by inserting some sample data for demonstration purposes. First, we need a table in which to put the data. On the coordinator node, let’s enter our demo database, in which we have loaded the Citus extension:

sudo -iu postgres psql demo
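Before creating anything, it can be worth a quick sanity check that the Citus extension is present in the demo database and that both workers are registered with the coordinator. This is a minimal sketch using standard Citus functions; the addresses returned should match the workers we added in the previous post (on older Citus versions the second function is named master_get_active_worker_nodes).

-- Confirm the Citus extension is installed in this database
SELECT extname, extversion FROM pg_extension WHERE extname = 'citus';

-- List the worker nodes currently registered with the coordinator
SELECT * FROM citus_get_active_worker_nodes();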

Now that we are in the demo database, let’s create our table.

CREATE TABLE stores (
    store_id SERIAL NOT NULL,
    address VARCHAR(255),
    city VARCHAR(255),
    state CHAR(2),
    zip INT,
    PRIMARY KEY(store_id)
);

Now, insert some data into our new table:

INSERT INTO stores (address, city, state, zip)
VALUES ('123 Test Dr.', 'Narnia', 'CA', 56789);
INSERT INTO stores (address, city, state, zip)
VALUES ('456 Pickup Ave.', 'Tiny Town', 'BZ', 13579);
INSERT INTO stores (address, city, state, zip)
VALUES ('789 Sesame St.', 'Generic', 'AP', 12345);
INSERT INTO stores (address, city, state, zip)
VALUES ('321 Barber Blvd.', 'Anywhere', 'AP', 12345);
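As an aside, the same four rows could also be inserted with a single multi-row statement; this is simply an equivalent alternative, not an additional step to run:

INSERT INTO stores (address, city, state, zip) VALUES
    ('123 Test Dr.',     'Narnia',    'CA', 56789),
    ('456 Pickup Ave.',  'Tiny Town', 'BZ', 13579),
    ('789 Sesame St.',   'Generic',   'AP', 12345),
    ('321 Barber Blvd.', 'Anywhere',  'AP', 12345);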

Before we distribute the data, let’s confirm that there is no data on either worker node. Let’s run this command within the demo database on each worker node:

SELECT * FROM stores;

Since we have not created the stores table in the demo database on either worker, it will return:

ERROR: relation "stores" does not exist
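If you would rather confirm this without triggering an error, you can instead list the user tables that exist on each worker; at this point the result should be empty:

-- Run in each worker's demo database: no user tables should exist yet
SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public';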

Our table has data on the coordinator, but we need to distribute that data to the worker nodes. To do so, we tell Citus which table to distribute and which column to use as the distribution key. The distribution key determines how rows are assigned to shards, and therefore which worker node each row ends up on. We will create a distributed table from the stores table, with store_id as the distribution key:

SELECT create_distributed_table('stores', 'store_id');

The output should look something like this:

NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run:
SELECT truncate_local_data_after_distributing_table($$public.stores$$)
create_distributed_table
--------------------------

(1 row)

This output tells us that Citus has successfully copied the data, but that the data remains on the disk (although hidden). It is suggested that we remove the local data from the coordinator node, since the coordinator node ideally contains only metadata and no production data. Before doing so, let’s check whether we can access the data from the coordinator and each worker node with:

SELECT * FROM stores;

We should see the rows we inserted earlier:

 store_id |     address      |   city    | state |  zip
----------+------------------+-----------+-------+-------
        1 | 123 Test Dr.     | Narnia    | CA    | 56789
        2 | 456 Pickup Ave.  | Tiny Town | BZ    | 13579
        3 | 789 Sesame St.   | Generic   | AP    | 12345
        4 | 321 Barber Blvd. | Anywhere  | AP    | 12345
(4 rows)
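If you are curious how the coordinator answers this query now, running EXPLAIN on the coordinator should show a Citus custom scan that fans the query out as tasks against the shards on the workers (the exact plan text varies by Citus version):

EXPLAIN SELECT * FROM stores;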

Now that we have confirmed we can access the data from each node, we should remove the leftover local data from the coordinator as suggested. The HINT in the earlier output gives us the command to do so:

SELECT truncate_local_data_after_distributing_table($$public.stores$$);
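To confirm the cleanup worked, a couple of size checks can be run on the coordinator. These use standard PostgreSQL and Citus functions; the local relation should now report zero bytes, while citus_table_size still reports the data living on the workers:

-- Local bytes left behind on the coordinator (should be 0 after truncation)
SELECT pg_relation_size('public.stores');

-- Size of the distributed table across the workers
SELECT citus_table_size('public.stores');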

A related function, citus_drain_node, moves all shard placements off a given node (identified by its address and port). It solves a different problem, emptying a node you plan to retire rather than cleaning up the coordinator's hidden local copy, but it is worth knowing about:

SELECT citus_drain_node('10.1.1.1', 5432);
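If you ever do drain a node, the coordinator's pg_dist_node metadata table is the place to check the result; among other things, it records whether each node is active and whether it should receive shards (column names can vary slightly across Citus versions):

-- Run on the coordinator: per-node status in the Citus metadata
SELECT nodename, nodeport, isactive, shouldhaveshards FROM pg_dist_node;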

We can now view the data from each worker node, so it certainly appears as though we have distributed the data, but how do we know for sure? Thankfully, the coordinator node has tables and views that provide information about the data across the nodes. One such view, citus_shards, shows where each shard is located, what kind of table it belongs to, and its size. Let’s view this information with:

SELECT * FROM citus_shards;

It should return something that looks like this:

table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
------------+---------+---------------+------------------+---------------+------------+----------+------------
stores | 102008 | stores_102008 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102009 | stores_102009 | distributed | 1 | 10.2.2.2 | 5432 | 8192
stores | 102010 | stores_102010 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102011 | stores_102011 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102012 | stores_102012 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102013 | stores_102013 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102014 | stores_102014 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102015 | stores_102015 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102016 | stores_102016 | distributed | 1 | 10.1.1.1 | 5432 | 8192
stores | 102017 | stores_102017 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102018 | stores_102018 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102019 | stores_102019 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102020 | stores_102020 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102021 | stores_102021 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102022 | stores_102022 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102023 | stores_102023 | distributed | 1 | 10.2.2.2 | 5432 | 8192
stores | 102024 | stores_102024 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102025 | stores_102025 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102026 | stores_102026 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102027 | stores_102027 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102028 | stores_102028 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102029 | stores_102029 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102030 | stores_102030 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102031 | stores_102031 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102032 | stores_102032 | distributed | 1 | 10.1.1.1 | 5432 | 8192
stores | 102033 | stores_102033 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102034 | stores_102034 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102035 | stores_102035 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102036 | stores_102036 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102037 | stores_102037 | distributed | 1 | 10.2.2.2 | 5432 | 0
stores | 102038 | stores_102038 | distributed | 1 | 10.1.1.1 | 5432 | 0
stores | 102039 | stores_102039 | distributed | 1 | 10.2.2.2 | 5432 | 0
(32 rows)

So, what are we looking at? Let’s break down the view:

  • This query returned 32 rows, one per shard. Thirty-two is the default number of shards Citus creates for a new distributed table unless you specify otherwise (the sketch after this list shows how to change that).
  • Each shardid corresponds to a shard_name, which is the name of the physical table that holds that shard on its node.
  • The citus_table_type column indicates what kind of Citus table the shard belongs to; every shard here belongs to the distributed table stores, shown in the table_name column.
  • The colocation_id column identifies the co-location group. Tables that share a colocation_id have their matching shards placed on the same nodes; since stores is our only distributed table so far, every shard shows group 1.
  • The nodename column lists the IP addresses of the worker nodes we designated during configuration.
  • The shard_size column shows the size of each shard. Our four rows hashed into four different shards, two on each worker, and the 32 shards themselves are split evenly between the two workers.
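As promised above, here is a short sketch of how the shard count could be adjusted, along with the citus_tables view (available in recent Citus releases) that summarizes each distributed table. The new_table name and its id column are hypothetical, purely for illustration; 32 remains the default if you change nothing.

-- One row per distributed table: distribution column, colocation group, size, shard count
SELECT * FROM citus_tables;

-- Choose a different shard count for tables distributed after this point
SET citus.shard_count = 64;
SELECT create_distributed_table('new_table', 'id');  -- hypothetical table and column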

Summary

And there you have it! We have used Citus to distribute data across multiple nodes. Citus has made distributed tables into a turn-key solution for PostgreSQL. Tune in for the next Citus blog post where we will add a node to the cluster!