Spark SQL Functions এবং Expressions গাইড ও নোট

Big Data and Analytics - স্পার্ক এসকিউএল (Spark SQL)
321

Spark SQL-এ Functions এবং Expressions দুটি অত্যন্ত গুরুত্বপূর্ণ উপাদান, যা ডেটা প্রসেসিং এবং বিশ্লেষণের ক্ষেত্রে ব্যবহৃত হয়। Spark SQL বিভিন্ন প্রাক-নির্ধারিত ফাংশন সরবরাহ করে, যার মাধ্যমে ডেটা অপারেশন যেমন গণনা, ফিল্টারিং, রূপান্তর, এবং পরিসংখ্যান করা যায়। এই ফাংশনগুলি ডেটা স্ট্রাকচারের উপর সরাসরি কাজ করে এবং ব্যবহারকারীদের আরও নমনীয় ও শক্তিশালী বিশ্লেষণ করতে সাহায্য করে।


Spark SQL Functions

Spark SQL-এর মধ্যে বিভিন্ন ধরনের ফাংশন রয়েছে, যা সাধারণত চারটি প্রধান ক্যাটাগরিতে বিভক্ত:

  1. অ্যাগ্রিগেট ফাংশন (Aggregate Functions)
    অ্যাগ্রিগেট ফাংশনগুলি ব্যবহারকারীদের গ্রুপভিত্তিক ডেটা পরিসংখ্যান বের করতে সাহায্য করে, যেমন গড়, সর্বোচ্চ, সর্বনিম্ন ইত্যাদি।
    • avg(): গড় মান বের করা।

      from pyspark.sql import functions as F
      df.select(F.avg('salary')).show()
      
    • sum(): মোট মান হিসাব করা।

      df.select(F.sum('salary')).show()
      
    • max(): সর্বোচ্চ মান বের করা।

      df.select(F.max('salary')).show()
      
    • min(): সর্বনিম্ন মান বের করা।

      df.select(F.min('salary')).show()
      
    • count(): রেকর্ডের সংখ্যা বের করা।

      df.select(F.count('name')).show()
      
  2. ম্যাথেমেটিক্যাল ফাংশন (Mathematical Functions)
    এই ফাংশনগুলো ডেটার উপর গণনা করার জন্য ব্যবহৃত হয়।
    • abs(): মানের আবসোলিউট ভ্যালু বের করা।

      df.select(F.abs('salary')).show()
      
    • round(): মানকে নির্দিষ্ট দশমিকের মধ্যে গোল করা।

      df.select(F.round('salary', 2)).show()
      
    • sqrt(): বর্গমূল বের করা।

      df.select(F.sqrt('salary')).show()
      
    • pow(): ঘাত নির্ধারণ করা।

      df.select(F.pow('salary', 2)).show()
      
  3. স্ট্রিং ফাংশন (String Functions)
    এই ফাংশনগুলো স্ট্রিং ডেটা প্রসেস করতে ব্যবহৃত হয়, যেমন স্ট্রিংয়ের দৈর্ঘ্য নির্ণয়, স্ট্রিংয়ের অংশ আলাদা করা ইত্যাদি।
    • concat(): দুটি বা ততোধিক স্ট্রিং যোগ করা।

      df.select(F.concat('first_name', 'last_name')).show()
      
    • length(): স্ট্রিংয়ের দৈর্ঘ্য বের করা।

      df.select(F.length('name')).show()
      
    • lower(): স্ট্রিংয়ের সব অক্ষরকে ছোট অক্ষরে পরিবর্তন করা।

      df.select(F.lower('name')).show()
      
    • upper(): স্ট্রিংয়ের সব অক্ষরকে বড় অক্ষরে পরিবর্তন করা।

      df.select(F.upper('name')).show()
      
  4. কনভার্সন ফাংশন (Conversion Functions)
    এই ফাংশনগুলো ডেটা টাইপ কনভার্সনের জন্য ব্যবহৃত হয়।
    • cast(): ডেটার টাইপ পরিবর্তন করা।

      df.select(F.col('salary').cast('float')).show()
      
    • to_date(): স্ট্রিং থেকে তারিখে রূপান্তর করা।

      df.select(F.to_date('date_string', 'yyyy-MM-dd')).show()
      

Spark SQL Expressions

Spark SQL Expressions হলো সেই এক্সপ্রেশনগুলি, যেগুলি SQL কোয়ারি বা DataFrame API-তে ব্যবহার করা হয় ডেটার উপর বিভিন্ন গণনা ও অপারেশন করার জন্য। Expressions মূলত ফাংশনের সমন্বয়ে তৈরি হয় এবং SQL কোয়ারির মধ্যে কার্যকরী হয়।

