Rabbit MQ และการใช้ Message Queue
/ 4 min read
สามารถดู video ของหัวข้อนี้ก่อนได้ ดู video
Message Queue คืออะไร ?

Ref: https://www.cloudamqp.com/blog/what-is-message-queuing.html
Message Queue คือตัวที่ทำหน้าที่ในการรับ Message และกระจาย Message ออกไปให้ปลายทางแบบ Asynchronous
โดยไอเดียคือ Message จะถูกเก็บอยู่ใน Queue จนกว่าจะถูกจัดการเสร็จสิ้นไป และ Message ทั้งหมดจะถูก process โดย “consumer เพียงตัวเดียว” เท่านั้น
เป้าหมายของการใช้ Message queue เพื่อลดการ decouple service (ทำให้ service ที่ทำงานต่อเนื่องกัน แยกออกไปจัดการทีละตัวได้)
ไอเดียที่คนส่วนใหญ่ชอบเอาไปเปรียบเทียบคือ ที่ทำการไปรษณีย์ ปกติเวลาเราจะส่งไปรษณีย์ เราจะส่งไปยังที่ทำการไปรษณีย์ โดยเราจะระบุปลายทางของไปรษณีย์เอาไว้ว่าต้องส่งไปที่ไหน โดยมีบุรุษไปรษณีย์ จะคอยเอาจดหมายจากที่ทำการไปรษณีย์ไปทำการส่งให้กลับปลายทางอีกที
เปรียบเทียบเป็น
- เรา (ผู้ส่ง) คือ Producer
- ที่ทำการไปรษณีย์ คือ Queue
- บุรุษไปรษณีย์ คือ Consumer
โจทย์ของ Producer คือ ทำการส่งข้อมูลออกไปให้เร็วที่สุด และ เมื่อข้อมูลเข้า Queue ตัว Manager ก็จะทำการเรียก Consumer มารับ task ไปทำให้ไวที่สุด โดยที่ฝั่ง Producer นั้นไม่จำเป็นต้องรอให้ Consumer ทำเสร็จ ก็สามารถที่จะไปต่อได้เลย
เป้าหมายของการใช้ Message Queue คือการกระจายงานออกไปเพื่อให้สามารถ Scale งานได้เร็วยิ่งขึ้น ฝั่งที่เป็น Producer ก็สามารถตอบกลับได้ไวขึ้น ตัว task งานก็จะมี Consumer ไปรับได้มากขึ้น
RabbitMQ คืออะไร ?
Ref: https://www.rabbitmq.com/

