Streaming Queries এবং Real-time Data Processing গাইড ও নোট

Big Data and Analytics - স্পার্ক এসকিউএল (Spark SQL) - Spark SQL এবং Streaming Data
320

Streaming Queries এবং Real-time Data Processing বর্তমানে ডেটা প্রক্রিয়াকরণের সবচেয়ে গুরুত্বপূর্ণ এবং চাহিদাযুক্ত বিষয়গুলোর মধ্যে একটি। Apache Spark SQL Structured Streaming ফিচার প্রদান করে, যা আপনাকে রিয়েল-টাইম ডেটা স্ট্রিমের ওপর SQL কোয়ারি চালানোর সুবিধা দেয়। এই প্রক্রিয়ায় Spark SQL ডেটা স্ট্রিমকে continuous queries এর মাধ্যমে প্রক্রিয়া করে এবং ফলাফল এক্সিকিউট করে।

Structured Streaming একটি শক্তিশালী টুল যা ডেটা সোসেস থেকে রিয়েল-টাইম ডেটা প্রাপ্তির জন্য ব্যবহার করা হয় এবং ডেটা প্রসেসিংয়ের জন্য SQL বা DataFrame API ব্যবহারের সুবিধা দেয়।


Structured Streaming

Structured Streaming হল Spark SQL-এর একটি ফিচার যা স্ট্রিমিং ডেটা প্রসেসিং সহজ এবং স্কেলেবল করে তোলে। এটি DataFrame/Dataset API ভিত্তিক, যেখানে স্ট্রিমিং ডেটাকে DataFrame হিসাবে ডেটা ফ্রেমওয়ার্কের মধ্যে প্রবাহিত করা হয়। Structured Streaming আপনার কোডকে রিয়েল-টাইম ডেটা প্রসেসিংয়ের জন্য সহজ এবং কার্যকরী করে তোলে, কারণ এটি Spark-এর ব্যাচ প্রসেসিং সিস্টেমের ওপর ভিত্তি করে কাজ করে এবং স্ট্রিমিং ডেটাকে ক্ষুদ্র ব্যাচের আকারে পরিচালনা করে।

Structured Streaming এর প্রধান সুবিধাগুলি হলো:

  • Declarative API: SQL কোয়ারি বা DataFrame API ব্যবহার করে স্ট্রিমিং ডেটার ওপর কাজ করা যায়।
  • Fault Tolerance: Spark SQL স্ট্রিমিং ডেটা প্রক্রিয়াকরণের জন্য fault tolerance সমর্থন করে।
  • Exactly Once Semantics: ডেটা প্রক্রিয়াকরণ নিশ্চিত করে যে, স্ট্রিমিং কোয়ারি প্রতি বার একই ডেটার জন্য একাধিক বার কার্যকরী না হবে।
  • Continuous Processing: ডেটা প্রবাহিত হতে থাকে এবং স্ট্রিমিং কোয়ারি সেগুলির ওপর অবিরাম কার্যকরী হয়।

Streaming Query তৈরি করা

Structured Streaming-এ একটি কোয়ারি তৈরির জন্য আপনি সাধারণত DataFrame বা Dataset ব্যবহার করেন এবং সেটি ডেটা সোর্সের উপর ভিত্তি করে স্ট্রিমিং ডেটার প্রসেসিং করেন। Spark SQL এর মাধ্যমে file sources, Kafka, socket ইত্যাদি সোর্স থেকে রিয়েল-টাইম ডেটা প্রাপ্তি এবং প্রসেসিং করা যায়।

উদাহরণ: File-based Streaming Query

এখানে, আমরা একটি ফোল্ডার থেকে incoming JSON ফাইল নিয়ে স্ট্রিমিং কোয়ারি চালাবো।

from pyspark.sql import SparkSession

# SparkSession তৈরি
spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

# File-based স্ট্রিমিং সোর্স ব্যবহার করা
df = spark.readStream.json("path/to/input/directory")

# কোয়ারি তৈরি করা: ডেটাকে সোজা আউটপুট টেবিল/ফাইল-এ লেখার জন্য
query = df.writeStream.outputMode("append").format("parquet").option("path", "path/to/output/directory").start()

# কোয়ারি চালানো
query.awaitTermination()

এখানে, readStream.json() ব্যবহার করে incoming JSON ফাইলকে স্ট্রিমিং ডেটা সোর্স হিসাবে ব্যবহার করা হয়েছে এবং .writeStream() দিয়ে ফলাফলকে একটি output ফোল্ডারে Parquet ফরম্যাটে সেভ করা হয়েছে।


Streaming Query with Kafka

Apache Kafka হল একটি জনপ্রিয় স্ট্রিমিং ডেটা সোর্স, যা Spark SQL-এর সাথে খুব সহজেই ইন্টিগ্রেট করা যায়। Kafka থেকে ডেটা প্রাপ্তির জন্য Spark-এর readStream API ব্যবহার করতে হয়।

উদাহরণ: Kafka Streaming Query

from pyspark.sql import SparkSession

# SparkSession তৈরি
spark = SparkSession.builder.appName("Kafka Streaming Example").getOrCreate()

# Kafka থেকে স্ট্রিমিং ডেটা রিড করা
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic").load()

