Big Data and Analytics DataFrames এর সাথে কাজ করা গাইড ও নোট

397

Spark SQL-এ DataFrame হল একটি ডেটা স্ট্রাকচার যা SQL টেবিলের মতো কাজ করে। এটি একটি ডিস্ট্রিবিউটেড ডেটা স্ট্রাকচার যা বিভিন্ন ডেটা সোর্স থেকে ডেটা লোড এবং প্রসেস করার জন্য ব্যবহার করা হয়। DataFrame হল Spark SQL এর প্রধান উপাদান এবং এটি SQL কোয়ারি এবং DataFrame API উভয় মাধ্যমে পরিচালিত হতে পারে। DataFrame একটি সুশৃঙ্খল টেবিলের মতো কাঠামো তৈরি করে, যেখানে রেকর্ডগুলি সারি (rows) হিসেবে এবং কলামগুলো (columns) নির্দিষ্ট টাইপের ডেটা ধারণ করে।


DataFrame তৈরি করা

Spark SQL-এ DataFrame তৈরি করতে সাধারণত দুটি পদ্ধতি ব্যবহার করা হয়:

  1. CSV, JSON, Parquet বা অন্যান্য ফাইল ফরম্যাট থেকে DataFrame তৈরি করা।
  2. Python, Scala বা Java-এ তালিকা বা ডিকশনারি ব্যবহার করে DataFrame তৈরি করা।

১. ফাইল ফরম্যাট থেকে DataFrame তৈরি করা

Spark SQL বিভিন্ন ফাইল ফরম্যাট (যেমন CSV, JSON, Parquet, Avro) থেকে ডেটা লোড করতে পারে এবং সেই ডেটাকে DataFrame আকারে রূপান্তরিত করতে পারে।

# JSON ফাইল থেকে DataFrame তৈরি করা
df_json = spark.read.json("path_to_json_file")

# CSV ফাইল থেকে DataFrame তৈরি করা
df_csv = spark.read.option("header", "true").csv("path_to_csv_file")

# Parquet ফাইল থেকে DataFrame তৈরি করা
df_parquet = spark.read.parquet("path_to_parquet_file")

২. Python লিস্ট থেকে DataFrame তৈরি করা

from pyspark.sql import SparkSession

# SparkSession তৈরি
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Python লিস্ট থেকে DataFrame তৈরি করা
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# DataFrame প্রদর্শন করা
df.show()

DataFrame API দিয়ে কাজ করা

Spark SQL-এ DataFrame API ব্যবহার করে ডেটার ওপর বিভিন্ন অপারেশন করা যায়, যেমন ফিল্টারিং, গ্রুপিং, অর্ডারিং, কলাম নির্বাচন, নতুন কলাম তৈরি ইত্যাদি।

১. কোলাম নির্বাচন করা

# নির্দিষ্ট কলাম নির্বাচন করা
df.select("Name").show()

# একাধিক কলাম নির্বাচন করা
df.select("Name", "Age").show()

২. ফিল্টারিং বা WHERE শর্ত প্রয়োগ করা

# WHERE শর্ত দিয়ে ডেটা ফিল্টার করা
df.filter(df["Age"] > 30).show()

৩. অর্ডারিং বা সাজানো (ORDER BY)

# Age কলামের ওপর ভিত্তি করে ডেটা সাজানো
df.orderBy("Age", ascending=False).show()

৪. গ্রুপিং এবং অ্যাগ্রিগেট ফাংশন (GROUP BY)

# Age কলাম অনুযায়ী গ্রুপিং এবং COUNT করা
df.groupBy("Age").count().show()

# গ্রুপিং এবং AVG (গড়) হিসাব করা
df.groupBy("Age").avg("Age").show()

৫. নতুন কলাম তৈরি করা

from pyspark.sql.functions import col

# নতুন কলাম তৈরি করা
df.withColumn("Age_in_5_Years", df["Age"] + 5).show()

৬. ডেটা পরিস্কার বা Null মান হ্যান্ডেল করা

# Null মান বাদ দেওয়া
df.na.drop().show()

# Null মানে কিছু মান প্রদান করা
df.na.fill({"Age": 0}).show()

৭. DataFrame এর সাথে SQL কোয়ারি এক্সিকিউট করা

DataFrame কে SQL টেবিল হিসেবে রেজিস্টার করে SQL কোয়ারি চালানো যায়।

