skip to content

Rabbit MQ และการใช้ Message Queue

/ 4 min read

Share on social media

rabbitmq-basic สามารถดู video ของหัวข้อนี้ก่อนได้ ดู video

Message Queue คืออะไร ?

message-queue

Ref: https://www.cloudamqp.com/blog/what-is-message-queuing.html

Message Queue คือตัวที่ทำหน้าที่ในการรับ Message และกระจาย Message ออกไปให้ปลายทางแบบ Asynchronous

โดยไอเดียคือ Message จะถูกเก็บอยู่ใน Queue จนกว่าจะถูกจัดการเสร็จสิ้นไป และ Message ทั้งหมดจะถูก process โดย “consumer เพียงตัวเดียว” เท่านั้น

เป้าหมายของการใช้ Message queue เพื่อลดการ decouple service (ทำให้ service ที่ทำงานต่อเนื่องกัน แยกออกไปจัดการทีละตัวได้)

ไอเดียที่คนส่วนใหญ่ชอบเอาไปเปรียบเทียบคือ ที่ทำการไปรษณีย์ ปกติเวลาเราจะส่งไปรษณีย์ เราจะส่งไปยังที่ทำการไปรษณีย์ โดยเราจะระบุปลายทางของไปรษณีย์เอาไว้ว่าต้องส่งไปที่ไหน โดยมีบุรุษไปรษณีย์ จะคอยเอาจดหมายจากที่ทำการไปรษณีย์ไปทำการส่งให้กลับปลายทางอีกที

เปรียบเทียบเป็น

  • เรา (ผู้ส่ง) คือ Producer
  • ที่ทำการไปรษณีย์ คือ Queue
  • บุรุษไปรษณีย์ คือ Consumer

โจทย์ของ Producer คือ ทำการส่งข้อมูลออกไปให้เร็วที่สุด และ เมื่อข้อมูลเข้า Queue ตัว Manager ก็จะทำการเรียก Consumer มารับ task ไปทำให้ไวที่สุด โดยที่ฝั่ง Producer นั้นไม่จำเป็นต้องรอให้ Consumer ทำเสร็จ ก็สามารถที่จะไปต่อได้เลย

เป้าหมายของการใช้ Message Queue คือการกระจายงานออกไปเพื่อให้สามารถ Scale งานได้เร็วยิ่งขึ้น ฝั่งที่เป็น Producer ก็สามารถตอบกลับได้ไวขึ้น ตัว task งานก็จะมี Consumer ไปรับได้มากขึ้น

RabbitMQ คืออะไร ?

Ref: https://www.rabbitmq.com/

rabbitmq-basic

RabbitMQ เป็น Message Broker ที่ใช้ Advanced Message queuing protocol (AMQP) เพื่อใช้สำหรับติดต่อหากัน โดยมีความสามารถรับและกระจาย Message จาก Producer, Consumer ได้ (เปรียบได้เป็นที่ทำการไปรษณีย์) ที่สามารถ scale และทำงานแบบ asynchronously ได้

key หลักของ RabbitMQ

  • Messaging Protocol ที่สามารถส่งผ่าน AMQP ได้ รวมถึง support กับ protocol อื่นๆอย่าง MQTT ด้วยเช่นกัน
  • มี Feature สำหรับการ Retry ผ่าน Acknowledge feature ที่สามารถบอกได้ว่า Queue ทำสำเร็จแล้วหรือไม่
  • ทำ Scale ได้ สามารถกระจายการทำงานของ Queue ผ่าน Consumer ได้ (อารมณ์ช่วยกันกระจายงานทำให้เสร็จ)
  • สามารถเก็บ queue เป็น Cluster ได้ = ช่วยกระจายการเก็บข้อมูลให้มั่นใจใน message เราได้
  • มี library support พร้อมอยู่แล้วกับ client หลายๆภาษา

แล้วมันต่างกับ Kafka ยังไง ?

ถ้ารู้สึกซีเรียสกับลำดับการทำงาน (เป็นลำดับ) จริงๆ RabbitMQ จะตอบโจทย์กว่า แต่ถ้าเน้นการทำงานให้ไว (กระจายงานได้สูงสุด ให้ใกล้เคียงกับ Realtime) Kakfa จะตอบโจทย์กว่า

มาลอง RabbitMQ กัน

โจทย์คือ

  • เราจะสร้าง queue สำหรับสร้าง order ขึ้นมา
  • เราจะให้ Message Queue คอยกวาด order ไปสร้าง
  • โดยเราจะทำการกระจาย Consumer เพื่อให้สร้าง order ได้ไวขึ้น

Dev task

  • สร้าง docker-compose สำหรับ rabbitmq, mysql, phpyadmin
