Spark SQL-এ Functions এবং Expressions দুটি অত্যন্ত গুরুত্বপূর্ণ উপাদান, যা ডেটা প্রসেসিং এবং বিশ্লেষণের ক্ষেত্রে ব্যবহৃত হয়। Spark SQL বিভিন্ন প্রাক-নির্ধারিত ফাংশন সরবরাহ করে, যার মাধ্যমে ডেটা অপারেশন যেমন গণনা, ফিল্টারিং, রূপান্তর, এবং পরিসংখ্যান করা যায়। এই ফাংশনগুলি ডেটা স্ট্রাকচারের উপর সরাসরি কাজ করে এবং ব্যবহারকারীদের আরও নমনীয় ও শক্তিশালী বিশ্লেষণ করতে সাহায্য করে।
Spark SQL Functions
Spark SQL-এর মধ্যে বিভিন্ন ধরনের ফাংশন রয়েছে, যা সাধারণত চারটি প্রধান ক্যাটাগরিতে বিভক্ত:
- অ্যাগ্রিগেট ফাংশন (Aggregate Functions)
অ্যাগ্রিগেট ফাংশনগুলি ব্যবহারকারীদের গ্রুপভিত্তিক ডেটা পরিসংখ্যান বের করতে সাহায্য করে, যেমন গড়, সর্বোচ্চ, সর্বনিম্ন ইত্যাদি।avg(): গড় মান বের করা।
from pyspark.sql import functions as F df.select(F.avg('salary')).show()sum(): মোট মান হিসাব করা।
df.select(F.sum('salary')).show()max(): সর্বোচ্চ মান বের করা।
df.select(F.max('salary')).show()min(): সর্বনিম্ন মান বের করা।
df.select(F.min('salary')).show()count(): রেকর্ডের সংখ্যা বের করা।
df.select(F.count('name')).show()
- ম্যাথেমেটিক্যাল ফাংশন (Mathematical Functions)
এই ফাংশনগুলো ডেটার উপর গণনা করার জন্য ব্যবহৃত হয়।abs(): মানের আবসোলিউট ভ্যালু বের করা।
df.select(F.abs('salary')).show()round(): মানকে নির্দিষ্ট দশমিকের মধ্যে গোল করা।
df.select(F.round('salary', 2)).show()sqrt(): বর্গমূল বের করা।
df.select(F.sqrt('salary')).show()pow(): ঘাত নির্ধারণ করা।
df.select(F.pow('salary', 2)).show()
- স্ট্রিং ফাংশন (String Functions)
এই ফাংশনগুলো স্ট্রিং ডেটা প্রসেস করতে ব্যবহৃত হয়, যেমন স্ট্রিংয়ের দৈর্ঘ্য নির্ণয়, স্ট্রিংয়ের অংশ আলাদা করা ইত্যাদি।concat(): দুটি বা ততোধিক স্ট্রিং যোগ করা।
df.select(F.concat('first_name', 'last_name')).show()length(): স্ট্রিংয়ের দৈর্ঘ্য বের করা।
df.select(F.length('name')).show()lower(): স্ট্রিংয়ের সব অক্ষরকে ছোট অক্ষরে পরিবর্তন করা।
df.select(F.lower('name')).show()upper(): স্ট্রিংয়ের সব অক্ষরকে বড় অক্ষরে পরিবর্তন করা।
df.select(F.upper('name')).show()
- কনভার্সন ফাংশন (Conversion Functions)
এই ফাংশনগুলো ডেটা টাইপ কনভার্সনের জন্য ব্যবহৃত হয়।cast(): ডেটার টাইপ পরিবর্তন করা।
df.select(F.col('salary').cast('float')).show()to_date(): স্ট্রিং থেকে তারিখে রূপান্তর করা।
df.select(F.to_date('date_string', 'yyyy-MM-dd')).show()
Spark SQL Expressions
Spark SQL Expressions হলো সেই এক্সপ্রেশনগুলি, যেগুলি SQL কোয়ারি বা DataFrame API-তে ব্যবহার করা হয় ডেটার উপর বিভিন্ন গণনা ও অপারেশন করার জন্য। Expressions মূলত ফাংশনের সমন্বয়ে তৈরি হয় এবং SQL কোয়ারির মধ্যে কার্যকরী হয়।
Expressions এর উদাহরণ
Arithmetic Expressions
আপনি যদি গাণিতিক অভিব্যক্তি ব্যবহার করে একটি কলামের মানকে অন্য মানের সাথে যোগ, বিয়োগ, গুণ বা ভাগ করতে চান, তবে নিচের মতো এক্সপ্রেশন ব্যবহার করা যেতে পারে:
df.select((df['salary'] + 1000).alias('new_salary')).show()Condition-based Expressions
Spark SQL expressions-এ আপনি শর্ত অনুযায়ী ভ্যালু বের করার জন্য CASE WHEN এক্সপ্রেশন ব্যবহার করতে পারেন। এটি ডেটার ওপর শর্ত তৈরি করতে সহায়ক।
df.select( F.when(df['salary'] > 50000, 'High Salary').otherwise('Low Salary').alias('salary_group') ).show()Column Expressions
একটি DataFrame কলামের উপর একাধিক অপারেশন বা ফাংশন প্রয়োগ করার জন্য expressions ব্যবহার করা হয়।
df.select( F.col('salary') * 1.1 ).show()Aggregate Expressions
Spark SQL expressions ব্যবহার করে GROUP BY এবং HAVING কন্ডিশনের সাথে অ্যাগ্রিগেট ফাংশন প্রয়োগ করা যায়।
df.groupBy('department').agg( F.avg('salary').alias('average_salary') ).show()
Functions এবং Expressions এর ব্যবহার
Spark SQL-এর Functions এবং Expressions আপনাকে বিভিন্ন ধরনের ডেটা প্রক্রিয়াকরণ অপারেশন করতে সাহায্য করে। আপনি SQL কোয়ারি বা DataFrame API-এর মধ্যে এই ফাংশনগুলো ব্যবহার করে ডেটা বিশ্লেষণ এবং প্রসেসিংয়ের কাজগুলো আরও সহজ ও দ্রুত করতে পারেন। ফাংশন এবং এক্সপ্রেশন ব্যবহার করে বিশ্লেষণ, ফিল্টারিং, রূপান্তর, এবং অ্যাগ্রিগেটিং অপারেশনগুলো অত্যন্ত শক্তিশালী এবং স্কেলেবেল হয়।
সারাংশ
Spark SQL-এর Functions এবং Expressions ডেটা প্রসেসিং এবং বিশ্লেষণের জন্য অত্যন্ত গুরুত্বপূর্ণ উপাদান। বিভিন্ন ফাংশন যেমন অ্যাগ্রিগেট, ম্যাথেমেটিক্যাল, স্ট্রিং, এবং কনভার্সন ফাংশন ব্যবহার করে আপনি ডেটার উপর জটিল অপারেশন করতে পারেন, এবং Expressions ব্যবহার করে SQL কোয়ারি বা DataFrame API তে শর্ত ভিত্তিক বা গণনামূলক অপারেশন চালাতে পারেন। Spark SQL এই সব ফাংশন এবং এক্সপ্রেশনগুলির মাধ্যমে ডেটা প্রক্রিয়াকরণ আরও দ্রুত এবং কার্যকরী করে তোলে।
Spark SQL এ বিভিন্ন ধরনের built-in functions রয়েছে যা ডেটা প্রসেসিংয়ের জন্য অত্যন্ত কার্যকরী। এই ফাংশনগুলো আপনাকে স্ট্রিং, নাম্বার, ডেটা, এবং অন্যান্য ডেটা টাইপের ওপর বিভিন্ন অপারেশন চালানোর সুবিধা দেয়। Spark SQL-এর built-in functions আপনি DataFrame API অথবা SQL কোয়ারির মাধ্যমে ব্যবহার করতে পারেন।
১. String Functions
Spark SQL-এ স্ট্রিং ডেটার উপর বিভিন্ন ধরনের built-in ফাংশন রয়েছে যা স্ট্রিং প্রসেসিংয়ের জন্য ব্যবহৃত হয়। এর মধ্যে কিছু জনপ্রিয় স্ট্রিং ফাংশন হলো:
১.১. length(): স্ট্রিংয়ের দৈর্ঘ্য নির্ণয় করা।
from pyspark.sql.functions import length
df = spark.createDataFrame([("Alice",), ("Bob",)], ["Name"])
df.select(length(df["Name"])).show()
১.২. upper(): স্ট্রিংকে বড় হাতের অক্ষরে রূপান্তর করা।
from pyspark.sql.functions import upper
df.select(upper(df["Name"])).show()
১.৩. lower(): স্ট্রিংকে ছোট হাতের অক্ষরে রূপান্তর করা।
from pyspark.sql.functions import lower
df.select(lower(df["Name"])).show()
১.৪. concat(): একাধিক স্ট্রিং একত্রিত করা।
from pyspark.sql.functions import concat
df.select(concat(df["Name"], df["Name"])).show()
১.৫. substr(): স্ট্রিংয়ের নির্দিষ্ট অংশ বের করা।
from pyspark.sql.functions import substr
df.select(substr(df["Name"], 1, 2)).show() # প্রথম ২টি অক্ষর বের করবে
২. Numeric Functions
Spark SQL-এ সংখ্যার সাথে কাজ করার জন্যও অনেক built-in numeric ফাংশন রয়েছে। কিছু জনপ্রিয় numeric ফাংশন:
২.১. abs(): একটি সংখ্যার অভ্যন্তরীণ মান বের করা।
from pyspark.sql.functions import abs
df = spark.createDataFrame([(-5,), (3,)], ["Number"])
df.select(abs(df["Number"])).show()
২.২. round(): একটি সংখ্যাকে নির্দিষ্ট দশমিক স্থানে গোল করা।
from pyspark.sql.functions import round
df = spark.createDataFrame([(5.6789,), (3.1415,)], ["Value"])
df.select(round(df["Value"], 2)).show() # ২ দশমিক পর্যন্ত গোল করবে
২.৩. ceil(): একটি সংখ্যাকে পরবর্তী পূর্ণসংখ্যায় রাউন্ড করা।
from pyspark.sql.functions import ceil
df.select(ceil(df["Value"])).show()
২.৪. floor(): একটি সংখ্যাকে পূর্ববর্তী পূর্ণসংখ্যায় রাউন্ড করা।
from pyspark.sql.functions import floor
df.select(floor(df["Value"])).show()
২.৫. avg(): একটি কলামের গড় মান বের করা।
from pyspark.sql.functions import avg
df.select(avg(df["Value"])).show()
৩. Date Functions
Spark SQL-এ ডেটার সাথে কাজ করার জন্যও অনেক built-in date ফাংশন রয়েছে। এই ফাংশনগুলো ডেটা প্রসেসিংয়ের জন্য খুবই সহায়ক।
৩.১. current_date(): বর্তমান দিনের তারিখ বের করা।
from pyspark.sql.functions import current_date
df = spark.createDataFrame([(1,)], ["ID"])
df.select(current_date()).show()
৩.২. current_timestamp(): বর্তমান সময় এবং তারিখ বের করা।
from pyspark.sql.functions import current_timestamp
df.select(current_timestamp()).show()
৩.৩. date_add(): একটি তারিখে নির্দিষ্ট দিন যোগ করা।
from pyspark.sql.functions import date_add
df = spark.createDataFrame([("2024-12-19",)], ["Date"])
df.select(date_add(df["Date"], 5)).show() # ৫ দিন যোগ করবে
৩.৪. date_sub(): একটি তারিখ থেকে নির্দিষ্ট দিন বাদ দেওয়া।
from pyspark.sql.functions import date_sub
df.select(date_sub(df["Date"], 5)).show() # ৫ দিন বাদ দিবে
৩.৫. year(): একটি তারিখের বছর বের করা।
from pyspark.sql.functions import year
df.select(year(df["Date"])).show()
৩.৬. month(): একটি তারিখের মাস বের করা।
from pyspark.sql.functions import month
df.select(month(df["Date"])).show()
৩.৭. datediff(): দুটি তারিখের মধ্যে পার্থক্য (দিনে) বের করা।
from pyspark.sql.functions import datediff
df = spark.createDataFrame([("2024-12-19", "2024-12-15")], ["Date1", "Date2"])
df.select(datediff(df["Date1"], df["Date2"])).show()
সারাংশ
Spark SQL-এর built-in functions বিভিন্ন ধরনের ডেটা (স্ট্রিং, নাম্বার, ডেটা) প্রসেসিংয়ের জন্য শক্তিশালী এবং কার্যকরী টুল প্রদান করে। আপনি এই ফাংশনগুলো ব্যবহার করে স্ট্রিংয়ের দৈর্ঘ্য নির্ণয় করা, সংখ্যা গোল করা, ডেটা থেকে মাস বা বছর বের করা ইত্যাদি অপারেশন সহজেই করতে পারবেন। Spark SQL এর ফাংশনগুলো ডেটা ফ্রেম API অথবা SQL কোয়ারি ব্যবহার করে প্রয়োগ করা যায়, যা ডেটা প্রসেসিংকে আরও দ্রুত এবং সহজ করে তোলে।
Spark SQL এ UDF (User Defined Functions) হলো কাস্টম ফাংশন যা ব্যবহারকারীরা নিজের প্রয়োজন অনুযায়ী ডেটা প্রসেসিংয়ের জন্য তৈরি করেন। UDF ব্যবহার করে আপনি Spark SQL এর বিল্ট-ইন ফাংশনের বাইরে গিয়ে আপনার নিজস্ব লজিক প্রয়োগ করতে পারেন। সাধারণত, UDF ব্যবহার করা হয় যখন Spark SQL এর পূর্বনির্ধারিত ফাংশনগুলো যথেষ্ট নয় অথবা আপনি নির্দিষ্ট একটি কাস্টম লজিক প্রয়োগ করতে চান।
Spark SQL এ UDF তৈরি ও ব্যবহার করার প্রক্রিয়া দুটি প্রধান ধাপে বিভক্ত: প্রথমত, UDF তৈরি করা এবং দ্বিতীয়ত, তা SQL বা DataFrame API তে ব্যবহার করা।
UDF তৈরি করার পদ্ধতি
১. UDF তৈরি করা (Python উদাহরণ)
Spark SQL তে UDF তৈরি করার জন্য প্রথমে একটি পাইটন ফাংশন লিখতে হয়। তারপর Spark এর udf API ব্যবহার করে সেই ফাংশনটি UDF হিসেবে রেজিস্টার করতে হয়।
Python উদাহরণ: ধরা যাক, আমাদের একটি ফাংশন তৈরি করতে হবে যা একটি নামের প্রথম অক্ষর ক্যাপিটালাইজ করবে।
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# একটি সিম্পল ফাংশন তৈরি
def capitalize_name(name):
if name:
return name.capitalize()
return None
# UDF হিসেবে রেজিস্টার করা
capitalize_name_udf = udf(capitalize_name, StringType())
# SparkSession তৈরি
spark = SparkSession.builder.appName("UDFExample").getOrCreate()
# উদাহরণ DataFrame
data = [("alice",), ("bob",), ("charlie",)]
columns = ["name"]
df = spark.createDataFrame(data, columns)
# UDF ব্যবহার করা
df_with_capitalized_name = df.withColumn("capitalized_name", capitalize_name_udf(df["name"]))
df_with_capitalized_name.show()
এখানে:
capitalize_nameএকটি সাধারণ Python ফাংশন যা নামের প্রথম অক্ষর ক্যাপিটালাইজ করে।udfব্যবহার করে এটিকে Spark SQL তে একটি UDF হিসেবে রেজিস্টার করা হয়।withColumnব্যবহার করে DataFrame এর একটি নতুন কলাম যুক্ত করা হয়, যেখানেcapitalize_name_udfফাংশনটি প্রয়োগ করা হয়েছে।
২. UDF তৈরি করা (Scala উদাহরণ)
Scala তে UDF তৈরি করার জন্য একই পদ্ধতি অনুসরণ করতে হয়, তবে এটি Java বা Scala এর টাইপ সিস্টেমের সাথে মানানসই হতে হবে।
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StringType
// একটি সিম্পল ফাংশন তৈরি
def capitalizeName(name: String): String = {
if (name != null) name.capitalize else null
}
// UDF হিসেবে রেজিস্টার করা
val capitalizeNameUDF = udf(capitalizeName _)
// SparkSession তৈরি
val spark = SparkSession.builder.appName("UDFExample").getOrCreate()
// উদাহরণ DataFrame
val data = Seq(("alice"), ("bob"), ("charlie"))
val df = spark.createDataFrame(data).toDF("name")
// UDF ব্যবহার করা
val dfWithCapitalizedNames = df.withColumn("capitalized_name", capitalizeNameUDF(df("name")))
dfWithCapitalizedNames.show()
এখানে:
capitalizeNameএকটি Scala ফাংশন যা নামের প্রথম অক্ষর ক্যাপিটালাইজ করে।udfফাংশনটি ব্যবহার করেcapitalizeNameফাংশনটি Spark SQL তে UDF হিসেবে রেজিস্টার করা হয়।withColumnব্যবহার করে DataFrame এ নতুন কলাম যোগ করা হয় যেখানে UDF প্রয়োগ করা হয়েছে।
UDF এর অন্যান্য ব্যবহার
১. SQL কোয়ারিতে UDF ব্যবহার করা
Spark SQL এ UDF তৈরি করার পর আপনি এটি SQL কোয়ারিতেও ব্যবহার করতে পারেন। UDF রেজিস্টার করার পরে, আপনি spark.sql এর মাধ্যমে SQL কোয়ারি লিখে সেটি ব্যবহার করতে পারবেন।
Python উদাহরণ:
# UDF রেজিস্টার করা SQL এ ব্যবহারের জন্য
spark.udf.register("capitalize_name_sql", capitalize_name, StringType())
# SQL কোয়ারি ব্যবহার করা
df_sql = spark.sql("SELECT name, capitalize_name_sql(name) as capitalized_name FROM df")
df_sql.show()
এখানে:
capitalize_name_sqlUDF SQL কোয়ারিতেcapitalize_nameফাংশন হিসেবে রেজিস্টার করা হয়েছে।- তারপর SQL কোয়ারি ব্যবহার করে UDF প্রয়োগ করা হয়েছে।
২. UDF এর পারফরম্যান্স
UDF ব্যবহারের সময় কিছু পারফরম্যান্স সমস্যা হতে পারে, কারণ Spark এর ইন-বিল্ট অপটিমাইজার এবং ক্যাটালিস্ট অপটিমাইজার UDF তে কাজ করতে পারে না। তাই আপনি যখন বড় ডেটাসেটের ওপর কাজ করছেন, তখন UDF ব্যবহার করার আগে সতর্ক থাকতে হবে। তবে, Spark SQL এ পারফরম্যান্স উন্নত করার জন্য অন্যান্য ফিচার যেমন Catalyst Optimizer এবং Tungsten ব্যবহার করা হয়, তবে UDF দিয়ে অপটিমাইজেশন সম্ভব নয়।
UDF এর সুবিধা
- কাস্টম লজিক: Spark SQL এর ইন-বিল্ট ফাংশনগুলো যদি আপনার প্রয়োজন মেটাতে না পারে, তাহলে UDF ব্যবহার করে আপনি নিজের কাস্টম লজিক প্রয়োগ করতে পারেন।
- ডেটা প্রসেসিং সুবিধা: UDF ব্যবহার করে আপনি DataFrame বা SQL কোয়ারি তে একাধিক ডেটা ট্রান্সফরমেশন করতে পারেন, যা সাধারণত SQL ফাংশন দিয়ে করা সম্ভব নয়।
- পোর্টেবল: আপনি তৈরি করা UDF বিভিন্ন Spark প্রজেক্ট বা অ্যাপ্লিকেশনেও ব্যবহার করতে পারেন, যা কোডের পুনঃব্যবহারযোগ্যতা বৃদ্ধি করে।
সারাংশ
Spark SQL এ User Defined Functions (UDF) তৈরি এবং ব্যবহার করা একটি অত্যন্ত শক্তিশালী টুল, যা কাস্টম ডেটা প্রসেসিংয়ের জন্য ব্যবহার করা হয়। UDF আপনাকে নিজস্ব ফাংশন তৈরি করার সুযোগ দেয়, যা SQL কোয়ারি বা DataFrame API তে প্রয়োগ করা যেতে পারে। তবে, UDF ব্যবহার করার সময় পারফরম্যান্স সংক্রান্ত কিছু চ্যালেঞ্জ থাকতে পারে, কারণ এটি Spark এর ক্যাটালিস্ট অপটিমাইজারের দ্বারা অপটিমাইজ করা যায় না। তবুও, যখন প্রয়োজনীয় কাস্টম লজিক প্রয়োগের বিষয় আসে, তখন UDF একটি অপরিহার্য টুল হিসেবে কাজ করে।
Spark SQL এর Window Functions একটি শক্তিশালী টুল যা ডেটার উপর অগ্রগতির ভিত্তিতে প্রক্রিয়াকরণ করতে সহায়তা করে। এটি একটি বিশেষ ধরনের ফাংশন যা একটি নির্দিষ্ট উইন্ডো (অথবা অংশ) এর মধ্যে কাজ করে এবং প্রতিটি রেকর্ডের জন্য একটি ফলাফল প্রদান করে। উইন্ডো ফাংশন মূলত সেই সমস্ত কেসে ব্যবহৃত হয় যেখানে আপনি আগের বা পরবর্তী রেকর্ডের উপর ভিত্তি করে বর্তমান রেকর্ডের সাথে কিছু কাজ করতে চান, যেমন রোলিং অ্যাগ্রিগেশন, রানিং টোটাল বা র্যাংকিং।
এটি ডেটা প্রসেসিং, বিশেষ করে অ্যানালাইটিক্যাল কাজের জন্য অত্যন্ত গুরুত্বপূর্ণ এবং সহায়ক হতে পারে। Spark SQL এ উইন্ডো ফাংশন ব্যবহারের মাধ্যমে আপনি জটিল অ্যানালাইসিস এবং হিসাব করতে পারেন, যেটি সাধারণ SQL অ্যাগ্রিগেট ফাংশনের সাহায্যে সম্ভব নয়।
Window Functions কী?
Window Functions হল এমন ফাংশন যা একটি নির্দিষ্ট "উইন্ডো" বা অংশের মধ্যে কাজ করে এবং আগের বা পরবর্তী রেকর্ডগুলোর উপর ভিত্তি করে ফলাফল প্রদান করে। সাধারণভাবে, এটি OVER() ক্লজের মাধ্যমে কাজ করে, যা উইন্ডোর সীমানা নির্ধারণ করে।
উইন্ডো ফাংশনের সাহায্যে আপনি নিম্নলিখিত কাজগুলো করতে পারেন:
- র্যাংকিং এবং রোকে (row) র্যাঙ্ক দেওয়া
- রানিং টোটাল বা অ্যাভারেজ হিসাব করা
- পারসেন্টাইল ক্যালকুলেশন
- পার্টিশন বা গ্রুপের মধ্যে অ্যাগ্রিগেশন
Spark SQL-এ Window Functions এর বৈশিষ্ট্য
১. PARTITION BY
PARTITION BY ক্লজটি উইন্ডোর ভিতরে একটি গ্রুপ তৈরি করতে ব্যবহৃত হয়। এটি ডেটাকে ভাগ করে দেয় এবং প্রতিটি পার্টিশনের জন্য উইন্ডো ফাংশন চালানো হয়।
২. ORDER BY
ORDER BY উইন্ডোর মধ্যে ডেটাকে একটি নির্দিষ্ট ক্রমে সাজানোর জন্য ব্যবহৃত হয়, যাতে ফাংশনগুলি ক্রমানুসারে সঠিকভাবে কাজ করতে পারে।
৩. ROWS BETWEEN
ROWS BETWEEN ক্লজটি উইন্ডোর মধ্যে কোন রেকর্ডগুলো অন্তর্ভুক্ত হবে তা নির্ধারণ করে। এটি ডেটাকে একটি রেঞ্জ হিসেবে ব্যাখ্যা করে (যেমন, আগের ২টি রেকর্ড বা পরবর্তী ১০টি রেকর্ড)।
Spark SQL-এ Window Functions ব্যবহার
Spark SQL এর Window Functions বিভিন্ন ধরনের কাজের জন্য ব্যবহৃত হতে পারে। নিচে কয়েকটি জনপ্রিয় উইন্ডো ফাংশনের ব্যবহার আলোচনা করা হল।
১. ROW_NUMBER() ফাংশন
ROW_NUMBER() একটি উইন্ডো ফাংশন যা প্রতিটি রেকর্ডের জন্য একটি ইউনিক র্যাংক প্রদান করে। এটি ডেটাকে একটি নির্দিষ্ট ক্রমে সাজানোর জন্য ব্যবহার করা হয়।
Python উদাহরণ:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# SparkSession তৈরি
spark = SparkSession.builder.appName("WindowExample").getOrCreate()
# স্যাম্পল ডেটা তৈরি
data = [("Alice", 100), ("Bob", 200), ("Charlie", 300), ("David", 400)]
columns = ["Name", "Value"]
# DataFrame তৈরি
df = spark.createDataFrame(data, columns)
# উইন্ডো স্পেসিফিকেশন তৈরি (ORDER BY Value)
windowSpec = Window.orderBy("Value")
# ROW_NUMBER() ফাংশন ব্যবহার
df_with_row_number = df.withColumn("RowNumber", row_number().over(windowSpec))
df_with_row_number.show()
এখানে, row_number() ফাংশন ব্যবহার করে আমরা প্রতিটি রেকর্ডের জন্য একটি র্যাংক তৈরি করেছি, যেটি "Value" কলামের ক্রম অনুসারে সাজানো।
২. RANK() ফাংশন
RANK() ফাংশন একই রকম র্যাংকিং প্রদান করে, তবে যদি দুটি বা তার বেশি রেকর্ডের মান সমান হয়, তাহলে তাদের র্যাংক এক হবে, এবং পরবর্তী র্যাংকটি বাদ পড়বে।
Python উদাহরণ:
from pyspark.sql.functions import rank
# RANK() ফাংশন ব্যবহার
df_with_rank = df.withColumn("Rank", rank().over(windowSpec))
df_with_rank.show()
এখানে, rank() ফাংশন ব্যবহার করে আমরা র্যাংকিং তৈরি করেছি এবং সমান মানের জন্য একই র্যাংক দেয়া হয়েছে।
৩. SUM() / AVG() ফাংশন (Running Total)
SUM() এবং AVG() উইন্ডো ফাংশনগুলি রানিং টোটাল বা রানিং অ্যাভারেজ হিসাব করতে ব্যবহৃত হয়। এটি ডেটার একটি নির্দিষ্ট অংশের ওপর অগ্রগতির ভিত্তিতে হিসাব করে।
Python উদাহরণ:
from pyspark.sql.functions import sum
# উইন্ডো স্পেসিফিকেশন তৈরি (PARTITION BY Name)
windowSpec = Window.partitionBy("Name").orderBy("Value").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# RUNNING SUM() ফাংশন ব্যবহার
df_with_running_sum = df.withColumn("RunningSum", sum("Value").over(windowSpec))
df_with_running_sum.show()
এখানে, sum() ফাংশন ব্যবহার করে আমরা "Value" কলামের জন্য রানিং টোটাল তৈরি করেছি, যেখানে ডেটা "Name" অনুযায়ী পার্টিশন করা হয়েছে।
৪. LEAD() এবং LAG() ফাংশন
LEAD() এবং LAG() ফাংশনগুলি আগের বা পরবর্তী রেকর্ডের মান ফেরত দেয়। এগুলি সাধারণত সময়ভিত্তিক বিশ্লেষণে ব্যবহৃত হয়।
Python উদাহরণ:
from pyspark.sql.functions import lead, lag
from pyspark.sql import functions as F
# LEAD() এবং LAG() ফাংশন ব্যবহার
df_with_lead_lag = df.withColumn("Lead", lead("Value", 1).over(windowSpec)) \
.withColumn("Lag", lag("Value", 1).over(windowSpec))
df_with_lead_lag.show()
এখানে, lead() এবং lag() ফাংশন ব্যবহার করে আমরা প্রতিটি রেকর্ডের পরবর্তী এবং আগের মান বের করেছি।
Spark SQL উইন্ডো ফাংশনের বাস্তব প্রয়োগ
Spark SQL উইন্ডো ফাংশনগুলি বেশ কিছু বাস্তব জীবনের অ্যানালিটিক্যাল কাজের জন্য ব্যবহার করা যেতে পারে, যেমন:
- ফাইনান্সিয়াল অ্যানালাইসিস: রানিং টোটাল, গ্রোথ রেট এবং প্রফিট মার্জিন হিসাব করা।
- র্যাংকিং: র্যাংকিং তৈরির জন্য যেমন, টপ সেলার, গ্রাহকের আয়ের ভিত্তিতে র্যাংক নির্ধারণ।
- সময়ভিত্তিক বিশ্লেষণ: স্টক মার্কেট অ্যানালাইসিস, যেখানে গতকালের, পরশুদিনের বা আগের সপ্তাহের ডেটার সঙ্গে বর্তমান ডেটার তুলনা করা হয়।
সারাংশ
Spark SQL-এ Window Functions ব্যবহার করে আপনি আরও জটিল এবং উন্নত ডেটা প্রক্রিয়াকরণ করতে পারেন। এটি বিশেষ করে র্যাংকিং, রানিং টোটাল, অ্যাগ্রিগেশন এবং সময়ভিত্তিক বিশ্লেষণের জন্য অত্যন্ত কার্যকরী। উইন্ডো ফাংশন ব্যবহার করে আপনি বড় ডেটাসেটের ওপর আরো বিস্তারিত এবং অগ্রগতির ভিত্তিতে অ্যানালাইসিস করতে সক্ষম হন।
Spark SQL এ Conditional Expressions (যেমন WHEN, IF, CASE) ব্যবহার করে শর্তের উপর ভিত্তি করে ডেটার মান পরিবর্তন করা বা নতুন মান তৈরি করা যায়। এগুলো সাধারণত ডেটা ট্রান্সফরমেশন ও কন্ডিশনাল লজিক প্রয়োগে ব্যবহৃত হয়। চলুন, এগুলো কীভাবে Spark SQL এ ব্যবহার করা যায়, তা বিস্তারিতভাবে দেখি।
WHEN-THEN Expression
Spark SQL-এ WHEN এবং THEN শর্তযুক্ত এক্সপ্রেশন ব্যবহার করে আমরা নির্দিষ্ট শর্তের উপর ভিত্তি করে ডেটা কন্ডিশনাল ট্রান্সফর্ম করতে পারি। সাধারণত এটি CASE WHEN এর অংশ হিসেবে ব্যবহৃত হয়, যা SQL-এ কন্ডিশনাল লজিক তৈরি করতে ব্যবহৃত হয়।
WHEN Example:
from pyspark.sql.functions import when
# WHEN expression দিয়ে কলামের মান পরিবর্তন করা
df.select(
"name",
"age",
when(df["age"] > 30, "Senior").otherwise("Junior").alias("age_group")
).show()
এখানে:
when(df["age"] > 30, "Senior"): যদিage৩০ এর বেশি হয়, তবে "Senior" হবে।otherwise("Junior"): যদি শর্তটি পূর্ণ না হয়, তবে "Junior" হবে।
এটি একটি নতুন কলাম age_group তৈরি করবে, যা নির্ধারণ করবে ব্যবহারকারীর বয়সের ভিত্তিতে তারা "Senior" নাকি "Junior"।
CASE WHEN Expression
SQL এর CASE WHEN এক্সপ্রেশন Spark SQL এ একইভাবে কাজ করে। এটি WHEN এর সাথে শর্ত এবং THEN এর মাধ্যমে আউটপুট প্রদান করে।
CASE WHEN Example:
from pyspark.sql.functions import expr
# CASE WHEN expression দিয়ে মান পরিবর্তন
df.select(
"name",
"age",
expr("CASE WHEN age > 30 THEN 'Senior' ELSE 'Junior' END").alias("age_group")
).show()
এখানে:
CASE WHEN age > 30 THEN 'Senior' ELSE 'Junior' END: এটিageএর মানের ভিত্তিতে "Senior" বা "Junior" রিটার্ন করবে।expr()ফাংশন SQL স্টাইল এক্সপ্রেশন ব্যবহার করতে সাহায্য করে।
IF Expression
Spark SQL-এ IF এক্সপ্রেশন সাধারণত when এর মতো কাজ করে। এটি খুবই সরল এবং স্টেটমেন্ট হিসেবে কাজ করে। IF সাধারণত একক শর্তের জন্য ব্যবহার করা হয়।
IF Example:
from pyspark.sql.functions import col
# IF expression ব্যবহার করে শর্তযুক্ত মান নির্ধারণ
df.select(
"name",
"age",
(col("age") > 30).cast("string").alias("is_senior")
).show()
এখানে:
(col("age") > 30): এটি যদিage৩০ এর বেশি হয় তবেTrueএবং নাহলেFalseরিটার্ন করবে।.cast("string"): ফলাফলটি স্ট্রিং হিসেবে কনভার্ট করা হয়েছে।
Multiple Conditions with WHEN
একাধিক শর্ত সংযুক্ত করে WHEN এক্সপ্রেশন ব্যবহার করা যেতে পারে। এতে একাধিক শর্তের উপর ভিত্তি করে ডেটা ট্রান্সফর্ম করা সম্ভব হয়।
Multiple Conditions Example:
df.select(
"name",
"age",
when(df["age"] < 18, "Child")
.when((df["age"] >= 18) & (df["age"] <= 60), "Adult")
.otherwise("Senior")
.alias("age_category")
).show()
এখানে:
- প্রথম শর্তে,
age১৮ এর কম হলে "Child" হবে। - দ্বিতীয় শর্তে,
age১৮ থেকে ৬০ এর মধ্যে হলে "Adult" হবে। - অন্যথায়, "Senior" হবে।
Conditional Expressions এর সারাংশ:
- WHEN-THEN: এটি একটি শর্তের উপর ভিত্তি করে ডেটা ট্রান্সফর্ম করতে ব্যবহৃত হয়।
- CASE WHEN: এটি SQL এর মতো শর্তযুক্ত এক্সপ্রেশন যেখানে একাধিক শর্ত দেওয়া যায় এবং প্রতিটি শর্তের জন্য আলাদা আউটপুট রিটার্ন করা যায়।
- IF: এটি খুব সাধারণ শর্ত, যেটি সাধারণত দুটি শর্তের মধ্যে একটি নির্বাচন করতে ব্যবহৃত হয়।
সারাংশ
Spark SQL-এ WHEN, CASE, এবং IF এর মতো কন্ডিশনাল এক্সপ্রেশন ব্যবহার করে আমরা ডেটার মধ্যে শর্তসাপেক্ষ লজিক প্রয়োগ করতে পারি। এগুলো ডেটাকে শর্তানুসারে ট্রান্সফর্ম বা ক্যাটাগরি করতে সহায়ক হয়, যা ডেটা অ্যানালাইসিস ও ফিচার ইঞ্জিনিয়ারিং এর জন্য খুবই উপযোগী।
Read more