Expressions এর উদাহরণ

  1. Arithmetic Expressions

    আপনি যদি গাণিতিক অভিব্যক্তি ব্যবহার করে একটি কলামের মানকে অন্য মানের সাথে যোগ, বিয়োগ, গুণ বা ভাগ করতে চান, তবে নিচের মতো এক্সপ্রেশন ব্যবহার করা যেতে পারে:

    df.select((df['salary'] + 1000).alias('new_salary')).show()
    
  2. Condition-based Expressions

    Spark SQL expressions-এ আপনি শর্ত অনুযায়ী ভ্যালু বের করার জন্য CASE WHEN এক্সপ্রেশন ব্যবহার করতে পারেন। এটি ডেটার ওপর শর্ত তৈরি করতে সহায়ক।

    df.select(
        F.when(df['salary'] > 50000, 'High Salary').otherwise('Low Salary').alias('salary_group')
    ).show()
    
  3. Column Expressions

    একটি DataFrame কলামের উপর একাধিক অপারেশন বা ফাংশন প্রয়োগ করার জন্য expressions ব্যবহার করা হয়।

    df.select(
        F.col('salary') * 1.1
    ).show()
    
  4. Aggregate Expressions

    Spark SQL expressions ব্যবহার করে GROUP BY এবং HAVING কন্ডিশনের সাথে অ্যাগ্রিগেট ফাংশন প্রয়োগ করা যায়।

    df.groupBy('department').agg(
        F.avg('salary').alias('average_salary')
    ).show()
    

Functions এবং Expressions এর ব্যবহার

Spark SQL-এর Functions এবং Expressions আপনাকে বিভিন্ন ধরনের ডেটা প্রক্রিয়াকরণ অপারেশন করতে সাহায্য করে। আপনি SQL কোয়ারি বা DataFrame API-এর মধ্যে এই ফাংশনগুলো ব্যবহার করে ডেটা বিশ্লেষণ এবং প্রসেসিংয়ের কাজগুলো আরও সহজ ও দ্রুত করতে পারেন। ফাংশন এবং এক্সপ্রেশন ব্যবহার করে বিশ্লেষণ, ফিল্টারিং, রূপান্তর, এবং অ্যাগ্রিগেটিং অপারেশনগুলো অত্যন্ত শক্তিশালী এবং স্কেলেবেল হয়।


সারাংশ

Spark SQL-এর Functions এবং Expressions ডেটা প্রসেসিং এবং বিশ্লেষণের জন্য অত্যন্ত গুরুত্বপূর্ণ উপাদান। বিভিন্ন ফাংশন যেমন অ্যাগ্রিগেট, ম্যাথেমেটিক্যাল, স্ট্রিং, এবং কনভার্সন ফাংশন ব্যবহার করে আপনি ডেটার উপর জটিল অপারেশন করতে পারেন, এবং Expressions ব্যবহার করে SQL কোয়ারি বা DataFrame API তে শর্ত ভিত্তিক বা গণনামূলক অপারেশন চালাতে পারেন। Spark SQL এই সব ফাংশন এবং এক্সপ্রেশনগুলির মাধ্যমে ডেটা প্রক্রিয়াকরণ আরও দ্রুত এবং কার্যকরী করে তোলে।

Content added By

Built-in Functions (String, Numeric, Date Functions)

300

Spark SQL এ বিভিন্ন ধরনের built-in functions রয়েছে যা ডেটা প্রসেসিংয়ের জন্য অত্যন্ত কার্যকরী। এই ফাংশনগুলো আপনাকে স্ট্রিং, নাম্বার, ডেটা, এবং অন্যান্য ডেটা টাইপের ওপর বিভিন্ন অপারেশন চালানোর সুবিধা দেয়। Spark SQL-এর built-in functions আপনি DataFrame API অথবা SQL কোয়ারির মাধ্যমে ব্যবহার করতে পারেন।


১. String Functions

Spark SQL-এ স্ট্রিং ডেটার উপর বিভিন্ন ধরনের built-in ফাংশন রয়েছে যা স্ট্রিং প্রসেসিংয়ের জন্য ব্যবহৃত হয়। এর মধ্যে কিছু জনপ্রিয় স্ট্রিং ফাংশন হলো:

১.১. length(): স্ট্রিংয়ের দৈর্ঘ্য নির্ণয় করা।

from pyspark.sql.functions import length

df = spark.createDataFrame([("Alice",), ("Bob",)], ["Name"])
df.select(length(df["Name"])).show()

১.২. upper(): স্ট্রিংকে বড় হাতের অক্ষরে রূপান্তর করা।

from pyspark.sql.functions import upper

df.select(upper(df["Name"])).show()

