PySpark'ta Tablo Verileri Nasıl Okunur ve Yazılır

Pyspark Ta Tablo Verileri Nasil Okunur Ve Yazilir



Veriler tablo biçiminde yüklenirse, PySpark'ta veri işleme daha hızlıdır. Bununla, SQl İfadelerini kullanarak işlem hızlı olacaktır. Bu nedenle, PySpark DataFrame/RDD'yi işlenmek üzere göndermeden önce bir tabloya dönüştürmek daha iyi bir yaklaşımdır. Bugün, tablo verilerini PySpark DataFrame'e nasıl okuyacağımızı, PySpark DataFrame'i tabloya nasıl yazacağımızı ve yerleşik işlevleri kullanarak mevcut tabloya yeni DataFrame'i nasıl ekleyeceğimizi göreceğiz. Hadi gidelim!

Pyspark.sql.DataFrameWriter.saveAsTable()

İlk olarak, write.saveAsTable() işlevini kullanarak mevcut PySpark DataFrame'i tabloya nasıl yazacağımızı göreceğiz. DataFrame'i tabloya yazmak için tablo adını ve modlar, partionBy vb. gibi diğer isteğe bağlı parametreleri alır. Bir parke dosyası olarak saklanır.

Sözdizimi:







dataframe_obj.write.saveAsTable(yol/Table_name,mode,partitionBy,…)
  1. Tablo_adı, dataframe_obj öğesinden oluşturulan tablonun adıdır.
  2. mode parametresini kullanarak tablonun verilerini ekleyebilir/üzerine yazabiliriz.
  3. partitionBy, sağlanan bu sütunlardaki değerlere dayalı olarak bölümler oluşturmak için tek/çoklu sütunları alır.

Örnek 1:

5 satır ve 4 sütun içeren bir PySpark DataFrame oluşturun. Bu Dataframe'i “Agri_Table1” adlı bir tabloya yazın.



pyspark'ı içe aktar

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# 5 satır ve 5 sütunlu tarım verileri

ağrı =[{ 'Toprak tipi' : 'Siyah' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 2500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },

{ 'Toprak tipi' : 'Siyah' , 'Sulama_kullanılabilirliği' : 'Evet' , 'Dönüm' : 3500 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'Hindistan' },

{ 'Toprak tipi' : 'Kırmızı' , 'Sulama_kullanılabilirliği' : 'Evet' , 'Dönüm' : 210 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'İngiltere' },

{ 'Toprak tipi' : 'Diğer' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 1000 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },

{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'Hindistan' }]



# yukarıdaki verilerden veri çerçevesini oluştur

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Yukarıdaki DataFrame'i tabloya yazın.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Çıktı:







Önceki PySpark Verileri ile bir parke dosyasının oluşturulduğunu görebiliriz.



Örnek 2:

Önceki DataFrame'i göz önünde bulundurun ve 'Ülke' sütunundaki değerlere göre kayıtları bölümlendirerek tabloya 'Agri_Table2' yazın.

# Yukarıdaki DataFrame'i partitionBy parametresiyle tabloya yazın

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Ülke' ])

Çıktı:

'Ülke' sütununda üç benzersiz değer vardır - 'Hindistan', 'İngiltere' ve 'ABD'. Böylece, üç bölüm oluşturulur. Her bölüm, parke dosyalarını tutar.

Pyspark.sql.DataFrameReader.table()

Spark.read.table() işlevini kullanarak tabloyu PySpark DataFrame'e yükleyelim. Yol/tablo adı olan yalnızca bir parametre alır. Tabloyu doğrudan PySpark DataFrame'e yükler ve PySpark DataFrame'e uygulanan tüm SQL işlevleri bu yüklü DataFrame'e de uygulanabilir.

Sözdizimi:

spark_app.read.table(yol/'Table_name')

Bu senaryoda, PySpark DataFrame'den oluşturulan önceki tabloyu kullanıyoruz. Önceki senaryo kod parçacıklarını ortamınıza uygulamanız gerektiğinden emin olun.

Örnek:

'Agri_Table1' tablosunu 'loaded_data' adlı DataFrame'e yükleyin.

load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

load_data.show()

Çıktı:

Tablonun PySpark DataFrame'e yüklendiğini görebiliriz.

SQL Sorgularını Yürütme

Şimdi, Spark.sql() işlevini kullanarak yüklenen DataFrame üzerinde bazı SQL sorguları yürütüyoruz.

# Yukarıdaki tablodaki tüm sütunları görüntülemek için SELECT komutunu kullanın.

linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN' ).göstermek()

# WHERE Maddesi

linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN WHERE Soil_status='Dry' ' ).göstermek()

linuxhint_spark_app.sql( 'Agri_Table1 NEREDE Acre > 2000'den * SEÇİN ' ).göstermek()

Çıktı:

  1. İlk sorgu, DataFrame'deki tüm sütunları ve kayıtları görüntüler.
  2. İkinci sorgu, 'Soil_status' sütununa dayalı olarak kayıtları görüntüler. 'Kuru' öğeye sahip yalnızca üç kayıt vardır.
  3. Son sorgu, 2000'den büyük 'Acre' değerine sahip iki kayıt döndürür.

Pyspark.sql.DataFrameWriter.insertInto()

insertInto() işlevini kullanarak DataFrame'i mevcut tabloya ekleyebiliriz. Sütun adlarını tanımlamak ve ardından tabloya eklemek için bu işlevi selectExpr() ile birlikte kullanabiliriz. Bu işlev aynı zamanda tabloAdı'nı parametre olarak alır.

Sözdizimi:

DataFrame_obj.write.insertInto('Tablo_adı')

Bu senaryoda, PySpark DataFrame'den oluşturulan önceki tabloyu kullanıyoruz. Önceki senaryo kod parçacıklarını ortamınıza uygulamanız gerektiğinden emin olun.

Örnek:

İki kayıt içeren yeni bir DataFrame oluşturun ve bunları 'Agri_Table1' tablosuna ekleyin.

pyspark'ı içe aktar

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# 2 satırlı tarım verileri

ağrı =[{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 2500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },

{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 1200 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'Japonya' }]

# yukarıdaki verilerden veri çerçevesini oluştur

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Dönüm' , 'Ülke' , 'Sulama_kullanılabilirliği' , 'Toprak tipi' ,
'Toprak_durumu' ).write.insert( 'Agri_Table1' )

# Son Agri_Table1'i göster

linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN' ).göstermek()

Çıktı:

Şimdi, DataFrame'de bulunan toplam satır sayısı 7'dir.

Çözüm

Artık write.saveAsTable() işlevini kullanarak PySpark DataFrame'i tabloya nasıl yazacağınızı anlıyorsunuz. Tablo adını ve diğer isteğe bağlı parametreleri alır. Ardından, bu tabloyu spark.read.table() işlevini kullanarak PySpark DataFrame'e yükledik. Yol/tablo adı olan yalnızca bir parametre alır. Yeni DataFrame'i mevcut tabloya eklemek isterseniz, insertInto() işlevini kullanın.