# Kafka থেকে আসা বার্তাগুলির কনটেন্ট এক্সট্র্যাক্ট করা
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# কোয়ারি তৈরি করা এবং আউটপুট ফাইলে সেভ করা
query = df.writeStream.outputMode("append").format("parquet").option("path", "path/to/output").start()

# কোয়ারি চালানো
query.awaitTermination()

এখানে, Kafka থেকে স্ট্রিমিং ডেটা readStream.format("kafka") দিয়ে রিড করা হয়েছে এবং তারপরে CAST করে key এবং value কলামগুলোকে স্ট্রিং হিসেবে রূপান্তর করা হয়েছে।


Real-time Aggregation and Windowing

Structured Streaming-এর মাধ্যমে রিয়েল-টাইম অ্যাগ্রিগেশন এবং উইন্ডো ফাংশনও ব্যবহার করা যায়, যেখানে একটি নির্দিষ্ট সময়সীমার মধ্যে ডেটা অ্যাগ্রিগেট করা হয়।

উদাহরণ: Real-time Aggregation (Moving Average)

from pyspark.sql.functions import window, avg

# DataFrame তৈরি করা
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_topic").load()

# Real-time অ্যাগ্রিগেশন: প্রতি 1 মিনিটে গড় বের করা
df_aggregated = df.groupBy(window(df.timestamp, "1 minute")).agg(avg("value").alias("avg_value"))

# কোয়ারি চালানো
query = df_aggregated.writeStream.outputMode("complete").format("parquet").option("path", "path/to/output").start()

query.awaitTermination()

এখানে, প্রতি ১ মিনিটে ডেটার গড় (avg_value) বের করার জন্য groupBy(window(df.timestamp, "1 minute")) ব্যবহার করা হয়েছে।


Handling Late Data

Spark Structured Streaming late data handle করতে watermarking ব্যবহৃত হয়। যখন কিছু ডেটা নির্ধারিত সময়ের পর আসে, তখন Spark সেই ডেটা হ্যান্ডেল করতে watermarking ব্যবহার করে, যা ডেটার বিলম্ব সময়ের উপর ভিত্তি করে ডেটাকে গ্রহণ করে এবং এর সাথে সম্পর্কিত অ্যাগ্রিগেশন বা অপারেশন করে।

উদাহরণ: Watermarking for Late Data

from pyspark.sql.functions import window

# ডেটার বিলম্ব টাইমিং হ্যান্ডেল করা
df_with_watermark = df.withWatermark("timestamp", "10 minutes")

# বিলম্বিত ডেটা অ্যাগ্রিগেশন করা
df_aggregated = df_with_watermark.groupBy(window(df_with_watermark.timestamp, "1 minute")).agg(avg("value").alias("avg_value"))

# কোয়ারি চালানো
query = df_aggregated.writeStream.outputMode("complete").format("parquet").option("path", "path/to/output").start()

query.awaitTermination()

এখানে, withWatermark("timestamp", "10 minutes") ব্যবহার করে ১০ মিনিট বিলম্বিত ডেটা হ্যান্ডেল করা হয়েছে এবং তারপরে অ্যাগ্রিগেশন করা হয়েছে।


Performance Tuning for Streaming Queries

Real-time Data Processing এর সময় পারফরম্যান্সে কিছু চ্যালেঞ্জ থাকতে পারে। Spark Structured Streaming-এর পারফরম্যান্স টিউনিংয়ের জন্য কিছু টিপস:

  1. Checkpointing: checkpointLocation ব্যবহার করে ডেটার পুনরুদ্ধার নিশ্চিত করুন, যাতে অ্যাপ্লিকেশন ক্র্যাশ হলে পুনরায় প্রক্রিয়া করা যায়।
  2. Trigger Interval: Trigger interval ব্যবহার করে স্ট্রিমিং কোয়ারি কতটা সময় পর পর এক্সিকিউট হবে তা নিয়ন্ত্রণ করতে পারেন।

    query = df.writeStream.trigger(processingTime="10 seconds").start()
    
  3. Partitioning: স্ট্রিমিং ডেটাকে repartition বা coalesce করে পার্টিশন সংখ্যা নিয়ন্ত্রণ করুন, যাতে অধিক প্রসেসিং সক্ষমতা পাওয়া যায়।
  4. Watermarking: বিলম্বিত ডেটা হ্যান্ডল করতে watermarking ব্যবহার করুন।

সারাংশ

Streaming Queries এবং Real-time Data Processing Spark SQL-এর Structured Streaming ফিচারের মাধ্যমে খুব সহজে এবং কার্যকরীভাবে করা যায়। আপনি বিভিন্ন সোর্স যেমন Kafka, files, বা sockets থেকে রিয়েল-টাইম ডেটা গ্রহণ করতে পারেন এবং SQL কোয়ারি বা DataFrame API ব্যবহার করে তা প্রক্রিয়া করতে পারেন। Spark SQL-এর Structured Streaming-এর মাধ্যমে আপনি অ্যাগ্রিগেশন, উইন্ডো অপারেশন, বিলম্বিত ডেটা হ্যান্ডলিং, এবং রিয়েল-টাইম কোয়ারি এক্সিকিউশন করতে সক্ষম হন।

Content added By
Promotion

Are you sure to start over?

Loading...