১.৩. lower(): স্ট্রিংকে ছোট হাতের অক্ষরে রূপান্তর করা।

from pyspark.sql.functions import lower

df.select(lower(df["Name"])).show()

১.৪. concat(): একাধিক স্ট্রিং একত্রিত করা।

from pyspark.sql.functions import concat

df.select(concat(df["Name"], df["Name"])).show()

১.৫. substr(): স্ট্রিংয়ের নির্দিষ্ট অংশ বের করা।

from pyspark.sql.functions import substr

df.select(substr(df["Name"], 1, 2)).show()  # প্রথম ২টি অক্ষর বের করবে

২. Numeric Functions

Spark SQL-এ সংখ্যার সাথে কাজ করার জন্যও অনেক built-in numeric ফাংশন রয়েছে। কিছু জনপ্রিয় numeric ফাংশন:

২.১. abs(): একটি সংখ্যার অভ্যন্তরীণ মান বের করা।

from pyspark.sql.functions import abs

df = spark.createDataFrame([(-5,), (3,)], ["Number"])
df.select(abs(df["Number"])).show()

২.২. round(): একটি সংখ্যাকে নির্দিষ্ট দশমিক স্থানে গোল করা।

from pyspark.sql.functions import round

df = spark.createDataFrame([(5.6789,), (3.1415,)], ["Value"])
df.select(round(df["Value"], 2)).show()  # ২ দশমিক পর্যন্ত গোল করবে

২.৩. ceil(): একটি সংখ্যাকে পরবর্তী পূর্ণসংখ্যায় রাউন্ড করা।

from pyspark.sql.functions import ceil

df.select(ceil(df["Value"])).show()

২.৪. floor(): একটি সংখ্যাকে পূর্ববর্তী পূর্ণসংখ্যায় রাউন্ড করা।

from pyspark.sql.functions import floor

df.select(floor(df["Value"])).show()

২.৫. avg(): একটি কলামের গড় মান বের করা।

from pyspark.sql.functions import avg

df.select(avg(df["Value"])).show()

৩. Date Functions

Spark SQL-এ ডেটার সাথে কাজ করার জন্যও অনেক built-in date ফাংশন রয়েছে। এই ফাংশনগুলো ডেটা প্রসেসিংয়ের জন্য খুবই সহায়ক।

৩.১. current_date(): বর্তমান দিনের তারিখ বের করা।

from pyspark.sql.functions import current_date

df = spark.createDataFrame([(1,)], ["ID"])
df.select(current_date()).show()

৩.২. current_timestamp(): বর্তমান সময় এবং তারিখ বের করা।

from pyspark.sql.functions import current_timestamp

df.select(current_timestamp()).show()

৩.৩. date_add(): একটি তারিখে নির্দিষ্ট দিন যোগ করা।

from pyspark.sql.functions import date_add

df = spark.createDataFrame([("2024-12-19",)], ["Date"])
df.select(date_add(df["Date"], 5)).show()  # ৫ দিন যোগ করবে

৩.৪. date_sub(): একটি তারিখ থেকে নির্দিষ্ট দিন বাদ দেওয়া।

from pyspark.sql.functions import date_sub

df.select(date_sub(df["Date"], 5)).show()  # ৫ দিন বাদ দিবে

৩.৫. year(): একটি তারিখের বছর বের করা।

from pyspark.sql.functions import year

df.select(year(df["Date"])).show()

৩.৬. month(): একটি তারিখের মাস বের করা।

from pyspark.sql.functions import month

df.select(month(df["Date"])).show()

৩.৭. datediff(): দুটি তারিখের মধ্যে পার্থক্য (দিনে) বের করা।

from pyspark.sql.functions import datediff

df = spark.createDataFrame([("2024-12-19", "2024-12-15")], ["Date1", "Date2"])
df.select(datediff(df["Date1"], df["Date2"])).show()

সারাংশ

Spark SQL-এর built-in functions বিভিন্ন ধরনের ডেটা (স্ট্রিং, নাম্বার, ডেটা) প্রসেসিংয়ের জন্য শক্তিশালী এবং কার্যকরী টুল প্রদান করে। আপনি এই ফাংশনগুলো ব্যবহার করে স্ট্রিংয়ের দৈর্ঘ্য নির্ণয় করা, সংখ্যা গোল করা, ডেটা থেকে মাস বা বছর বের করা ইত্যাদি অপারেশন সহজেই করতে পারবেন। Spark SQL এর ফাংশনগুলো ডেটা ফ্রেম API অথবা SQL কোয়ারি ব্যবহার করে প্রয়োগ করা যায়, যা ডেটা প্রসেসিংকে আরও দ্রুত এবং সহজ করে তোলে।

