Building a realtime database listener: Using socket.io and pg_notify

#socket

#postgreSQL

#pg_notify

#prisma

#express

Thumbnail

Introduction

Real-time data updates have become essential for modern web applications. Whether you're working on collaborative tools, financial dashboards, or live notifications, ensuring instant data updates is crucial.

In this guide, we will explore how to build a real-time database listener using Socket.IO and pg_notify in PostgreSQL. Additionally, we will ensure that referenced values update dynamically whenever a table changes.

What You'll Learn

By the end of this guide, you will know how to:

  • Use PostgreSQL triggers and pg_notify to broadcast database changes.
  • Set up a Node.js server with Express.js and Socket.IO to listen for those events.
  • Ensure referenced values are dynamically updated when a table changes.

Why Use Socket.IO and pg_notify?

Socket.IO

Socket.IO is a JavaScript library that enables real-time, bidirectional, and event-based communication between the browser and server. It is widely used in:

  • Real-time dashboards
  • Chat applications
  • Collaborative tools

Using Socket.IO, we can instantly broadcast database updates to connected clients whenever an event occurs.

pg_notify in PostgreSQL

pg_notify is a built-in PostgreSQL function that sends asynchronous notifications when a database event (INSERT, UPDATE, DELETE) occurs. Instead of polling the database for changes, pg_notify helps us broadcast updates instantly and efficiently.


Architecture Overview

In this guide, we will build a real-time database listener using Socket.IO and pg_notify in PostgreSQL. Additionally, we will ensure that referenced values update dynamically whenever a table changes.

This architecture enables real-time database event notifications using PostgreSQL triggers, pg_notify, Node.js (Socket.IO), and a web client. When a table changes (INSERT, UPDATE, DELETE), a trigger fires pg_notify(), which the backend server listens to, processes, and emits via Socket.IO to connected clients. The frontend receives these events and updates the UI dynamically.

Architecture


Step 1: Setting Up the Database Schema and Triggers

1.1 Modeling the Database

For this example, we'll define two tables: User and Book. The User table represents authors, and the Book table stores books associated with users.

schema.prisma
model User {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

name String
phoneNumber String @map("phone_number")
email String @unique
books Book[]

@@map("app_user")
}

model Book {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

title String
ISBN String
description String

author User @relation(fields: [userId], references: [id])
userId String

@@unique([title, userId])
@@map("book")
}

1.2 Creating Database Triggers

To capture changes in our tables and send notifications, we will create a trigger function in SQL. This function will:

  • Capture the event (INSERT, UPDATE, DELETE).
  • Collect data from the affected row.
  • Send a notification using pg_notify.
trigger.sql
CREATE OR REPLACE FUNCTION public.notify_changes()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
payload JSON;
referencing_table RECORD;
referencing_query TEXT;
affected_rows JSON;
referenced_value TEXT;
BEGIN
-- Capture the changed row
IF (TG_OP = 'DELETE') THEN
payload := row_to_json(OLD);
ELSE
payload := row_to_json(NEW);
END IF;

-- Notify PostgreSQL
PERFORM pg_notify('new_event',
json_build_object(
'table', TG_TABLE_NAME,
'operation', TG_OP,
'data', payload
)::text
);

-- Handle referenced values in foreign key relationships
FOR referencing_table IN
SELECT
tc.table_name AS referencing_table,
kcu.column_name AS referencing_column,
ccu.table_name AS referenced_table
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE
tc.constraint_type = 'FOREIGN KEY'
AND ccu.table_name = TG_TABLE_NAME
LOOP
referencing_query := FORMAT(
'SELECT json_agg(row_to_json(t)) FROM %I t WHERE t.%I = $1',
referencing_table.referencing_table,
referencing_table.referencing_column
);

EXECUTE referencing_query INTO affected_rows USING referenced_value;

IF affected_rows IS NOT NULL THEN
PERFORM pg_notify('new_event',
json_build_object(
'table', referencing_table.referencing_table,
'operation', TG_OP || '(via reference)',
'affected_rows', affected_rows
)::text
);
END IF;
END LOOP;

RETURN NEW;
END;
$$;

Here, We are creating a sql function notify_changes to notify our databse listener about the database events happening in our database, The required parameters are declared under the declare section. We will also handle referenced values in other tables through foreign key relation.