version: "3"
services:
rabbitmq:
image: rabbitmq:management-alpine
tty: true
volumes:
- rabbitmq_data:/var/lib/rabbitmq # Persistent volume for RabbitMQ data
environment:
RABBITMQ_DEFAULT_USER: "mikelopster" # Set default RabbitMQ user
RABBITMQ_DEFAULT_PASS: "password" # Set default RabbitMQ password
ports:
- "15672:15672"
- "5672:5672"
mysql:
image: mysql:latest
container_name: mysql
command: --default-authentication-plugin=mysql_native_password
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: orders
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
phpmyadmin:
image: phpmyadmin/phpmyadmin
container_name: phpmyadmin
environment:
PMA_HOST: mysql
PMA_USER: root
PMA_PASSWORD: rootpassword
MYSQL_ROOT_PASSWORD: rootpassword
ports:
- "8080:80"
depends_on:
- mysql
volumes:
rabbitmq_data:
driver: local
mysql_data:
driver: local
  • package.json
{
"name": "queue-basic",
"version": "1.0.0",
"description": "",
"main": "consumer.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"amqplib": "^0.10.3",
"mysql": "^2.18.1",
"uuid": "^9.0.1"
}
}

Project structure ในรอบนี้ จะมีไฟล์ที่เกี่ยวข้องเพียงแค่ 2 files คือ

  • producer.js สำหรับส่งข้อมูล order เข้า Queue
  • consumer.js สำหรับรับข้อมูล order จาก Queue และเขียนลง mysql
.
├── consumer.js --> สำหรับ consumer ที่รับข้อมูลจาก Queue
├── docker-compose.yml
├── package.json
└── producer.js --> สำหรับ producer ที่ส่งข้อมูลเข้า Queue

producer.js เขียน node ต่อเข้า rabbitmq

const amqp = require("amqplib");
const { v4: uuidv4 } = require("uuid");
async function sendOrder(order) {
const connection = await amqp.connect("amqp://mikelopster:password@localhost:5672");
const channel = await connection.createChannel();
const queue = "orders-new";
// เขียนลง disk เอาไว้ กรณีที่ queue ดับ
await channel.assertQueue(queue, { durable: true });
// ใส่ persistent + durable จะได้ข้อมูล queue เดิมออกมาได้
channel.sendToQueue(queue, Buffer.from(JSON.stringify(order)), { persistent: true });
console.log(" [x] Sent %s", order);
setTimeout(() => {
connection.close();
process.exit(0);
}, 500);
}
const order = {
orderNumber: uuidv4(),
product: "apple",
quantity: 10,
};
sendOrder(order);

consumer.js เขียน node รับจาก rabbitmq และเขียนต่อลง database

const amqp = require("amqplib");
const mysql = require("mysql");
const connection = mysql.createConnection({
host: "localhost",
user: "root",
password: "rootpassword",
database: "orders",
});
connection.connect();
const sleep = (milliseconds) => {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
};
async function receiveOrders() {
const conn = await amqp.connect("amqp://mikelopster:password@localhost:5672");
const channel = await conn.createChannel();
const queue = "orders-new";
await channel.assertQueue(queue, { durable: true });
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.prefetch(1);
channel.consume(queue, async (msg) => {
try {
const order = JSON.parse(msg.content.toString());
console.log(" [x] Received %s", order);
await sleep(10000);
const sql = "INSERT INTO orders SET ?";
connection.query(sql, order, (error, results) => {
if (error) throw error;
console.log("Order saved to database with id: " + results.insertId);
});
// บอกว่าได้ message แล้ว
channel.ack(msg);
} catch (error) {
console.log("Error:", error.message);
}
});
}
receiveOrders();

สามารถดู monitor ผ่าน localhost ของ rabbitmq ได้ มีหน้าจัดการให้แล้วเรียบร้อย

http://localhost:15672/#/
rabbitmq-monitor

Note เพิ่มเติม (จากใน video)

  • มีการแชร์เคส ack กรณีที่เกิด operation บางอย่างแล้วต้องหน่วงเวลาก่อนเขียนเข้า order
  • มีเคสการเพิ่ม prefetch เพื่อจำกัดการรับข้อมูลของ Consumer https://www.rabbitmq.com/consumer-prefetch.html

เราควรใช้ RabbitMQ (หรือ Message Queue) กับเคสไหนบ้าง

Ref: https://www.rabbitmq.com/getstarted.html

use case ที่มีโอกาสจะได้ใช้

  1. Order processing = ตามเคสในหัวข้อนี้ ขั้นตอนจริง order มีหลายขั้นตอน ดังนั้นการใช้ queue เพื่อให้ช่วย process ต่อได้
  2. Notification = สามารถ ทำเป็น queue เพื่อให้ส่ง notification ทีหลังได้ (ให้ process อื่นจัดการให้เสร็จก่อนจะส่ง Notification แยกได้)
  3. Email = service ทำเสร็จค่อยส่งเข้า queue เพื่อเรียงลำดับการส่ง email (คล้ายๆ Notification)

** อะไรที่เกี่ยวกับ Third party ที่มีเรื่องของ limitation การใช้งาน = Message queue จะ manage เรื่องนี้ได้ดี

  • เพื่อควบคุมปริมาณการส่งในแต่ละช่วงเวลา
  • เพื่อให้แน่ใจว่าการส่งสำเร็จ ก่อนที่เราจะให้ queue เลื่อนต่อไป
Related Post

Share on social media