Content added By

UDF (User Defined Functions) তৈরি এবং ব্যবহার

420

Spark SQL এ UDF (User Defined Functions) হলো কাস্টম ফাংশন যা ব্যবহারকারীরা নিজের প্রয়োজন অনুযায়ী ডেটা প্রসেসিংয়ের জন্য তৈরি করেন। UDF ব্যবহার করে আপনি Spark SQL এর বিল্ট-ইন ফাংশনের বাইরে গিয়ে আপনার নিজস্ব লজিক প্রয়োগ করতে পারেন। সাধারণত, UDF ব্যবহার করা হয় যখন Spark SQL এর পূর্বনির্ধারিত ফাংশনগুলো যথেষ্ট নয় অথবা আপনি নির্দিষ্ট একটি কাস্টম লজিক প্রয়োগ করতে চান।

Spark SQL এ UDF তৈরি ও ব্যবহার করার প্রক্রিয়া দুটি প্রধান ধাপে বিভক্ত: প্রথমত, UDF তৈরি করা এবং দ্বিতীয়ত, তা SQL বা DataFrame API তে ব্যবহার করা।


UDF তৈরি করার পদ্ধতি

১. UDF তৈরি করা (Python উদাহরণ)

Spark SQL তে UDF তৈরি করার জন্য প্রথমে একটি পাইটন ফাংশন লিখতে হয়। তারপর Spark এর udf API ব্যবহার করে সেই ফাংশনটি UDF হিসেবে রেজিস্টার করতে হয়।

Python উদাহরণ: ধরা যাক, আমাদের একটি ফাংশন তৈরি করতে হবে যা একটি নামের প্রথম অক্ষর ক্যাপিটালাইজ করবে।

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# একটি সিম্পল ফাংশন তৈরি
def capitalize_name(name):
    if name:
        return name.capitalize()
    return None

# UDF হিসেবে রেজিস্টার করা
capitalize_name_udf = udf(capitalize_name, StringType())

# SparkSession তৈরি
spark = SparkSession.builder.appName("UDFExample").getOrCreate()

# উদাহরণ DataFrame
data = [("alice",), ("bob",), ("charlie",)]
columns = ["name"]
df = spark.createDataFrame(data, columns)

# UDF ব্যবহার করা
df_with_capitalized_name = df.withColumn("capitalized_name", capitalize_name_udf(df["name"]))
df_with_capitalized_name.show()

এখানে:

  • capitalize_name একটি সাধারণ Python ফাংশন যা নামের প্রথম অক্ষর ক্যাপিটালাইজ করে।
  • udf ব্যবহার করে এটিকে Spark SQL তে একটি UDF হিসেবে রেজিস্টার করা হয়।
  • withColumn ব্যবহার করে DataFrame এর একটি নতুন কলাম যুক্ত করা হয়, যেখানে capitalize_name_udf ফাংশনটি প্রয়োগ করা হয়েছে।

২. UDF তৈরি করা (Scala উদাহরণ)

Scala তে UDF তৈরি করার জন্য একই পদ্ধতি অনুসরণ করতে হয়, তবে এটি Java বা Scala এর টাইপ সিস্টেমের সাথে মানানসই হতে হবে।

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StringType

// একটি সিম্পল ফাংশন তৈরি
def capitalizeName(name: String): String = {
  if (name != null) name.capitalize else null
}

// UDF হিসেবে রেজিস্টার করা
val capitalizeNameUDF = udf(capitalizeName _)

// SparkSession তৈরি
val spark = SparkSession.builder.appName("UDFExample").getOrCreate()

// উদাহরণ DataFrame
val data = Seq(("alice"), ("bob"), ("charlie"))
val df = spark.createDataFrame(data).toDF("name")

// UDF ব্যবহার করা
val dfWithCapitalizedNames = df.withColumn("capitalized_name", capitalizeNameUDF(df("name")))
dfWithCapitalizedNames.show()

এখানে:

  • capitalizeName একটি Scala ফাংশন যা নামের প্রথম অক্ষর ক্যাপিটালাইজ করে।
  • udf ফাংশনটি ব্যবহার করে capitalizeName ফাংশনটি Spark SQL তে UDF হিসেবে রেজিস্টার করা হয়।
  • withColumn ব্যবহার করে DataFrame এ নতুন কলাম যোগ করা হয় যেখানে UDF প্রয়োগ করা হয়েছে।

UDF এর অন্যান্য ব্যবহার

১. SQL কোয়ারিতে UDF ব্যবহার করা

Spark SQL এ UDF তৈরি করার পর আপনি এটি SQL কোয়ারিতেও ব্যবহার করতে পারেন। UDF রেজিস্টার করার পরে, আপনি spark.sql এর মাধ্যমে SQL কোয়ারি লিখে সেটি ব্যবহার করতে পারবেন।

