# ลองใช้ PostgreSQL แทน Redis
URL: /posts/redis-to-postgres
Source: https://raw.githubusercontent.com/codepunk000/blog/refs/heads/main/content/blog/redis-to-postgres.mdx
Tags: postgresql, redis, database, system-design

บทความนี้จะเล่าถึงการทดลองใช้ PostgreSQL แทน Redis ไม่ว่าจะเป็นการทำ Cache, Pub/Sub หรือ Message Queue



บทนำ [#บทนำ]

อย่างที่เรารู้กันว่า **Redis** เป็น **In-memory data structure store** ที่นิยมใช้กัน ไม่ว่าจะเป็นการทำ **Cache**, **Pub/Sub** หรือ **Message Queue** แต่บทความนี้จะเล่าถึงการทดลองใช้ **PostgreSQL** ที่เป็น **persistent relational database** ที่มีความ **feature-rich** มากๆ แทน **Redis** กัน โดยใจบทความนี้จะใช้ **Typescript** เป็นภาษา หลักในการเขียนตัวอย่างโค๊ดเพราะคิดว่าน่าจะเป็นภาษาที่คนส่วนใหญ่ใช้กันมากที่สุด (แต่เจ้าตัวเขียน **Go** กับ **Rust** เป็นหลักนะ 😂)

***

Cache [#cache]

Cache เล่าเร็วๆ คือการเก็บข้อมูลที่เข้าถึงบ่อยๆ ไว้ในที่ที่เข้าถึงได้เร็วกว่า เช่น เก็บไว้ใน In-memory ด้วย Redis แทนที่จะไปดึงจาก Database ทุกครั้ง ซึ่งจะช่วยลด Latency และ Load ของ Database ลงได้มาก

ตัวอย่างฝั่ง Redis [#ตัวอย่างฝั่ง-redis]

```typescript
import Redis from 'ioredis';

const redis = new Redis();

redis.set('key', 'value', 'EX', 3600); 
const value = await redis.get('key');

console.log(value);
```

ตัวอย่างฝั่ง PostgreSQL [#ตัวอย่างฝั่ง-postgresql]

<div className="fd-steps">
  <div className="fd-step">
    สร้าง Table สำหรับ Cache [#1-สร้าง-table-สำหรับ-cache]

    ```sql
    CREATE UNLOGGED TABLE cache (
      key TEXT PRIMARY KEY,
      value TEXT,
      expires_at TIMESTAMPTZ
    );

    CREATE INDEX idx_cache_expires_at ON cache (expires_at);
    ```

    UNLOGGED Table คืออะไร? [#unlogged-table-คืออะไร]

    `UNLOGGED TABLE` เป็น table ที่ไม่บันทึก log ของการเปลี่ยนแปลงข้อมูลลงใน **Write-Ahead Log (WAL)** ทำให้การเขียนข้อมูลเร็วขึ้นมาก แลกกับความเสี่ยงที่ข้อมูลอาจหายได้ถ้าเกิด crash ดังนั้น table แบบนี้เหมาะกับการเก็บพวก Temporary data, session หรือข้อมูลที่ไม่สำคัญมากนัก
  </div>

  <div className="fd-step">
    Function สำหรับ Set และ Get [#2-function-สำหรับ-set-และ-get]

    ```typescript
    import { Pool } from 'pg';

    const pool = new Pool();

    async function setCache(key: string, value: string, ttl: number) {
      const expiresAt = new Date(Date.now() + ttl * 1000);

      await pool.query(
        `INSERT INTO cache (key, value, expires_at) 
            VALUES ($1, $2, $3) 
            ON CONFLICT (key) 
              DO UPDATE 
                SET value = EXCLUDED.value, 
                expires_at = EXCLUDED.expires_at`,
        [key, value, expiresAt]
      );
    }

    async function getCache(key: string) {
      const res = await pool.query(
        'SELECT value FROM cache WHERE key = $1 AND expires_at > NOW()',
        [key]
      );

      return res.rows[0]?.value || null;
    }
    ```
  </div>

  <div className="fd-step">
    Cleanup Job สำหรับลบ Cache ที่หมดอายุแล้ว [#3-cleanup-job-สำหรับลบ-cache-ที่หมดอายุแล้ว]

    ```typescript
    setInterval(async () => {
      await pool.query('DELETE FROM cache WHERE expires_at <= NOW()');
    }, 60 * 1000);
    ```
  </div>
</div>



***

Pub/Sub [#pubsub]

ส่วน **Pub/Sub** เป็น tool ที่ช่วยในการทำ Inter-service communication โดยที่ Publisher จะส่ง message ไปยัง channel แล้ว Subscriber ที่ subscribe channel นั้นๆ ก็จะได้รับ message ไปทำงานต่อ

ตัวอย่างฝั่ง Redis [#ตัวอย่างฝั่ง-redis-1]

Publisher [#publisher]

```typescript
import Redis from 'ioredis';

const redis = new Redis();

redis.publish('channel', 'message');
```

Subscriber [#subscriber]

```typescript
import Redis from 'ioredis';

const redis = new Redis();

redis.subscribe('channel', (err, count) => {
  if (err) {
    console.error(err);
    return;
  }

  console.log(`Subscribed to ${count} channel(s).`);
});

redis.on('message', (channel, message) => {
  console.log(`Received message from ${channel}: ${message}`);
});
```

ตัวอย่างฝั่ง PostgreSQL [#ตัวอย่างฝั่ง-postgresql-1]

Publisher [#publisher-1]

```typescript
import { Pool } from 'pg';

const pool = new Pool();

async function publish(channel: string, message: string) {
  await pool.query('NOTIFY $1, $2', [channel, message]);
}
```

Subscriber [#subscriber-1]

```typescript
import { Pool } from 'pg';

const pool = new Pool();

async function subscribe(channel: string) {
  const client = await pool.connect();

  await client.query(`LISTEN ${channel}`);

  client.on('notification', (msg) => {
    console.log(`Received message from ${msg.channel}: ${msg.payload}`);
  });
}
```

***

Message Queue [#message-queue]

**Message Queue** ก็เป็น **tool** ที่ช่วยในการทำ **Inter-service communication** เหมือน **Pub/Sub** แต่จะมี **pattern** ต่างกันตรงที่ **Message Queue** ทำงานแบบ `FIFO` และจะมี **Consumer** 1 ตัวหรือหลายตัวก็ได้เพื่อมาคอยดึง **message** ไปทำงานต่อ

ตัวอย่างฝั่ง Redis [#ตัวอย่างฝั่ง-redis-2]

Producer [#producer]

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('emails');

await queue.add('welcome-email', { to: 'user@example.com' });
```

Consumer [#consumer]

```typescript
import { Worker } from 'bullmq';

new Worker('emails', async (job) => {
  console.log(`Sending email to ${job.data.to}`);
});
```

ตัวอย่างฝั่ง PostgreSQL [#ตัวอย่างฝั่ง-postgresql-2]

<div className="fd-steps">
  <div className="fd-step">
    สร้าง Table สำหรับ Message Queue [#1-สร้าง-table-สำหรับ-message-queue]

    ```sql
    CREATE TABLE jobs (
      id BIGSERIAL PRIMARY KEY,
      queue TEXT NOT NULL,
      payload JSONB NOT NULL,
      attempts INT DEFAULT 0,
      max_attempts INT DEFAULT 3,
      scheduled_at TIMESTAMPTZ DEFAULT NOW(),
      created_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX idx_jobs_queue ON jobs (queue, scheduled_at)
    WHERE attempts < max_attempts;
    ```
  </div>

  <div className="fd-step">
    สร้าง enqueue function สำหรับยัด Job เข้าไปใน Queue [#2-สร้าง-enqueue-function-สำหรับยัด-job-เข้าไปใน-queue]

    ```typescript
    import { Pool } from 'pg';

    const pool = new Pool();

    async function enqueue(queue: string, payload: Record<string, unknown>) {
      await pool.query('INSERT INTO jobs (queue, payload) VALUES ($1, $2)', [
        queue,
        payload,
      ]);
    }
    ```
  </div>

  <div className="fd-step">
    สร้าง consume function เพื่อมา consume Job จาก Queue [#3-สร้าง-consume-function-เพื่อมา-consume-job-จาก-queue]

    ใช้ `FOR UPDATE SKIP LOCKED` เพื่อให้แน่ใจว่าแต่ละ Job จะถูก consume โดย Worker แค่ตัวเดียว และถ้าเกิด Worker ตัวไหนล่มระหว่างการทำงาน Job ที่ถูกล็อคไว้ก็จะไม่ถูกปล่อยให้ Worker ตัวอื่นมาทำงานต่อจนกว่าจะหมดเวลาล็อค (ซึ่งจะช่วยลดปัญหา Job หายไปได้มาก)

    ```typescript
    async function consume(queue: string) {
      while (true) {
        const res = await pool.query(
          `WITH next_job AS (
              SELECT id FROM jobs
              WHERE queue = $1
                AND attempts < max_attempts
                AND scheduled_at <= NOW()
              ORDER BY scheduled_at
              LIMIT 1
              FOR UPDATE SKIP LOCKED
            )
            UPDATE jobs
              SET attempts = attempts + 1
              FROM next_job
              WHERE jobs.id = next_job.id
            RETURNING jobs.*`,
          [queue]
        );

        if (res.rows.length > 0) {
          const job = res.rows[0];

          console.log(`Processing job ${job.id}:`, job.payload);

          await pool.query('DELETE FROM jobs WHERE id = $1', [job.id]);
        } else {
          await new Promise((resolve) => setTimeout(resolve, 1000));
        }
      }
    }
    ```

    ***
  </div>
</div>

Summary [#summary]

ผมเลยลองเขียนโค๊ดแบบเล่นๆ เพื่อลองวัด latency แบบเร็วๆ ของทั้ง 3 use case บนเครื่องเดียว (**Apple M3 Max**) เพื่อเทียบ **Redis** กับ **PostgreSQL** ตัวเลขก็จะได้ประมาณนี้ (ค่าเป็น **p50** นะ 5555+)

| Use case      | Redis (ms) | PostgreSQL (ms) | ต่างกัน |
| ------------- | ---------: | --------------: | ------: |
| Cache (อ่าน)  |      0.064 |           0.101 |  \~1.6× |
| Cache (เขียน) |      0.067 |           0.113 |  \~1.7× |
| Pub/Sub       |      0.080 |           0.120 |  \~1.5× |
| Message Queue |      0.159 |           2.135 |   \~13× |

จากที่ลองมาส่วนตัวยังรู้สึกว่า Redis ยังเป็นตัวเลือกที่ดีกว่าในหลายๆ ด้านอยู่ดี ไม่ว่าจะเป็นเรื่องของ

1. Latency เร็วกว่า
2. Throughput เยอะกว่า
3. Complexity & Effort ในการ Implement น้อยกว่า
4. Read เร็วกว่าเพราะเป็น In-memory

และที่สำคัญที่ยังให้ Redis เหนือกว่า PostgreSQL คือถ้าหากมี cache-hit rate สูง Redis จะช่วยลด Load ของ Database ลงได้เยอะ และลดความเสี่ยงที่ Database จะ crash เมื่อ traffic spike ด้วย

ตารางเปรียบเทียบระหว่าง Redis กับ PostgreSQL [#ตารางเปรียบเทียบระหว่าง-redis-กับ-postgresql]

| Feature             | Redis    | PostgreSQL |
| ------------------- | -------- | ---------- |
| Latency             | ต่ำกว่า  | สูงกว่า    |
| Throughput          | สูงกว่า  | ต่ำกว่า    |
| Complexity & Effort | น้อยกว่า | มากกว่า    |
| Cost                | แพงกว่า  | ถูกกว่า    |

DX อย่างนึงที่ผมชอบของ PostgreSQL [#dx-อย่างนึงที่ผมชอบของ-postgresql]

แต่ก็มีความแตกต่างนึงที่ผมรู้สึกชอบ คือเรื่อง **single-transactional** ที่ **PostgreSQL** จะสามารถ **Insert** และ **Cache** ไปใน transaction เดียวกันได้เลยซึ่งจะช่วยลดปัญหาเรื่อง **Cache Inconsistency** ได้มาก ในขณะที่ Redis จะต้องทำ 2 transactions แยกกันซึ่งมีโอกาสเกิดปัญหา **Cache Inconsistency** ถ้าหาก Redis เกิด crash ขึ้นมาพอดี

ตัวอย่าง [#ตัวอย่าง]

สมมติว่า **user** ต้องการ **update profile** ของตัวเอง เราก็จะสามารถอัปเดต **profile** ไปพร้อมๆ กับการเขียน **cache** ใหม่ทับลงไปใน **transaction** เดียวได้เลย ได้ **Cache Invalidation** ไปในตัว

```typescript
import { Pool } from 'pg';

const pool = new Pool();

async function updateProfile(
  userId: string,
  data: { name: string; bio: string }
) {
  const client = await pool.connect();

  try {
    await client.query('BEGIN');

    // 1. อัปเดต profile ซึ่งเป็น source of truth
    const res = await client.query(
      `UPDATE profiles
          SET name = $2, bio = $3, updated_at = NOW()
          WHERE user_id = $1
        RETURNING *`,
      [userId, data.name, data.bio]
    );

    const profile = res.rows[0];

    // 2. เขียน cache ของ profile ใหม่ทับลงไปใน transaction เดียวกัน
    await client.query(
      `INSERT INTO cache (key, value, expires_at) 
          VALUES ($1, $2, $3) 
          ON CONFLICT (key) 
            DO UPDATE 
              SET value = EXCLUDED.value, 
              expires_at = EXCLUDED.expires_at`,
      [
        `user:profile:${userId}`,
        JSON.stringify(profile),
        new Date(Date.now() + 3600 * 1000),
      ]
    );

    await client.query('COMMIT');

    return profile;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```

***


Last updated on June 22, 2026