Code Explanation

  1. Iterating With For Loop:
  • The SELECT query inside the FOR loop retrieves all tables (referencing_table) and columns (referencing_column) that have a foreign key constraint referencing the table that triggered the trigger (TG_TABLE_NAME).
  1. Dynamic Query Construction:
  • For each referencing table, a dynamic SQL query is constructed using the FORMAT function. This query selects all rows from the referencing table where the referencing column matches the value of the row being modified (referenced_value).
  • The json_agg(row_to_json(t)) function is used to aggregate the results into a JSON array.
  1. Query Execution:
  • The constructed query is executed using EXECUTE, and the result is stored in the affected_rows variable.
  • The USING referenced_value clause passes the value of the row being modified to the query.
  1. Notification:
  • If affected_rows is not NULL (i.e., there are rows in the referencing table that reference the modified row), a notification is sent using pg_notify.

  • The notification includes:

    • The name of the referencing table (referencing_table.referencing_table).
    • The operation that triggered the notification (TG_OP || '(via reference)').
    • The affected rows (affected_rows).

1.3 Applying Triggers to All Tables

Now, we will create a trigger named notification_trigger in each table existing in the database. At first we will remove triggers if there any exists in table with the same name to avoid conflicts. And then execute a dynamic query that execute the function notify_changes() whenever INSERT, UPDATE or DELETE, event happens inside the table. :

trigger.sql
DO $$
DECLARE
table_rec RECORD;
BEGIN
FOR table_rec IN
SELECT tablename FROM pg_tables WHERE schemaname = 'public'
LOOP
EXECUTE format(
'DROP TRIGGER IF EXISTS notification_trigger ON %I',
table_rec.tablename
);

EXECUTE format(
'CREATE TRIGGER notification_trigger
AFTER INSERT OR UPDATE OR DELETE ON %I
FOR EACH ROW
EXECUTE FUNCTION notify_changes()',
table_rec.tablename
);
END LOOP;
END $$;


1.4 Applying trigger to your database

Run the following command to apply the SQL script:

psql -U your_username -d your_database -f trigger.sql

If you are using docker, copy the trigger file into your docker container and run the command:

docker cp trigger.sql your_container_name:/path/to/trigger.sql
docker exec your_container_name psql -U your_username -d your_database -f /path/to/trigger.sql

Note: You can modify the commands to suit your needs.

Step 2: Setting Up the Node.js Server

2.1 Install Dependencies

Run the following command to install required dependencies:

npm install express socket.io pg

2.2 Listening for PostgreSQL Notifications

We will set up a Node.js server to listen for notifications from PostgreSQL.

server.ts
const { Client } = require("pg");
const express = require("express");
const http = require("http");
const { Server } = require("socket.io");

const app = express();
const server = http.createServer(app);
const io = new Server(server);

const client = new Client({
connectionString: "your_postgres_connection_string",
});

(async () => {
await client.connect();
await client.query("LISTEN new_event");

client.on("notification", (msg) => {
console.log("Incoming notification:", msg.payload);
const payload = JSON.parse(msg.payload ?? "{}");
io.emit(payload.table, { action: payload.operation, data: payload.data });
});
})();

server.listen(3000, () => console.log("Server running on port 3000"));

Demo

You can now start your server, run your database queries to define triggers on your target database and start listening for events. With things in place and proper configuration you can see your table changes in real time like in this video.

Happy Learning.

Pros and Cons

Pros

  • Real-time updates without polling.
  • Automated updates across related tables.
  • Enables real-time UI updates for connected clients.

Cons

  • Requires PostgreSQL setup and configuration.
  • May require additional server resources.
  • Limited to PostgreSQL and Socket.IO.

Conclusion

By combining PostgreSQL triggers, pg_notify, and Socket.IO, we have built a real-time database listener that dynamically updates both direct and referenced values whenever a table changes.

Key Takeaways

  • pg_notify eliminates inefficient polling.
  • Triggers automate updates across related tables.
  • Socket.IO enables real-time UI updates for connected clients.

References:

PostgreSQL Docs: https://www.postgresql.org/docs/current/sql-notify.html
Socket.io docs: https://socket.io/docs/v4/server-initialization/

Explore the complete project on GitHub: trigger-io





Made w. 🤍 by Dikshyanta Aryal