Python উদাহরণ:

# UDF রেজিস্টার করা SQL এ ব্যবহারের জন্য
spark.udf.register("capitalize_name_sql", capitalize_name, StringType())

# SQL কোয়ারি ব্যবহার করা
df_sql = spark.sql("SELECT name, capitalize_name_sql(name) as capitalized_name FROM df")
df_sql.show()

এখানে:

  • capitalize_name_sql UDF SQL কোয়ারিতে capitalize_name ফাংশন হিসেবে রেজিস্টার করা হয়েছে।
  • তারপর SQL কোয়ারি ব্যবহার করে UDF প্রয়োগ করা হয়েছে।

২. UDF এর পারফরম্যান্স

UDF ব্যবহারের সময় কিছু পারফরম্যান্স সমস্যা হতে পারে, কারণ Spark এর ইন-বিল্ট অপটিমাইজার এবং ক্যাটালিস্ট অপটিমাইজার UDF তে কাজ করতে পারে না। তাই আপনি যখন বড় ডেটাসেটের ওপর কাজ করছেন, তখন UDF ব্যবহার করার আগে সতর্ক থাকতে হবে। তবে, Spark SQL এ পারফরম্যান্স উন্নত করার জন্য অন্যান্য ফিচার যেমন Catalyst Optimizer এবং Tungsten ব্যবহার করা হয়, তবে UDF দিয়ে অপটিমাইজেশন সম্ভব নয়।


UDF এর সুবিধা

  1. কাস্টম লজিক: Spark SQL এর ইন-বিল্ট ফাংশনগুলো যদি আপনার প্রয়োজন মেটাতে না পারে, তাহলে UDF ব্যবহার করে আপনি নিজের কাস্টম লজিক প্রয়োগ করতে পারেন।
  2. ডেটা প্রসেসিং সুবিধা: UDF ব্যবহার করে আপনি DataFrame বা SQL কোয়ারি তে একাধিক ডেটা ট্রান্সফরমেশন করতে পারেন, যা সাধারণত SQL ফাংশন দিয়ে করা সম্ভব নয়।
  3. পোর্টেবল: আপনি তৈরি করা UDF বিভিন্ন Spark প্রজেক্ট বা অ্যাপ্লিকেশনেও ব্যবহার করতে পারেন, যা কোডের পুনঃব্যবহারযোগ্যতা বৃদ্ধি করে।

সারাংশ

Spark SQL এ User Defined Functions (UDF) তৈরি এবং ব্যবহার করা একটি অত্যন্ত শক্তিশালী টুল, যা কাস্টম ডেটা প্রসেসিংয়ের জন্য ব্যবহার করা হয়। UDF আপনাকে নিজস্ব ফাংশন তৈরি করার সুযোগ দেয়, যা SQL কোয়ারি বা DataFrame API তে প্রয়োগ করা যেতে পারে। তবে, UDF ব্যবহার করার সময় পারফরম্যান্স সংক্রান্ত কিছু চ্যালেঞ্জ থাকতে পারে, কারণ এটি Spark এর ক্যাটালিস্ট অপটিমাইজারের দ্বারা অপটিমাইজ করা যায় না। তবুও, যখন প্রয়োজনীয় কাস্টম লজিক প্রয়োগের বিষয় আসে, তখন UDF একটি অপরিহার্য টুল হিসেবে কাজ করে।

Content added By

Window Functions এর মাধ্যমে Advanced Data Processing

293

Spark SQL এর Window Functions একটি শক্তিশালী টুল যা ডেটার উপর অগ্রগতির ভিত্তিতে প্রক্রিয়াকরণ করতে সহায়তা করে। এটি একটি বিশেষ ধরনের ফাংশন যা একটি নির্দিষ্ট উইন্ডো (অথবা অংশ) এর মধ্যে কাজ করে এবং প্রতিটি রেকর্ডের জন্য একটি ফলাফল প্রদান করে। উইন্ডো ফাংশন মূলত সেই সমস্ত কেসে ব্যবহৃত হয় যেখানে আপনি আগের বা পরবর্তী রেকর্ডের উপর ভিত্তি করে বর্তমান রেকর্ডের সাথে কিছু কাজ করতে চান, যেমন রোলিং অ্যাগ্রিগেশন, রানিং টোটাল বা র্যাংকিং।

