FastAPI এর সাথে Celery এবং Redis বা RabbitMQ ব্যবহার করে আপনার অ্যাপ্লিকেশনে অ্যাসিঙ্ক্রোনাস কাজ (Asynchronous tasks) এবং ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করা সম্ভব। Celery হলো একটি শক্তিশালী টাস্ক কিউ ব্যবস্থাপনা সিস্টেম যা ডিস্ট্রিবিউটেড টাস্ক কিউ সিস্টেমের মাধ্যমে ব্যাকগ্রাউন্ডে চলমান টাস্কগুলো পরিচালনা করে। এখানে আমরা দেখব কিভাবে Redis বা RabbitMQ ব্যবহার করে Celery Queue ম্যানেজ করা যায় এবং এটি FastAPI এর সাথে ইন্টিগ্রেট করা যায়।
Celery এবং Redis/RabbitMQ কেন ব্যবহার করবেন?
- ব্যাকগ্রাউন্ড টাস্ক: Web request-response চক্রের বাইরে লম্বা-running কাজ (যেমন ইমেইল পাঠানো, ডাটাবেস ব্যাচ প্রসেসিং) করা যায়।
- স্কেলেবিলিটি: Redis বা RabbitMQ সার্ভার ব্যবহার করে টাস্কগুলো ডিস্ট্রিবিউটেড সিস্টেমে স্কেল করা যায়, যাতে লোড বেশি হওয়ায় কাজের ধীরগতি না হয়।
- অ্যাসিঙ্ক্রোনাস এক্সিকিউশন: CPU-intensive কাজকে অ্যাসিঙ্ক্রোনাসভাবে প্রসেস করতে সক্ষম।
প্রয়োজনীয় প্যাকেজ ইনস্টলেশন
Step 1: Celery, Redis বা RabbitMQ ইনস্টল করুন
আপনার প্রজেক্টের জন্য নিচের প্যাকেজগুলো ইনস্টল করুন:
pip install fastapi celery redis
RabbitMQ ব্যবহারের জন্য:
pip install celery[redis]
Step 2: Redis বা RabbitMQ সার্ভার সেটআপ
- Redis: আপনি Redis এর অফিশিয়াল সাইট থেকে Redis সার্ভার ইনস্টল করতে পারেন বা ডকার ব্যবহার করতে পারেন।
# Redis ইনস্টল
sudo apt-get install redis-server
- RabbitMQ: RabbitMQ ইনস্টল করতে:
sudo apt-get install rabbitmq-server
Celery সেটআপ FastAPI এর সাথে
Step 3: Celery কনফিগারেশন
Celery Configuration ফাইল তৈরি করা হবে এবং এখানে Redis বা RabbitMQ কনফিগারেশন থাকবে।
উদাহরণ: Celery সেটআপ Redis এর সাথে
# celery_worker.py
from celery import Celery
# Redis কনফিগারেশন
app = Celery('worker', broker='redis://localhost:6379/0')
# কাজের ফাংশন
@app.task
def send_email(email: str):
# ইমেইল পাঠানোর কাস্টম লজিক
print(f"Sending email to {email}")
return f"Email sent to {email}"
এখানে, Redis ব্যবহৃত হচ্ছে টাস্ক কিউ (broker) হিসেবে। আপনার Redis সার্ভার যদি অন্য মেশিনে থাকে তবে আপনাকে broker='redis://your_redis_host:6379/0' উল্লেখ করতে হবে।
Step 4: FastAPI অ্যাপ্লিকেশন সেটআপ
# main.py
from fastapi import FastAPI
from celery_worker import send_email
app = FastAPI()
@app.post("/send-email/{email}")
async def send_email_task(email: str):
send_email.delay(email)
return {"message": f"Email task for {email} has been accepted"}
এখানে, send_email.delay() ব্যবহার করা হয়েছে যাতে টাস্কটি অ্যাসিঙ্ক্রোনাসভাবে কল হয় এবং তা Celery কিউতে চলে যাবে।
Step 5: Celery Worker চালানো
Celery worker রান করার জন্য এই কমান্ডটি ব্যবহার করুন:
celery -A celery_worker.app worker --loglevel=info
এটি Celery worker শুরু করবে এবং টাস্কগুলো কিউ থেকে প্রসেস করবে।
Step 6: FastAPI অ্যাপ চালানো
FastAPI অ্যাপ চালাতে:
uvicorn main:app --reload
এখানে main ফাইলের নাম এবং app FastAPI অ্যাপের অবজেক্ট।
উদাহরণ: RabbitMQ ব্যবহার
RabbitMQ কনফিগারেশন একে অপরের থেকে কিছুটা আলাদা হলেও, তাতে অনেক ক্ষেত্রেই একই প্রক্রিয়া অনুসরণ করা হয়। এখানে, Celery কনফিগারেশন RabbitMQ কে ব্রোকার হিসেবে ব্যবহার করবে।
# celery_worker.py
from celery import Celery
# RabbitMQ কনফিগারেশন
app = Celery('worker', broker='pyamqp://guest@localhost//')
@app.task
def process_task(data: str):
print(f"Processing: {data}")
return f"Task processed: {data}"
এখানে pyamqp://guest@localhost// RabbitMQ ব্রোকার কনফিগারেশন, যা RabbitMQ সার্ভারের মাধ্যমে কাজ করবে।
Task Queue এবং Worker Scaling
FastAPI এবং Celery এর মধ্যে টাস্ক কিউ ব্যবস্থাপনা সম্ভবত একটি ডিস্ট্রিবিউটেড সিস্টেমের জন্য ব্যবহৃত হয়। বিভিন্ন workers দিয়ে টাস্কগুলো স্কেল করা সম্ভব। একাধিক worker ব্যবহার করে Celery কে স্কেল করা যেতে পারে, যাতে উচ্চ লোডের সময় কাজের গতিকে বজায় রাখা যায়।
Celery worker-দের স্কেল করার জন্য:
celery -A celery_worker.app worker --loglevel=info --concurrency=4
এখানে --concurrency=4 প্যারামিটারটি worker-কে ৪টি থ্রেডের মাধ্যমে কাজ করতে বলছে।
FastAPI এবং Celery এর মাধ্যমে আপনি অ্যাসিঙ্ক্রোনাস এবং ব্যাকগ্রাউন্ড টাস্ক সিস্টেম তৈরি করতে পারেন, যা Redis বা RabbitMQ ব্রোকার ব্যবহার করে স্কেলেবল এবং কার্যকর টাস্ক কিউ ম্যানেজমেন্ট সরবরাহ করে। এটি দীর্ঘ-running কাজগুলো যেমন ইমেইল পাঠানো, ফাইল প্রসেসিং, ডাটাবেস ব্যাচ প্রসেসিং ইত্যাদি পরিচালনা করার জন্য অত্যন্ত উপকারী।
Read more