PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrame'in dönüştürülmesi pandas_udf() işlevi kullanılarak mümkündür. PySpark DataFrame üzerinde ok ile uygulanan kullanıcı tanımlı bir fonksiyondur. Vektörleştirilmiş işlemleri pandas_udf() kullanarak gerçekleştirebiliriz. Bu işlevi bir dekoratör olarak geçirerek uygulanabilir. Sözdizimini, parametreleri ve farklı örnekleri öğrenmek için bu kılavuzu inceleyelim.

İçindekiler Konusu:

PySpark DataFrame ve modül kurulumu hakkında bilgi edinmek istiyorsanız, bunu gözden geçirin madde .







Pyspark.sql.functions.pandas_udf()

Pandas_udf (), PySpark'taki sql.functions modülünde mevcuttur ve “from” anahtar kelimesi kullanılarak içe aktarılabilir. PySpark DataFrame'imiz üzerinde vektörleştirilmiş işlemleri gerçekleştirmek için kullanılır. Bu işlev, üç parametre geçirilerek bir dekoratör gibi uygulanır. Bundan sonra, bir ok kullanarak verileri vektör formatında (bunun için series/NumPy kullandığımız gibi) döndüren kullanıcı tanımlı bir fonksiyon oluşturabiliriz. Bu fonksiyon içerisinde sonucu döndürebiliyoruz.



Yapı ve Sözdizimi:



Öncelikle bu fonksiyonun yapısına ve sözdizimine bakalım:

@pandas_udf(veri türü)
def işlev_adı(işlem) -> convert_format:
iade beyanı

Burada işlev_adı tanımlı işlevimizin adıdır. Veri türü, bu işlev tarafından döndürülen veri türünü belirtir. “return” anahtar kelimesini kullanarak sonucu döndürebiliriz. Tüm işlemler ok ataması ile fonksiyon içerisinde gerçekleştirilir.





Pandas_udf (Fonksiyon ve DönüşTürü)

  1. İlk parametre, kendisine iletilen kullanıcı tanımlı işlevdir.
  2. İkinci parametre, fonksiyondan dönüş veri tipini belirtmek için kullanılır.

Veri:

Tüm bu kılavuzda, tanıtım için yalnızca bir PySpark DataFrame kullanıyoruz. Tanımladığımız tüm kullanıcı tanımlı fonksiyonlar bu PySpark DataFrame üzerinde uygulanmaktadır. PySpark kurulumundan sonra bu DataFrame'i ortamınızda oluşturduğunuzdan emin olun.



pyspark'ı içe aktar

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

pyspark.sql.functions'dan pandas_udf'u içe aktarın

pyspark.sql.types'ten içe aktarma *

pandaları panda olarak içe aktar

# sebze detayları

sebze =[{ 'tip' : 'sebze' , 'isim' : 'domates' , 'ülke_bul' : 'AMERİKA BİRLEŞİK DEVLETLERİ' , 'miktar' : 800 },

{ 'tip' : 'meyve' , 'isim' : 'muz' , 'ülke_bul' : 'ÇİN' , 'miktar' : yirmi },

{ 'tip' : 'sebze' , 'isim' : 'domates' , 'ülke_bul' : 'AMERİKA BİRLEŞİK DEVLETLERİ' , 'miktar' : 800 },

{ 'tip' : 'sebze' , 'isim' : 'Mango' , 'ülke_bul' : 'JAPONYA' , 'miktar' : 0 },

{ 'tip' : 'meyve' , 'isim' : 'limon' , 'ülke_bul' : 'HİNDİSTAN' , 'miktar' : 1700 },

{ 'tip' : 'sebze' , 'isim' : 'domates' , 'ülke_bul' : 'AMERİKA BİRLEŞİK DEVLETLERİ' , 'miktar' : 1200 },

{ 'tip' : 'sebze' , 'isim' : 'Mango' , 'ülke_bul' : 'JAPONYA' , 'miktar' : 0 },

{ 'tip' : 'meyve' , 'isim' : 'limon' , 'ülke_bul' : 'HİNDİSTAN' , 'miktar' : 0 }

]