এটি ডেটা প্রসেসিং, বিশেষ করে অ্যানালাইটিক্যাল কাজের জন্য অত্যন্ত গুরুত্বপূর্ণ এবং সহায়ক হতে পারে। Spark SQL এ উইন্ডো ফাংশন ব্যবহারের মাধ্যমে আপনি জটিল অ্যানালাইসিস এবং হিসাব করতে পারেন, যেটি সাধারণ SQL অ্যাগ্রিগেট ফাংশনের সাহায্যে সম্ভব নয়।


Window Functions কী?

Window Functions হল এমন ফাংশন যা একটি নির্দিষ্ট "উইন্ডো" বা অংশের মধ্যে কাজ করে এবং আগের বা পরবর্তী রেকর্ডগুলোর উপর ভিত্তি করে ফলাফল প্রদান করে। সাধারণভাবে, এটি OVER() ক্লজের মাধ্যমে কাজ করে, যা উইন্ডোর সীমানা নির্ধারণ করে।

উইন্ডো ফাংশনের সাহায্যে আপনি নিম্নলিখিত কাজগুলো করতে পারেন:

  • র্যাংকিং এবং রোকে (row) র‌্যাঙ্ক দেওয়া
  • রানিং টোটাল বা অ্যাভারেজ হিসাব করা
  • পারসেন্টাইল ক্যালকুলেশন
  • পার্টিশন বা গ্রুপের মধ্যে অ্যাগ্রিগেশন

Spark SQL-এ Window Functions এর বৈশিষ্ট্য

১. PARTITION BY

PARTITION BY ক্লজটি উইন্ডোর ভিতরে একটি গ্রুপ তৈরি করতে ব্যবহৃত হয়। এটি ডেটাকে ভাগ করে দেয় এবং প্রতিটি পার্টিশনের জন্য উইন্ডো ফাংশন চালানো হয়।

২. ORDER BY

ORDER BY উইন্ডোর মধ্যে ডেটাকে একটি নির্দিষ্ট ক্রমে সাজানোর জন্য ব্যবহৃত হয়, যাতে ফাংশনগুলি ক্রমানুসারে সঠিকভাবে কাজ করতে পারে।

৩. ROWS BETWEEN

ROWS BETWEEN ক্লজটি উইন্ডোর মধ্যে কোন রেকর্ডগুলো অন্তর্ভুক্ত হবে তা নির্ধারণ করে। এটি ডেটাকে একটি রেঞ্জ হিসেবে ব্যাখ্যা করে (যেমন, আগের ২টি রেকর্ড বা পরবর্তী ১০টি রেকর্ড)।


Spark SQL-এ Window Functions ব্যবহার

Spark SQL এর Window Functions বিভিন্ন ধরনের কাজের জন্য ব্যবহৃত হতে পারে। নিচে কয়েকটি জনপ্রিয় উইন্ডো ফাংশনের ব্যবহার আলোচনা করা হল।

১. ROW_NUMBER() ফাংশন

ROW_NUMBER() একটি উইন্ডো ফাংশন যা প্রতিটি রেকর্ডের জন্য একটি ইউনিক র‌্যাংক প্রদান করে। এটি ডেটাকে একটি নির্দিষ্ট ক্রমে সাজানোর জন্য ব্যবহার করা হয়।

Python উদাহরণ:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# SparkSession তৈরি
spark = SparkSession.builder.appName("WindowExample").getOrCreate()

# স্যাম্পল ডেটা তৈরি
data = [("Alice", 100), ("Bob", 200), ("Charlie", 300), ("David", 400)]
columns = ["Name", "Value"]

# DataFrame তৈরি
df = spark.createDataFrame(data, columns)

# উইন্ডো স্পেসিফিকেশন তৈরি (ORDER BY Value)
windowSpec = Window.orderBy("Value")

# ROW_NUMBER() ফাংশন ব্যবহার
df_with_row_number = df.withColumn("RowNumber", row_number().over(windowSpec))
df_with_row_number.show()

এখানে, row_number() ফাংশন ব্যবহার করে আমরা প্রতিটি রেকর্ডের জন্য একটি র‌্যাংক তৈরি করেছি, যেটি "Value" কলামের ক্রম অনুসারে সাজানো।


২. RANK() ফাংশন

RANK() ফাংশন একই রকম র‌্যাংকিং প্রদান করে, তবে যদি দুটি বা তার বেশি রেকর্ডের মান সমান হয়, তাহলে তাদের র‌্যাংক এক হবে, এবং পরবর্তী র‌্যাংকটি বাদ পড়বে।

Python উদাহরণ:

from pyspark.sql.functions import rank

# RANK() ফাংশন ব্যবহার
df_with_rank = df.withColumn("Rank", rank().over(windowSpec))
df_with_rank.show()

এখানে, rank() ফাংশন ব্যবহার করে আমরা র‌্যাংকিং তৈরি করেছি এবং সমান মানের জন্য একই র‌্যাংক দেয়া হয়েছে।