RabbitMQ เป็น Message Broker ที่ใช้ Advanced Message queuing protocol (AMQP) เพื่อใช้สำหรับติดต่อหากัน โดยมีความสามารถรับและกระจาย Message จาก Producer, Consumer ได้ (เปรียบได้เป็นที่ทำการไปรษณีย์) ที่สามารถ scale และทำงานแบบ asynchronously ได้
key หลักของ RabbitMQ
- Messaging Protocol ที่สามารถส่งผ่าน AMQP ได้ รวมถึง support กับ protocol อื่นๆอย่าง MQTT ด้วยเช่นกัน
- มี Feature สำหรับการ Retry ผ่าน Acknowledge feature ที่สามารถบอกได้ว่า Queue ทำสำเร็จแล้วหรือไม่
- ทำ Scale ได้ สามารถกระจายการทำงานของ Queue ผ่าน Consumer ได้ (อารมณ์ช่วยกันกระจายงานทำให้เสร็จ)
- สามารถเก็บ queue เป็น Cluster ได้ = ช่วยกระจายการเก็บข้อมูลให้มั่นใจใน message เราได้
- มี library support พร้อมอยู่แล้วกับ client หลายๆภาษา
แล้วมันต่างกับ Kafka ยังไง ?
ถ้ารู้สึกซีเรียสกับลำดับการทำงาน (เป็นลำดับ) จริงๆ RabbitMQ จะตอบโจทย์กว่า แต่ถ้าเน้นการทำงานให้ไว (กระจายงานได้สูงสุด ให้ใกล้เคียงกับ Realtime) Kakfa จะตอบโจทย์กว่า
มาลอง RabbitMQ กัน
โจทย์คือ
- เราจะสร้าง queue สำหรับสร้าง order ขึ้นมา
- เราจะให้ Message Queue คอยกวาด order ไปสร้าง
- โดยเราจะทำการกระจาย Consumer เพื่อให้สร้าง order ได้ไวขึ้น
Dev task
- สร้าง docker-compose สำหรับ rabbitmq, mysql, phpyadmin
version: "3"
services: rabbitmq: image: rabbitmq:management-alpine tty: true volumes: - rabbitmq_data:/var/lib/rabbitmq # Persistent volume for RabbitMQ data environment: RABBITMQ_DEFAULT_USER: "mikelopster" # Set default RabbitMQ user RABBITMQ_DEFAULT_PASS: "password" # Set default RabbitMQ password ports: - "15672:15672" - "5672:5672"
mysql: image: mysql:latest container_name: mysql command: --default-authentication-plugin=mysql_native_password environment: MYSQL_ROOT_PASSWORD: rootpassword MYSQL_DATABASE: orders ports: - "3306:3306" volumes: - mysql_data:/var/lib/mysql
phpmyadmin: image: phpmyadmin/phpmyadmin container_name: phpmyadmin environment: PMA_HOST: mysql PMA_USER: root PMA_PASSWORD: rootpassword MYSQL_ROOT_PASSWORD: rootpassword ports: - "8080:80" depends_on: - mysql
volumes: rabbitmq_data: driver: local mysql_data: driver: local
- package.json
{ "name": "queue-basic", "version": "1.0.0", "description": "", "main": "consumer.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "author": "", "license": "ISC", "dependencies": { "amqplib": "^0.10.3", "mysql": "^2.18.1", "uuid": "^9.0.1" }}
Project structure ในรอบนี้ จะมีไฟล์ที่เกี่ยวข้องเพียงแค่ 2 files คือ
producer.js
สำหรับส่งข้อมูล order เข้า Queueconsumer.js
สำหรับรับข้อมูล order จาก Queue และเขียนลง mysql
.├── consumer.js --> สำหรับ consumer ที่รับข้อมูลจาก Queue├── docker-compose.yml├── package.json└── producer.js --> สำหรับ producer ที่ส่งข้อมูลเข้า Queue
producer.js
เขียน node ต่อเข้า rabbitmq
const amqp = require("amqplib");const { v4: uuidv4 } = require("uuid");
async function sendOrder(order) { const connection = await amqp.connect("amqp://mikelopster:password@localhost:5672"); const channel = await connection.createChannel();
const queue = "orders-new";
// เขียนลง disk เอาไว้ กรณีที่ queue ดับ await channel.assertQueue(queue, { durable: true });
// ใส่ persistent + durable จะได้ข้อมูล queue เดิมออกมาได้ channel.sendToQueue(queue, Buffer.from(JSON.stringify(order)), { persistent: true });
console.log(" [x] Sent %s", order);
setTimeout(() => { connection.close(); process.exit(0); }, 500);}
const order = { orderNumber: uuidv4(), product: "apple", quantity: 10,};
sendOrder(order);
consumer.js
เขียน node รับจาก rabbitmq และเขียนต่อลง database
const amqp = require("amqplib");const mysql = require("mysql");
const connection = mysql.createConnection({ host: "localhost", user: "root", password: "rootpassword", database: "orders",});
connection.connect();
const sleep = (milliseconds) => { return new Promise((resolve) => setTimeout(resolve, milliseconds));};
async function receiveOrders() { const conn = await amqp.connect("amqp://mikelopster:password@localhost:5672"); const channel = await conn.createChannel();
const queue = "orders-new"; await channel.assertQueue(queue, { durable: true });
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.prefetch(1);
channel.consume(queue, async (msg) => { try { const order = JSON.parse(msg.content.toString()); console.log(" [x] Received %s", order);
await sleep(10000);
const sql = "INSERT INTO orders SET ?"; connection.query(sql, order, (error, results) => { if (error) throw error; console.log("Order saved to database with id: " + results.insertId); });
// บอกว่าได้ message แล้ว channel.ack(msg); } catch (error) { console.log("Error:", error.message); } });}
receiveOrders();
สามารถดู monitor ผ่าน localhost ของ rabbitmq ได้ มีหน้าจัดการให้แล้วเรียบร้อย
http://localhost:15672/#/

Note เพิ่มเติม (จากใน video)
- มีการแชร์เคส ack กรณีที่เกิด operation บางอย่างแล้วต้องหน่วงเวลาก่อนเขียนเข้า order
- มีเคสการเพิ่ม prefetch เพื่อจำกัดการรับข้อมูลของ Consumer https://www.rabbitmq.com/consumer-prefetch.html
เราควรใช้ RabbitMQ (หรือ Message Queue) กับเคสไหนบ้าง
Ref: https://www.rabbitmq.com/getstarted.html
use case ที่มีโอกาสจะได้ใช้
- Order processing = ตามเคสในหัวข้อนี้ ขั้นตอนจริง order มีหลายขั้นตอน ดังนั้นการใช้ queue เพื่อให้ช่วย process ต่อได้
- Notification = สามารถ ทำเป็น queue เพื่อให้ส่ง notification ทีหลังได้ (ให้ process อื่นจัดการให้เสร็จก่อนจะส่ง Notification แยกได้)
- Email = service ทำเสร็จค่อยส่งเข้า queue เพื่อเรียงลำดับการส่ง email (คล้ายๆ Notification)
** อะไรที่เกี่ยวกับ Third party ที่มีเรื่องของ limitation การใช้งาน = Message queue จะ manage เรื่องนี้ได้ดี
- เพื่อควบคุมปริมาณการส่งในแต่ละช่วงเวลา
- เพื่อให้แน่ใจว่าการส่งสำเร็จ ก่อนที่เราจะให้ queue เลื่อนต่อไป
- รู้จักกับ Design Pattern - Creational (Part 1/3)มี Video
มาเรียนรู้รูปแบบการพัฒนา Software Design Pattern ประเภทแรก Creational กัน
- มารู้จักกับ SQL Transaction กันว่ามันคืออะไร ?มี Video
มารู้จักเรื่องราวของการทำ Transaction และ Deadlock ผ่าน SQL กันว่ามันคืออะไร
- มาเรียน Canvas ผ่าน Pong game กันมี Video
มารู้จักกับ HTML Canvas ว่ามันคืออะไร ใช้ทำอะไรบ้าง และมารู้จักพื้นฐานเบื้องต้นของการทำ Animation บน Browser ว่ามีหลักการเป็นประมาณไหน
- Caching design pattern กับ backendมี Video มี Github
บทความนี้จะเล่าเรื่อง Cache Pattern 3 แบบ lazy loading, write through และ write back ว่าเราสามารถเอา cache ไปใช้คู่กับ database ยังไงได้บ้าง