# DataFrame কে SQL টেবিল হিসেবে রেজিস্টার করা
df.createOrReplaceTempView("people")

# SQL কোয়ারি চালানো
spark.sql("SELECT * FROM people WHERE Age > 30").show()

DataFrame এর Transformation এবং Action

Spark SQL-এ DataFrame অপারেশন দুটি প্রধান ভাগে বিভক্ত করা যায়:

  1. Transformation: এটি এমন অপারেশন যা একটি নতুন DataFrame তৈরি করে, যেমন filter(), select(), groupBy() ইত্যাদি। Transformation সাধারণত Lazy Evaluated হয়, অর্থাৎ এগুলি তখনই এক্সিকিউট হয় যখন Action অপারেশন করা হয়।
  2. Action: এটি এমন অপারেশন যা DataFrame থেকে কিছু রেজাল্ট বা আউটপুট তৈরি করে, যেমন show(), collect(), count() ইত্যাদি। Action সাধারণত ডেটাকে এক্সিকিউট করে এবং ফলাফল প্রদর্শন করে।

Transformation উদাহরণ:

# Transformation (একটি নতুন DataFrame তৈরি করা)
df_transformed = df.filter(df["Age"] > 30).select("Name")
df_transformed.show()

Action উদাহরণ:

# Action (ফলাফল প্রদর্শন করা)
df.show()  # এটি DataFrame এর প্রথম ২০টি রেকর্ড প্রদর্শন করবে

# Action (ডেটা সংগ্রহ করা)
result = df.collect()
print(result)  # এটি DataFrame এর সব রেকর্ড সংগ্রহ করবে

Spark SQL DataFrame-এর সুবিধা

  • ডিস্ট্রিবিউটেড প্রসেসিং: Spark DataFrame গুলি ডিস্ট্রিবিউটেড প্রকৃতির হওয়ায় বড় ডেটাসেটের ওপর দ্রুত কাজ করতে সক্ষম।
  • ফাংশনাল প্রোগ্রামিং সমর্থন: DataFrame API উচ্চ-স্তরের ফাংশনাল প্রোগ্রামিং সুবিধা প্রদান করে, যা ডেটার উপর কমপ্লেক্স ট্রান্সফরমেশন করা সহজ করে।
  • SQL সমর্থন: Spark SQL DataFrame API এবং SQL কোয়ারি উভয়ই সমর্থন করে, যা ডেটা বিশ্লেষণ এবং ট্রান্সফরমেশনকে আরও সহজ করে তোলে।

সারাংশ

Spark SQL DataFrame একটি শক্তিশালী টুল যা ডেটাকে একাধিক সোর্স থেকে লোড, প্রসেস, এবং বিশ্লেষণ করতে সহায়তা করে। এটি SQL কোয়ারি এবং DataFrame API উভয়েই কার্যকরী, যা বিভিন্ন ট্রান্সফরমেশন এবং অ্যাকশন অপারেশন সমর্থন করে। DataFrame ব্যবহারকারীদের দ্রুত, স্কেলেবল এবং কার্যকরী ডেটা প্রসেসিং করতে সাহায্য করে, যা বড় ডেটাসেটের জন্য বিশেষভাবে উপকারী।

Content added By

Columns এবং Rows এর মাধ্যমে Data Manipulation

288

Spark SQL একটি অত্যন্ত শক্তিশালী প্ল্যাটফর্ম যা ডেটার উপর SQL কোয়ারি, DataFrame API এবং Dataset API ব্যবহার করে ডেটা ম্যানিপুলেশন করতে সাহায্য করে। এই টিউটোরিয়ালে, আমরা দেখবো কীভাবে Spark SQL এ DataFrame এর Columns এবং Rows ব্যবহার করে ডেটা ম্যানিপুলেশন করা যায়।


DataFrame-এ Columns এর মাধ্যমে Data Manipulation

DataFrame একটি টেবিলের মতো ডেটা স্ট্রাকচার যা কলাম ভিত্তিক ডেটা সঞ্চয় করে। Spark SQL-এর DataFrame API ব্যবহার করে আপনি সহজেই কলামগুলোর উপর বিভিন্ন অপারেশন চালাতে পারেন, যেমন কলাম নির্বাচন, কলাম যোগ করা, কলাম ডিলিট করা, এবং কলামের মান পরিবর্তন করা।