৩. SUM() / AVG() ফাংশন (Running Total)

SUM() এবং AVG() উইন্ডো ফাংশনগুলি রানিং টোটাল বা রানিং অ্যাভারেজ হিসাব করতে ব্যবহৃত হয়। এটি ডেটার একটি নির্দিষ্ট অংশের ওপর অগ্রগতির ভিত্তিতে হিসাব করে।

Python উদাহরণ:

from pyspark.sql.functions import sum

# উইন্ডো স্পেসিফিকেশন তৈরি (PARTITION BY Name)
windowSpec = Window.partitionBy("Name").orderBy("Value").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# RUNNING SUM() ফাংশন ব্যবহার
df_with_running_sum = df.withColumn("RunningSum", sum("Value").over(windowSpec))
df_with_running_sum.show()

এখানে, sum() ফাংশন ব্যবহার করে আমরা "Value" কলামের জন্য রানিং টোটাল তৈরি করেছি, যেখানে ডেটা "Name" অনুযায়ী পার্টিশন করা হয়েছে।


৪. LEAD() এবং LAG() ফাংশন

LEAD() এবং LAG() ফাংশনগুলি আগের বা পরবর্তী রেকর্ডের মান ফেরত দেয়। এগুলি সাধারণত সময়ভিত্তিক বিশ্লেষণে ব্যবহৃত হয়।

Python উদাহরণ:

from pyspark.sql.functions import lead, lag
from pyspark.sql import functions as F

# LEAD() এবং LAG() ফাংশন ব্যবহার
df_with_lead_lag = df.withColumn("Lead", lead("Value", 1).over(windowSpec)) \
                     .withColumn("Lag", lag("Value", 1).over(windowSpec))
df_with_lead_lag.show()

এখানে, lead() এবং lag() ফাংশন ব্যবহার করে আমরা প্রতিটি রেকর্ডের পরবর্তী এবং আগের মান বের করেছি।


Spark SQL উইন্ডো ফাংশনের বাস্তব প্রয়োগ

Spark SQL উইন্ডো ফাংশনগুলি বেশ কিছু বাস্তব জীবনের অ্যানালিটিক্যাল কাজের জন্য ব্যবহার করা যেতে পারে, যেমন:

  • ফাইনান্সিয়াল অ্যানালাইসিস: রানিং টোটাল, গ্রোথ রেট এবং প্রফিট মার্জিন হিসাব করা।
  • র্যাংকিং: র্যাংকিং তৈরির জন্য যেমন, টপ সেলার, গ্রাহকের আয়ের ভিত্তিতে র্যাংক নির্ধারণ।
  • সময়ভিত্তিক বিশ্লেষণ: স্টক মার্কেট অ্যানালাইসিস, যেখানে গতকালের, পরশুদিনের বা আগের সপ্তাহের ডেটার সঙ্গে বর্তমান ডেটার তুলনা করা হয়।

সারাংশ

Spark SQL-এ Window Functions ব্যবহার করে আপনি আরও জটিল এবং উন্নত ডেটা প্রক্রিয়াকরণ করতে পারেন। এটি বিশেষ করে র্যাংকিং, রানিং টোটাল, অ্যাগ্রিগেশন এবং সময়ভিত্তিক বিশ্লেষণের জন্য অত্যন্ত কার্যকরী। উইন্ডো ফাংশন ব্যবহার করে আপনি বড় ডেটাসেটের ওপর আরো বিস্তারিত এবং অগ্রগতির ভিত্তিতে অ্যানালাইসিস করতে সক্ষম হন।

Content added By

Conditional Expressions (WHEN, IF, CASE) এর ব্যবহার

322

Spark SQL এ Conditional Expressions (যেমন WHEN, IF, CASE) ব্যবহার করে শর্তের উপর ভিত্তি করে ডেটার মান পরিবর্তন করা বা নতুন মান তৈরি করা যায়। এগুলো সাধারণত ডেটা ট্রান্সফরমেশন ও কন্ডিশনাল লজিক প্রয়োগে ব্যবহৃত হয়। চলুন, এগুলো কীভাবে Spark SQL এ ব্যবহার করা যায়, তা বিস্তারিতভাবে দেখি।


WHEN-THEN Expression

Spark SQL-এ WHEN এবং THEN শর্তযুক্ত এক্সপ্রেশন ব্যবহার করে আমরা নির্দিষ্ট শর্তের উপর ভিত্তি করে ডেটা কন্ডিশনাল ট্রান্সফর্ম করতে পারি। সাধারণত এটি CASE WHEN এর অংশ হিসেবে ব্যবহৃত হয়, যা SQL-এ কন্ডিশনাল লজিক তৈরি করতে ব্যবহৃত হয়।