# yukarıdaki verilerden pazar veri çerçevesini oluşturun

market_df = linuxhint_spark_app.createDataFrame(sebze)

market_df.show()

Çıktı:

Burada 4 sütun ve 8 satır ile bu DataFrame'i oluşturuyoruz. Şimdi, kullanıcı tanımlı işlevleri oluşturmak ve bunları bu sütunlara uygulamak için pandas_udf() kullanıyoruz.

Farklı Veri Türleriyle Pandas_udf()

Bu senaryoda pandas_udf() ile bazı kullanıcı tanımlı fonksiyonlar oluşturup kolonlara uygulayıp, select() metodunu kullanarak sonuçları gösteriyoruz. Her durumda, vektörleştirilmiş işlemleri gerçekleştirirken pandas.Series'i kullanırız. Bu, sütun değerlerini tek boyutlu bir dizi olarak kabul eder ve işlem sütuna uygulanır. Dekoratörün kendisinde, fonksiyon dönüş tipini belirtiyoruz.

Örnek 1: Dize Türü ile Pandas_udf()

Burada, string tipi sütun değerlerini büyük ve küçük harfe dönüştürmek için string dönüş tipi ile kullanıcı tanımlı iki fonksiyon oluşturuyoruz. Son olarak bu fonksiyonları “type” ve “locate_country” sütunlarına uyguluyoruz.

# Pandas_udf ile tür sütununu büyük harfe dönüştürün

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

i.str.upper()'ı döndür

# find_country sütununu pandas_udf ile küçük harfe dönüştürün

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

i.str.lower()'ı döndür

# Select() kullanarak sütunları göster

market_df.select( 'tip' ,type_upper_case( 'tip' ), 'ülke_bul' ,
country_lower_case( 'ülke_bul' )).göstermek()

Çıktı:

Açıklama:

StringType() işlevi, pyspark.sql.types modülünde mevcuttur. PySpark DataFrame'i oluştururken bu modülü zaten içe aktardık.

  1. İlk olarak, UDF (kullanıcı tanımlı işlev), str.upper() işlevini kullanarak dizeleri büyük harfle döndürür. str.upper(), verilen diziyi büyük harfe çeviren Seri Veri Yapısında mevcuttur (işlev içinde bir okla seriye dönüştürürken). Son olarak bu fonksiyon, select() metodu içerisinde belirtilen “type” sütununa uygulanır. Önceden, tür sütunundaki tüm dizeler küçük harfleydi. Şimdi, büyük harfle değiştirildiler.
  2. İkincisi, UDF, str.lower() işlevini kullanarak dizeleri büyük harfle döndürür. str.lower(), verilen dizgiyi küçük harfe çeviren Seri Veri Yapısında mevcuttur. Son olarak bu fonksiyon, select() metodu içerisinde belirtilen “type” sütununa uygulanır. Önceden, tür sütunundaki tüm dizeler büyük harfleydi. Şimdi, küçük harfe değiştirildiler.

Örnek 2: Tamsayı Türü ile Pandas_udf()

PySpark DataFrame tamsayı sütununu Pandas serisine dönüştüren bir UDF oluşturalım ve her değere 100 ekleyelim. Select() yönteminin içindeki 'miktar' sütununu bu işleve iletin.

# 100 ekle

@pandas_udf(TamsayıTürü())

def add_100(i: panda.Series) -> panda.Series:

geri dön 100

# Miktar sütununu yukarıdaki fonksiyona geçirin ve görüntüleyin.

market_df.select( 'miktar' ,ekle_100( 'miktar' )).göstermek()

Çıktı:

Açıklama:

UDF içinde, tüm değerleri yineliyoruz ve Seriye dönüştürüyoruz. Bundan sonra Serideki her değere 100 ekliyoruz. Son olarak “quantity” sütununu bu fonksiyona geçiyoruz ve tüm değerlere 100 eklendiğini görüyoruz.

Groupby() & Agg() Kullanan Farklı Veri Türleriyle Pandas_udf()

UDF'yi birleştirilmiş sütunlara geçirmek için örneklere bakalım. Burada sütun değerleri önce groupby() fonksiyonu kullanılarak gruplandırılır ve toplama agg() fonksiyonu kullanılarak yapılır. UDF'mizi bu toplama işlevinin içine geçiriyoruz.

Sözdizimi:

pyspark_dataframe_object.groupby( 'gruplandırma_sütun' ).agg(UDF
(pyspark_dataframe_object[ 'kolon' ]))

Burada gruplama sütunundaki değerler önce gruplandırılır. Ardından, UDF'mize göre gruplandırılmış her veri üzerinde toplama yapılır.

Örnek 1: Toplam Ortalama() ile Pandas_udf()

Burada, dönüş tipi float olan kullanıcı tanımlı bir fonksiyon yaratıyoruz. Fonksiyonun içinde, mean() fonksiyonunu kullanarak ortalamayı hesaplıyoruz. Bu UDF, her tür için ortalama miktarı almak üzere 'miktar' sütununa geçirilir.

# ortalamayı/ortalamayı döndür

@pandas_udf( 'batmadan yüzmek' )

def ortalama_işlev(i: panda.Series) -> kayan nokta:

dönüş i.mean()

# Tür sütununu gruplandırarak miktar sütununu fonksiyona geçirin.

market_df.groupby( 'tip' ).agg(ortalama_işlev(pazar_df[ 'miktar' ])).göstermek()

Çıktı:

'Type' sütunundaki öğelere göre gruplandırıyoruz. İki grup oluşur - “meyve” ve “sebze”. Her grup için ortalama hesaplanır ve döndürülür.

Örnek 2: Aggregate Max() ve Min() ile Pandas_udf()

Burada tamsayı (int) dönüş tipine sahip kullanıcı tanımlı iki fonksiyon oluşturuyoruz. İlk UDF minimum değeri, ikinci UDF ise maksimum değeri döndürür.

# minimum değeri döndüren pandas_udf

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

i.min()'i döndür

# maksimum değeri döndüren pandas_udf

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

i.max()'ı döndür

# Locate_country'yi gruplandırarak miktar sütununu min_pandas_udf'a geçirin.

market_df.groupby( 'ülke_bul' ).agg(min_(market_df[ 'miktar' ])).göstermek()

# Locate_country'yi gruplandırarak miktar sütununu max_ pandas_udf'a geçirin.

market_df.groupby( 'ülke_bul' ).agg(max_(market_df[ 'miktar' ])).göstermek()

Çıktı:

Minimum ve maksimum değerleri döndürmek için, UDF'lerin dönüş tipinde min() ve max() fonksiyonlarını kullanırız. Şimdi verileri “locate_country” sütununda gruplandırıyoruz. Dört grup oluşturulur (“ÇİN”, “HİNDİSTAN”, “JAPONYA”, “ABD”). Her grup için maksimum miktarı döndürürüz. Benzer şekilde, minimum miktarı iade ediyoruz.

Çözüm

Temel olarak pandas_udf (), PySpark DataFrame'imiz üzerinde vektörleştirilmiş işlemleri gerçekleştirmek için kullanılır. pandas_udf() öğesinin nasıl oluşturulacağını ve PySpark DataFrame'e nasıl uygulanacağını gördük. Daha iyi anlamak için, tüm veri türlerini (dize, kayan nokta ve tamsayı) dikkate alarak farklı örnekleri tartıştık. agg() işlevi aracılığıyla pandas_udf()'u groupby() ile kullanmak mümkün olabilir.