1. কলাম নির্বাচন (Selecting Columns)

DataFrame থেকে নির্দিষ্ট কলাম নির্বাচন করার জন্য, আপনি .select() মেথড ব্যবহার করতে পারেন।

# SparkSession তৈরি করা
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL Example").getOrCreate()

# DataFrame তৈরি
data = [("John", 28), ("Doe", 22), ("Alice", 30), ("Bob", 25)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# নির্দিষ্ট কলাম নির্বাচন
df.select("Name").show()

এই কোডটি শুধুমাত্র "Name" কলামটি প্রদর্শন করবে।

2. কলাম যোগ করা (Adding Columns)

DataFrame-এ নতুন কলাম যোগ করতে আপনি DataFrame এর সাথে একটি নতুন কলাম যোগ করতে পারেন। উদাহরণস্বরূপ, যদি আপনি "Age" কলামের ওপর কিছু গণনা করতে চান এবং একটি নতুন কলাম তৈরি করতে চান:

from pyspark.sql.functions import col

# একটি নতুন কলাম 'Age_in_5_years' যোগ করা
df_with_new_col = df.withColumn("Age_in_5_years", col("Age") + 5)
df_with_new_col.show()

এই কোডটি "Age_in_5_years" নামক একটি নতুন কলাম যোগ করবে, যার মান হবে "Age" কলামের মানের উপর ৫ যোগ করা।

3. কলাম মুছে ফেলা (Dropping Columns)

DataFrame থেকে নির্দিষ্ট কলাম মুছে ফেলতে আপনি .drop() মেথড ব্যবহার করতে পারেন:

# "Age" কলাম মুছে ফেলা
df_dropped = df.drop("Age")
df_dropped.show()

এখানে, "Age" কলামটি DataFrame থেকে মুছে ফেলা হয়েছে।

4. কলামের মান পরিবর্তন (Modifying Column Values)

আপনি একটি কলামের মান পরিবর্তন করতে পারেন যেমন নির্দিষ্ট শর্ত অনুযায়ী। উদাহরণস্বরূপ, "Age" কলামের মান ৩০ এর বেশি হলে সেটিকে ৩০ এর সমান করে দেওয়া:

from pyspark.sql.functions import when

# Age কলামের মান পরিবর্তন করা
df_modified = df.withColumn("Age", when(col("Age") > 30, 30).otherwise(col("Age")))
df_modified.show()

এখানে, যদি "Age" কলামের মান ৩০ এর বেশি হয়, তবে সেটি ৩০ হবে, অন্যথায় পূর্বের মান থাকবে।


DataFrame-এ Rows এর মাধ্যমে Data Manipulation

DataFrame এর রো (row) গুলি ম্যানিপুলেট করতে হলে সাধারণত ফিল্টার, গ্রুপিং, অর্ডারিং, এবং শর্ত ভিত্তিক অপারেশন ব্যবহার করা হয়। Spark SQL-এ filter() অথবা where() মেথড ব্যবহার করে রো-ভিত্তিক ডেটা ম্যানিপুলেশন করা যায়।

1. রো ফিল্টার করা (Filtering Rows)

DataFrame থেকে নির্দিষ্ট শর্ত অনুযায়ী রো ফিল্টার করতে আপনি .filter() অথবা .where() মেথড ব্যবহার করতে পারেন:

# "Age" কলামের মান ২৫ এর বেশি হলে রো ফিল্টার করা
df_filtered = df.filter(col("Age") > 25)
df_filtered.show()

এখানে, "Age" কলামের মান ২৫ এর বেশি এমন সব রো প্রদর্শিত হবে।

2. গ্রুপিং (Grouping Rows)

Spark SQL এ গ্রুপিং করতে .groupBy() মেথড ব্যবহার করা হয়, যা রো-ভিত্তিক অপারেশন যেমন sum(), avg(), count() ইত্যাদি প্রয়োগ করতে সাহায্য করে:

# "Age" কলাম অনুযায়ী গ্রুপিং এবং গড় বয়স নির্ণয়
df_grouped = df.groupBy("Age").count()
df_grouped.show()

এখানে, DataFrame "Age" অনুসারে গ্রুপিং করে প্রতিটি বয়সের কতগুলো রো রয়েছে তা গণনা করা হয়েছে।

3. রো সজ্জিত করা (Ordering Rows)

DataFrame এ রো সজ্জিত করতে .orderBy() মেথড ব্যবহার করা হয়। এটি রো গুলিকে নির্দিষ্ট কলামের মান অনুসারে সাজিয়ে দেয়:

# "Age" কলাম অনুসারে রো সাজানো
df_ordered = df.orderBy("Age", ascending=False)
df_ordered.show()

এখানে, "Age" কলাম অনুসারে ডেটা ডিসেন্ডিং অর্ডারে সাজানো হবে।


Spark SQL এ Columns এবং Rows ম্যানিপুলেশনের সারাংশ

Spark SQL-এ Columns এবং Rows এর মাধ্যমে ডেটা ম্যানিপুলেশন একটি গুরুত্বপূর্ণ অংশ। DataFrame API এর মাধ্যমে আপনি বিভিন্ন কলাম অপারেশন যেমন কলাম নির্বাচন, যোগ, মুছে ফেলা এবং মান পরিবর্তন করতে পারেন। একইভাবে, রো-ভিত্তিক অপারেশন যেমন ফিল্টারিং, গ্রুপিং, এবং সাজানো (ordering) করার মাধ্যমে ডেটা ম্যানিপুলেশন করা যায়। Spark SQL এর এই ক্ষমতাগুলি ডেটাকে আরও কার্যকরভাবে বিশ্লেষণ এবং প্রসেস করতে সহায়ক।

Content added By

DataFrame এর মধ্যে ফিল্টারিং, সিলেকশন, এবং অর্ডারিং

276

Spark SQL-এ DataFrame একটি অত্যন্ত শক্তিশালী ডেটা স্ট্রাকচার যা SQL-এ টেবিলের মতো কাজ করে এবং SQL কোয়ারি বা DataFrame API দিয়ে ডেটা প্রসেসিং সহজ করে তোলে। DataFrame এ ফিল্টারিং (filtering), সিলেকশন (selection), এবং অর্ডারিং (ordering) অপারেশন করতে পারবেন। চলুন, এই অপারেশনগুলো কীভাবে করা যায় তা দেখি।


DataFrame-এ ফিল্টারিং (Filtering)

ফিল্টারিং বা শর্ত প্রয়োগ করা DataFrame এর মধ্যে নির্দিষ্ট রেকর্ড নির্বাচন করার একটি প্রক্রিয়া। Spark SQL-এর DataFrame API ব্যবহার করে আপনি filter() বা where() মেথডের মাধ্যমে ডেটা ফিল্টার করতে পারবেন।

ফিল্টারিং উদাহরণ:

# DataFrame তৈরি
df = spark.read.json("path_to_json_file")

# ফিল্টারিং: বয়স ৩০ এর বেশি রেকর্ড ফিল্টার করা
df_filtered = df.filter(df['age'] > 30)
df_filtered.show()

এখানে, df['age'] > 30 শর্তের অধীনে বয়স ৩০ এর বেশি রেকর্ডগুলো ফিল্টার করা হয়েছে।

where() মেথড ব্যবহার:

# where() মেথড ব্যবহার করে ফিল্টারিং
df_filtered = df.where(df['age'] > 30)
df_filtered.show()

filter() এবং where() মেথডের মধ্যে কার্যক্রম এক, তবে where() SQL-এ ব্যবহৃত WHERE ক্লজের মতো কাজ করে।


DataFrame-এ সিলেকশন (Selection)

সিলেকশন DataFrame-এর মধ্যে নির্দিষ্ট কলাম নির্বাচন করার প্রক্রিয়া। Spark SQL-এ, আপনি select() মেথড ব্যবহার করে এক বা একাধিক কলাম নির্বাচন করতে পারেন।

সিলেকশন উদাহরণ:

# DataFrame থেকে নির্দিষ্ট কলাম নির্বাচন
df_selected = df.select("name", "age")
df_selected.show()

এখানে, name এবং age কলামগুলো নির্বাচন করা হয়েছে।

একাধিক কলাম নির্বাচন:

# একাধিক কলাম নির্বাচন
df_selected = df.select("name", "age", "address")
df_selected.show()

এটি name, age, এবং address কলামগুলো নির্বাচন করবে।

নতুন কলাম যোগ করা:

# নতুন কলাম যোগ করা
from pyspark.sql.functions import col
df_selected = df.select("name", "age", (col("age") * 2).alias("double_age"))
df_selected.show()

এখানে, age কলামের দ্বিগুণ (double_age) কলাম হিসেবে নতুনভাবে যুক্ত করা হয়েছে।


DataFrame-এ অর্ডারিং (Ordering)

অর্ডারিং DataFrame-এর মধ্যে ডেটা সাজানোর (ascending বা descending) একটি প্রক্রিয়া। আপনি orderBy() মেথড ব্যবহার করে ডেটাকে একটি বা একাধিক কলামের ভিত্তিতে সাজাতে পারেন।

অর্ডারিং উদাহরণ:

# DataFrame কে বয়সের ওপর ascending অর্ডারে সাজানো
df_ordered = df.orderBy("age")
df_ordered.show()

এখানে, age কলাম অনুযায়ী ডেটা ascending (বয়সের কম থেকে বেশি) অর্ডারে সাজানো হয়েছে।

descending অর্ডারে সাজানো:

# বয়স descending অর্ডারে সাজানো
df_ordered_desc = df.orderBy(df['age'], ascending=False)
df_ordered_desc.show()

এখানে, ascending=False ব্যবহার করে বয়সের descending (বয়সের বেশি থেকে কম) অর্ডারে ডেটা সাজানো হয়েছে।

একাধিক কলামের ভিত্তিতে অর্ডারিং:

# একাধিক কলামের ভিত্তিতে সাজানো
df_ordered_multi = df.orderBy("age", "name")
df_ordered_multi.show()

এখানে, প্রথমে age কলাম অনুযায়ী ascending অর্ডারে এবং এরপর name কলাম অনুযায়ী ascending অর্ডারে ডেটা সাজানো হয়েছে।


সারাংশ

Spark SQL-এ DataFrame ব্যবহার করে ফিল্টারিং, সিলেকশন, এবং অর্ডারিং করা খুবই সহজ এবং কার্যকর। filter() বা where() মেথড দিয়ে ডেটা ফিল্টার করা যায়, select() মেথড দিয়ে কলাম সিলেক্ট করা যায় এবং orderBy() মেথড দিয়ে ডেটাকে সাজানো যায়। এই অপারেশনগুলো Spark SQL-এ ডেটা প্রসেসিংকে আরও দ্রুত এবং সুবিধাজনক করে তোলে, বিশেষ করে বড় ডেটাসেটের ক্ষেত্রে।

Content added By

Aggregation এবং Grouping Techniques

332

Spark SQL-এ aggregation এবং grouping হল ডেটা প্রসেসিংয়ের দুটি গুরুত্বপূর্ণ কৌশল, যা আপনাকে ডেটার উপর বিভিন্ন ধরনের স্ট্যাটিস্টিক্যাল অপারেশন (যেমন গড়, যোগফল, মিনিমাম, ম্যাক্সিমাম, কাউন্ট) করতে সহায়তা করে। এই কৌশলগুলি সাধারণত GROUP BY এবং aggregation functions এর মাধ্যমে সম্পাদিত হয়। চলুন, Spark SQL-এ Aggregation এবং Grouping Techniques কিভাবে ব্যবহার করা হয়, তা বিস্তারিতভাবে দেখব।


Aggregation Functions (অ্যাগ্রিগেশন ফাংশন)

Spark SQL-এ বেশ কিছু শক্তিশালী অ্যাগ্রিগেশন ফাংশন রয়েছে, যা বিভিন্ন ডেটার উপর গাণিতিক অপারেশন করতে সাহায্য করে। এখানে কিছু গুরুত্বপূর্ণ aggregation ফাংশন দেওয়া হলো:

  • count(): ডেটাসেটের মধ্যে কতটি রেকর্ড আছে, তা গণনা করে।
  • sum(): নির্দিষ্ট কলামের মোট যোগফল নির্ধারণ করে।
  • avg(): নির্দিষ্ট কলামের গড় মান নির্ধারণ করে।
  • min(): নির্দিষ্ট কলামের সর্বনিম্ন মান নির্ধারণ করে।
  • max(): নির্দিষ্ট কলামের সর্বোচ্চ মান নির্ধারণ করে।

Aggregation ব্যবহার করে ডেটা প্রসেসিং

ধরা যাক, আমাদের কাছে একটি DataFrame আছে, যেখানে কর্মচারীদের নাম, বয়স, এবং তাদের বিভাগ সম্পর্কিত ডেটা রয়েছে। এই DataFrame এর উপর aggregation অপারেশন করতে আমরা নিচের কোড ব্যবহার করতে পারি।

উদাহরণ:

from pyspark.sql import SparkSession

# SparkSession তৈরি
spark = SparkSession.builder.appName("Aggregation Example").getOrCreate()

# ডেটা তৈরি
data = [("John", "HR", 28),
        ("Alice", "Finance", 30),
        ("Bob", "IT", 25),
        ("Dave", "HR", 32),
        ("Eve", "Finance", 29)]

# DataFrame তৈরি
columns = ["Name", "Department", "Age"]
df = spark.createDataFrame(data, columns)

# Aggregation ফাংশন ব্যবহার করা (গড় বয়স)
df_avg_age = df.groupBy("Department").agg({"Age": "avg"})
df_avg_age.show()

আউটপুট:

+----------+--------+
|Department|avg(Age)|
+----------+--------+
|       HR|    30.0|
|  Finance|    29.5|
|       IT|    25.0|
+----------+--------+

এখানে:

  • groupBy("Department") ব্যবহার করে বিভাগ অনুযায়ী ডেটাকে গ্রুপ করা হয়েছে।
  • agg({"Age": "avg"}) ব্যবহার করে প্রতিটি বিভাগের গড় বয়স গণনা করা হয়েছে।

Grouping Techniques (গ্রুপিং টেকনিক)

GROUP BY অপারেশন ব্যবহার করে ডেটাকে গ্রুপ করা হয় এবং তারপর প্রতিটি গ্রুপের জন্য aggregation ফাংশন প্রয়োগ করা হয়। Spark SQL এর groupBy() এবং agg() মেথড এর মাধ্যমে আপনি ডেটাকে গ্রুপ করে বিভিন্ন ধরনের স্ট্যাটিস্টিক্যাল অপারেশন করতে পারেন।

উদাহরণ:

# বিভাগ অনুসারে গড় বয়স এবং বয়সের যোগফল বের করা
df_grouped = df.groupBy("Department").agg(
    {"Age": "avg", "Age": "sum"}
)
df_grouped.show()

আউটপুট:

+----------+--------+--------+
|Department|avg(Age)|sum(Age)|
+----------+--------+--------+
|       HR|    30.0|     60|
|  Finance|    29.5|     59|
|       IT|    25.0|     25|
+----------+--------+--------+

এখানে:

  • groupBy("Department") ডেটাকে বিভাগ অনুসারে গ্রুপ করে।
  • agg({"Age": "avg", "Age": "sum"}) দ্বারা প্রতিটি বিভাগের জন্য গড় বয়স এবং বয়সের যোগফল হিসাব করা হয়েছে।

Multiple Aggregation Functions (একাধিক অ্যাগ্রিগেশন ফাংশন)

Spark SQL-এ একাধিক aggregation ফাংশন একযোগে ব্যবহার করা যায়। আপনি যদি একই কলামের ওপর একাধিক aggregation অপারেশন করতে চান, তবে তা একসাথে করা সম্ভব।

উদাহরণ:

# একাধিক aggregation ফাংশন
df_agg_multiple = df.groupBy("Department").agg(
    {"Age": "avg", "Age": "max", "Age": "min", "Age": "sum"}
)
df_agg_multiple.show()

আউটপুট:

+----------+--------+--------+--------+--------+
|Department|avg(Age)|max(Age)|min(Age)|sum(Age)|
+----------+--------+--------+--------+--------+
|       HR|    30.0|      32|      28|      60|
|  Finance|    29.5|      30|      29|      59|
|       IT|    25.0|      25|      25|      25|
+----------+--------+--------+--------+--------+

এখানে:

  • আমরা একই কলাম Age এর ওপর গড় (avg), সর্বোচ্চ (max), সর্বনিম্ন (min), এবং যোগফল (sum) একসাথে ব্যবহার করেছি।

Filtered Aggregation (ফিল্টারড অ্যাগ্রিগেশন)

Spark SQL-এ আপনি aggregation এর আগে ডেটা ফিল্টার করতে পারেন, যাতে নির্দিষ্ট শর্তে ডেটা বিশ্লেষণ করা যায়। filter() বা where() মেথড ব্যবহার করে এই কাজটি করা হয়।

উদাহরণ:

# শুধুমাত্র ৩০ এর উপরে বয়সের ক্ষেত্রে গড় বয়স বের করা
df_filtered = df.filter(df["Age"] > 30).groupBy("Department").agg({"Age": "avg"})
df_filtered.show()

আউটপুট:

+----------+--------+
|Department|avg(Age)|
+----------+--------+
|       HR|    32.0|
+----------+--------+

এখানে:

  • filter(df["Age"] > 30) ব্যবহার করে আমরা প্রথমে এমন রেকর্ডগুলো ফিল্টার করেছি, যেগুলোর বয়স ৩০ এর বেশি।
  • এরপর, groupBy("Department").agg({"Age": "avg"}) দ্বারা সেই ডেটার ওপর গড় বয়স হিসাব করা হয়েছে।

সারাংশ

Spark SQL-এর aggregation এবং grouping techniques আপনাকে বিভিন্ন ডেটাসেটের ওপর স্ট্যাটিস্টিক্যাল অপারেশন করতে সহায়তা করে। groupBy() এবং agg() মেথডের মাধ্যমে আপনি সহজে ডেটাকে গ্রুপ করে, বিভিন্ন অ্যাগ্রিগেশন ফাংশন (যেমন sum(), avg(), max(), min(), count()) ব্যবহার করতে পারেন। Spark SQL আপনাকে একাধিক অ্যাগ্রিগেশন ফাংশন একসাথে ব্যবহার করার সুবিধা দেয়, এবং filter() বা where() মেথড ব্যবহার করে আপনি aggregation করার আগে ডেটাকে ফিল্টার করতে পারেন।

Content added By

DataFrame API এর মাধ্যমে Complex Data Processing

278

Spark SQL-এর DataFrame API একটি শক্তিশালী এবং নমনীয় ইন্টারফেস, যা ডিস্ট্রিবিউটেড ডেটা প্রক্রিয়াকরণের জন্য ব্যবহার করা হয়। DataFrame API ব্যবহার করে Complex Data Processing খুব সহজে করা যায়, যা বিভিন্ন ধরনের ডেটা ট্রান্সফর্মেশন, ফিল্টারিং, অ্যাগ্রিগেশন এবং বিভিন্ন জটিল কোয়ারি প্রক্রিয়া সমর্থন করে।

এখানে আমরা দেখব কীভাবে DataFrame API ব্যবহার করে জটিল ডেটা প্রসেসিং করা যায় এবং কীভাবে বিভিন্ন ধরনের ট্রান্সফর্মেশন এবং অ্যাকশন অপারেশন ব্যবহার করা যায়।


DataFrame API এর মাধ্যমে Complex Data Processing এর ধাপসমূহ

  1. SparkSession তৈরি করা
    প্রথমেই Spark SQL চালানোর জন্য SparkSession তৈরি করতে হয়, যা Spark-এর প্রধান এন্ট্রি পয়েন্ট হিসেবে কাজ করে।

    from pyspark.sql import SparkSession
    
    # SparkSession তৈরি
    spark = SparkSession.builder.appName("Complex Data Processing").getOrCreate()
    
  2. ডেটা লোড করা
    DataFrame তৈরি করতে বিভিন্ন ডেটা সোর্স (যেমন CSV, JSON, Parquet, Hive ইত্যাদি) থেকে ডেটা লোড করা যেতে পারে।

    # JSON ফাইল থেকে DataFrame লোড করা
    df = spark.read.json("path_to_file.json")
    
    # CSV ফাইল থেকে DataFrame লোড করা
    df_csv = spark.read.option("header", "true").csv("path_to_csv_file")
    

Complex Data Processing উদাহরণ

1. Data Filtering (ফিল্টারিং)

DataFrame এর ওপর filter বা where অপারেশন ব্যবহার করে নির্দিষ্ট শর্ত অনুযায়ী ডেটা ফিল্টার করা যায়।

# বয়স ৩০ এর বেশি এমন কর্মচারীদের ফিল্টার করা
filtered_df = df.filter(df['age'] > 30)
filtered_df.show()

এখানে, df['age'] > 30 শর্তটি পূরণ করা কর্মচারীদের ডেটা রিটার্ন করবে।

2. Data Transformation (ট্রান্সফর্মেশন)

DataFrame API-তে select, withColumn, drop, alias ইত্যাদি ব্যবহার করে ডেটা ট্রান্সফর্ম করা যায়।

  • withColumn: নতুন কলাম যোগ বা বিদ্যমান কলামের মান পরিবর্তন করতে।
# নতুন কলাম 'salary_increase' যোগ করা
df_transformed = df.withColumn("salary_increase", df['salary'] * 1.10)
df_transformed.show()
  • select: নির্দিষ্ট কলাম নির্বাচন করতে।
# 'name' এবং 'age' কলাম নির্বাচন করা
selected_df = df.select("name", "age")
selected_df.show()
  • drop: নির্দিষ্ট কলাম বাদ দিতে।
# 'salary' কলাম বাদ দেওয়া
df_dropped = df.drop("salary")
df_dropped.show()

3. Aggregation (অ্যাগ্রিগেশন)

DataFrame API-তে groupBy এবং agg ব্যবহার করে ডেটা গ্রুপ এবং অ্যাগ্রিগেট করা যায়।

# বিভাগ অনুযায়ী গড় বেতন বের করা
agg_df = df.groupBy("department").agg({"salary": "avg"})
agg_df.show()

এখানে, groupBy("department") বিভাগ অনুযায়ী গ্রুপ করে এবং agg({"salary": "avg"}) গড় বেতন বের করে।

4. Joining DataFrames (জয়েনিং)

DataFrame API-তে join ব্যবহার করে একাধিক DataFrame-এর মধ্যে ডেটা মিশ্রিত করা যায়।

# দুটি DataFrame যুক্ত করা, যেখানে department_id মিলবে
joined_df = df1.join(df2, df1.department_id == df2.department_id)
joined_df.show()

এখানে, df1 এবং df2 DataFrame দুটি department_id কলামের মাধ্যমে যুক্ত করা হয়েছে।

5. Sorting (অর্ডারিং)

DataFrame API তে orderBy ব্যবহার করে ডেটা সাজানো যায়।

# বেতন অনুযায়ী কর্মচারীদের সাজানো
sorted_df = df.orderBy(df['salary'], ascending=False)
sorted_df.show()

এখানে, salary কলাম অনুসারে কর্মচারীদের ডেটা নামিয়ে সাজানো হয়েছে।

6. Complex Calculations (জটিল গণনা)

DataFrame API তে গণনা বা কাস্টম লজিক প্রয়োগ করতে expr বা udf (User Defined Function) ব্যবহার করা যায়।

from pyspark.sql.functions import expr

# কর্মচারীদের বেতন ও ট্যাক্স হিসাব করা
df_calculated = df.withColumn("tax", expr("salary * 0.05"))
df_calculated.show()

এখানে, expr("salary * 0.05") ব্যবহার করে বেতন থেকে ৫% ট্যাক্স হিসাব করা হয়েছে।


একাধিক DataFrame-এ কাজ করা

DataFrame API ব্যবহার করে একাধিক ডেটাসেট বা টেবিলের ওপর JOIN, UNION, এবং INTERSECT অপারেশনও করা যেতে পারে।

  • Union: দুইটি DataFrame একত্রিত করার জন্য।
# দুটি DataFrame একত্রিত করা
union_df = df1.union(df2)
union_df.show()
  • Intersect: দুটি DataFrame এর মধ্যে যে ডেটা একসাথে রয়েছে তা বের করা।
# দুটি DataFrame এর কমন ডেটা বের করা
intersect_df = df1.intersect(df2)
intersect_df.show()

সারাংশ

Spark SQL এর DataFrame API ব্যবহার করে Complex Data Processing অনেক সহজ এবং কার্যকরী হয়ে ওঠে। DataFrame API তে বিভিন্ন ধরনের ট্রান্সফর্মেশন যেমন filter, select, withColumn, groupBy এবং join ব্যবহার করে ডেটা পরিসংখ্যান, ফিল্টারিং, গ্রুপিং, এবং জয়েনিং করা যায়। এইসব অপারেশনগুলি লিনিয়ার এবং প্যারালাল প্রসেসিংয়ে দ্রুত কাজ করে, কারণ Spark SQL ডিস্ট্রিবিউটেড কম্পিউটিং পরিবেশে কাজ করে। DataFrame API-তে জটিল গণনা এবং কাস্টম লজিক প্রয়োগ করা সম্ভব, যা ডেটা বিশ্লেষণ এবং বিশ্লেষণাত্মক কাজকে আরও শক্তিশালী করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...