WHEN Example:

from pyspark.sql.functions import when

# WHEN expression দিয়ে কলামের মান পরিবর্তন করা
df.select(
    "name",
    "age",
    when(df["age"] > 30, "Senior").otherwise("Junior").alias("age_group")
).show()

এখানে:

  • when(df["age"] > 30, "Senior"): যদি age ৩০ এর বেশি হয়, তবে "Senior" হবে।
  • otherwise("Junior"): যদি শর্তটি পূর্ণ না হয়, তবে "Junior" হবে।

এটি একটি নতুন কলাম age_group তৈরি করবে, যা নির্ধারণ করবে ব্যবহারকারীর বয়সের ভিত্তিতে তারা "Senior" নাকি "Junior"।


CASE WHEN Expression

SQL এর CASE WHEN এক্সপ্রেশন Spark SQL এ একইভাবে কাজ করে। এটি WHEN এর সাথে শর্ত এবং THEN এর মাধ্যমে আউটপুট প্রদান করে।

CASE WHEN Example:

from pyspark.sql.functions import expr

# CASE WHEN expression দিয়ে মান পরিবর্তন
df.select(
    "name",
    "age",
    expr("CASE WHEN age > 30 THEN 'Senior' ELSE 'Junior' END").alias("age_group")
).show()

এখানে:

  • CASE WHEN age > 30 THEN 'Senior' ELSE 'Junior' END: এটি age এর মানের ভিত্তিতে "Senior" বা "Junior" রিটার্ন করবে।
  • expr() ফাংশন SQL স্টাইল এক্সপ্রেশন ব্যবহার করতে সাহায্য করে।

IF Expression

Spark SQL-এ IF এক্সপ্রেশন সাধারণত when এর মতো কাজ করে। এটি খুবই সরল এবং স্টেটমেন্ট হিসেবে কাজ করে। IF সাধারণত একক শর্তের জন্য ব্যবহার করা হয়।

IF Example:

from pyspark.sql.functions import col

# IF expression ব্যবহার করে শর্তযুক্ত মান নির্ধারণ
df.select(
    "name",
    "age",
    (col("age") > 30).cast("string").alias("is_senior")
).show()

এখানে:

  • (col("age") > 30): এটি যদি age ৩০ এর বেশি হয় তবে True এবং নাহলে False রিটার্ন করবে।
  • .cast("string"): ফলাফলটি স্ট্রিং হিসেবে কনভার্ট করা হয়েছে।

Multiple Conditions with WHEN

একাধিক শর্ত সংযুক্ত করে WHEN এক্সপ্রেশন ব্যবহার করা যেতে পারে। এতে একাধিক শর্তের উপর ভিত্তি করে ডেটা ট্রান্সফর্ম করা সম্ভব হয়।

Multiple Conditions Example:

df.select(
    "name",
    "age",
    when(df["age"] < 18, "Child")
    .when((df["age"] >= 18) & (df["age"] <= 60), "Adult")
    .otherwise("Senior")
    .alias("age_category")
).show()

এখানে:

  • প্রথম শর্তে, age ১৮ এর কম হলে "Child" হবে।
  • দ্বিতীয় শর্তে, age ১৮ থেকে ৬০ এর মধ্যে হলে "Adult" হবে।
  • অন্যথায়, "Senior" হবে।

Conditional Expressions এর সারাংশ:

  • WHEN-THEN: এটি একটি শর্তের উপর ভিত্তি করে ডেটা ট্রান্সফর্ম করতে ব্যবহৃত হয়।
  • CASE WHEN: এটি SQL এর মতো শর্তযুক্ত এক্সপ্রেশন যেখানে একাধিক শর্ত দেওয়া যায় এবং প্রতিটি শর্তের জন্য আলাদা আউটপুট রিটার্ন করা যায়।
  • IF: এটি খুব সাধারণ শর্ত, যেটি সাধারণত দুটি শর্তের মধ্যে একটি নির্বাচন করতে ব্যবহৃত হয়।

সারাংশ

Spark SQL-এ WHEN, CASE, এবং IF এর মতো কন্ডিশনাল এক্সপ্রেশন ব্যবহার করে আমরা ডেটার মধ্যে শর্তসাপেক্ষ লজিক প্রয়োগ করতে পারি। এগুলো ডেটাকে শর্তানুসারে ট্রান্সফর্ম বা ক্যাটাগরি করতে সহায়ক হয়, যা ডেটা অ্যানালাইসিস ও ফিচার ইঞ্জিনিয়ারিং এর জন্য খুবই উপযোগী।

Content added By
Promotion

Are you sure to start over?

Loading...