Streaming Queries এবং Real-time Data Processing বর্তমানে ডেটা প্রক্রিয়াকরণের সবচেয়ে গুরুত্বপূর্ণ এবং চাহিদাযুক্ত বিষয়গুলোর মধ্যে একটি। Apache Spark SQL Structured Streaming ফিচার প্রদান করে, যা আপনাকে রিয়েল-টাইম ডেটা স্ট্রিমের ওপর SQL কোয়ারি চালানোর সুবিধা দেয়। এই প্রক্রিয়ায় Spark SQL ডেটা স্ট্রিমকে continuous queries এর মাধ্যমে প্রক্রিয়া করে এবং ফলাফল এক্সিকিউট করে।
Structured Streaming একটি শক্তিশালী টুল যা ডেটা সোসেস থেকে রিয়েল-টাইম ডেটা প্রাপ্তির জন্য ব্যবহার করা হয় এবং ডেটা প্রসেসিংয়ের জন্য SQL বা DataFrame API ব্যবহারের সুবিধা দেয়।
Structured Streaming
Structured Streaming হল Spark SQL-এর একটি ফিচার যা স্ট্রিমিং ডেটা প্রসেসিং সহজ এবং স্কেলেবল করে তোলে। এটি DataFrame/Dataset API ভিত্তিক, যেখানে স্ট্রিমিং ডেটাকে DataFrame হিসাবে ডেটা ফ্রেমওয়ার্কের মধ্যে প্রবাহিত করা হয়। Structured Streaming আপনার কোডকে রিয়েল-টাইম ডেটা প্রসেসিংয়ের জন্য সহজ এবং কার্যকরী করে তোলে, কারণ এটি Spark-এর ব্যাচ প্রসেসিং সিস্টেমের ওপর ভিত্তি করে কাজ করে এবং স্ট্রিমিং ডেটাকে ক্ষুদ্র ব্যাচের আকারে পরিচালনা করে।
Structured Streaming এর প্রধান সুবিধাগুলি হলো:
- Declarative API: SQL কোয়ারি বা DataFrame API ব্যবহার করে স্ট্রিমিং ডেটার ওপর কাজ করা যায়।
- Fault Tolerance: Spark SQL স্ট্রিমিং ডেটা প্রক্রিয়াকরণের জন্য fault tolerance সমর্থন করে।
- Exactly Once Semantics: ডেটা প্রক্রিয়াকরণ নিশ্চিত করে যে, স্ট্রিমিং কোয়ারি প্রতি বার একই ডেটার জন্য একাধিক বার কার্যকরী না হবে।
- Continuous Processing: ডেটা প্রবাহিত হতে থাকে এবং স্ট্রিমিং কোয়ারি সেগুলির ওপর অবিরাম কার্যকরী হয়।
Streaming Query তৈরি করা
Structured Streaming-এ একটি কোয়ারি তৈরির জন্য আপনি সাধারণত DataFrame বা Dataset ব্যবহার করেন এবং সেটি ডেটা সোর্সের উপর ভিত্তি করে স্ট্রিমিং ডেটার প্রসেসিং করেন। Spark SQL এর মাধ্যমে file sources, Kafka, socket ইত্যাদি সোর্স থেকে রিয়েল-টাইম ডেটা প্রাপ্তি এবং প্রসেসিং করা যায়।
উদাহরণ: File-based Streaming Query
এখানে, আমরা একটি ফোল্ডার থেকে incoming JSON ফাইল নিয়ে স্ট্রিমিং কোয়ারি চালাবো।
from pyspark.sql import SparkSession
# SparkSession তৈরি
spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
# File-based স্ট্রিমিং সোর্স ব্যবহার করা
df = spark.readStream.json("path/to/input/directory")
# কোয়ারি তৈরি করা: ডেটাকে সোজা আউটপুট টেবিল/ফাইল-এ লেখার জন্য
query = df.writeStream.outputMode("append").format("parquet").option("path", "path/to/output/directory").start()
# কোয়ারি চালানো
query.awaitTermination()
এখানে, readStream.json() ব্যবহার করে incoming JSON ফাইলকে স্ট্রিমিং ডেটা সোর্স হিসাবে ব্যবহার করা হয়েছে এবং .writeStream() দিয়ে ফলাফলকে একটি output ফোল্ডারে Parquet ফরম্যাটে সেভ করা হয়েছে।
Streaming Query with Kafka
Apache Kafka হল একটি জনপ্রিয় স্ট্রিমিং ডেটা সোর্স, যা Spark SQL-এর সাথে খুব সহজেই ইন্টিগ্রেট করা যায়। Kafka থেকে ডেটা প্রাপ্তির জন্য Spark-এর readStream API ব্যবহার করতে হয়।
উদাহরণ: Kafka Streaming Query
from pyspark.sql import SparkSession
# SparkSession তৈরি
spark = SparkSession.builder.appName("Kafka Streaming Example").getOrCreate()
# Kafka থেকে স্ট্রিমিং ডেটা রিড করা
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_topic").load()
# Kafka থেকে আসা বার্তাগুলির কনটেন্ট এক্সট্র্যাক্ট করা
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# কোয়ারি তৈরি করা এবং আউটপুট ফাইলে সেভ করা
query = df.writeStream.outputMode("append").format("parquet").option("path", "path/to/output").start()
# কোয়ারি চালানো
query.awaitTermination()
এখানে, Kafka থেকে স্ট্রিমিং ডেটা readStream.format("kafka") দিয়ে রিড করা হয়েছে এবং তারপরে CAST করে key এবং value কলামগুলোকে স্ট্রিং হিসেবে রূপান্তর করা হয়েছে।
Real-time Aggregation and Windowing
Structured Streaming-এর মাধ্যমে রিয়েল-টাইম অ্যাগ্রিগেশন এবং উইন্ডো ফাংশনও ব্যবহার করা যায়, যেখানে একটি নির্দিষ্ট সময়সীমার মধ্যে ডেটা অ্যাগ্রিগেট করা হয়।
উদাহরণ: Real-time Aggregation (Moving Average)
from pyspark.sql.functions import window, avg
# DataFrame তৈরি করা
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sensor_topic").load()
# Real-time অ্যাগ্রিগেশন: প্রতি 1 মিনিটে গড় বের করা
df_aggregated = df.groupBy(window(df.timestamp, "1 minute")).agg(avg("value").alias("avg_value"))
# কোয়ারি চালানো
query = df_aggregated.writeStream.outputMode("complete").format("parquet").option("path", "path/to/output").start()
query.awaitTermination()
এখানে, প্রতি ১ মিনিটে ডেটার গড় (avg_value) বের করার জন্য groupBy(window(df.timestamp, "1 minute")) ব্যবহার করা হয়েছে।
Handling Late Data
Spark Structured Streaming late data handle করতে watermarking ব্যবহৃত হয়। যখন কিছু ডেটা নির্ধারিত সময়ের পর আসে, তখন Spark সেই ডেটা হ্যান্ডেল করতে watermarking ব্যবহার করে, যা ডেটার বিলম্ব সময়ের উপর ভিত্তি করে ডেটাকে গ্রহণ করে এবং এর সাথে সম্পর্কিত অ্যাগ্রিগেশন বা অপারেশন করে।
উদাহরণ: Watermarking for Late Data
from pyspark.sql.functions import window
# ডেটার বিলম্ব টাইমিং হ্যান্ডেল করা
df_with_watermark = df.withWatermark("timestamp", "10 minutes")
# বিলম্বিত ডেটা অ্যাগ্রিগেশন করা
df_aggregated = df_with_watermark.groupBy(window(df_with_watermark.timestamp, "1 minute")).agg(avg("value").alias("avg_value"))
# কোয়ারি চালানো
query = df_aggregated.writeStream.outputMode("complete").format("parquet").option("path", "path/to/output").start()
query.awaitTermination()
এখানে, withWatermark("timestamp", "10 minutes") ব্যবহার করে ১০ মিনিট বিলম্বিত ডেটা হ্যান্ডেল করা হয়েছে এবং তারপরে অ্যাগ্রিগেশন করা হয়েছে।
Performance Tuning for Streaming Queries
Real-time Data Processing এর সময় পারফরম্যান্সে কিছু চ্যালেঞ্জ থাকতে পারে। Spark Structured Streaming-এর পারফরম্যান্স টিউনিংয়ের জন্য কিছু টিপস:
- Checkpointing:
checkpointLocationব্যবহার করে ডেটার পুনরুদ্ধার নিশ্চিত করুন, যাতে অ্যাপ্লিকেশন ক্র্যাশ হলে পুনরায় প্রক্রিয়া করা যায়। Trigger Interval: Trigger interval ব্যবহার করে স্ট্রিমিং কোয়ারি কতটা সময় পর পর এক্সিকিউট হবে তা নিয়ন্ত্রণ করতে পারেন।
query = df.writeStream.trigger(processingTime="10 seconds").start()- Partitioning: স্ট্রিমিং ডেটাকে repartition বা coalesce করে পার্টিশন সংখ্যা নিয়ন্ত্রণ করুন, যাতে অধিক প্রসেসিং সক্ষমতা পাওয়া যায়।
- Watermarking: বিলম্বিত ডেটা হ্যান্ডল করতে watermarking ব্যবহার করুন।
সারাংশ
Streaming Queries এবং Real-time Data Processing Spark SQL-এর Structured Streaming ফিচারের মাধ্যমে খুব সহজে এবং কার্যকরীভাবে করা যায়। আপনি বিভিন্ন সোর্স যেমন Kafka, files, বা sockets থেকে রিয়েল-টাইম ডেটা গ্রহণ করতে পারেন এবং SQL কোয়ারি বা DataFrame API ব্যবহার করে তা প্রক্রিয়া করতে পারেন। Spark SQL-এর Structured Streaming-এর মাধ্যমে আপনি অ্যাগ্রিগেশন, উইন্ডো অপারেশন, বিলম্বিত ডেটা হ্যান্ডলিং, এবং রিয়েল-টাইম কোয়ারি এক্সিকিউশন করতে সক্ষম হন।
Read more