Pyspark.sql.DataFrameWriter.saveAsTable()
İlk olarak, write.saveAsTable() işlevini kullanarak mevcut PySpark DataFrame'i tabloya nasıl yazacağımızı göreceğiz. DataFrame'i tabloya yazmak için tablo adını ve modlar, partionBy vb. gibi diğer isteğe bağlı parametreleri alır. Bir parke dosyası olarak saklanır.
Sözdizimi:
dataframe_obj.write.saveAsTable(yol/Table_name,mode,partitionBy,…)
- Tablo_adı, dataframe_obj öğesinden oluşturulan tablonun adıdır.
- mode parametresini kullanarak tablonun verilerini ekleyebilir/üzerine yazabiliriz.
- partitionBy, sağlanan bu sütunlardaki değerlere dayalı olarak bölümler oluşturmak için tek/çoklu sütunları alır.
Örnek 1:
5 satır ve 4 sütun içeren bir PySpark DataFrame oluşturun. Bu Dataframe'i “Agri_Table1” adlı bir tabloya yazın.
pyspark'ı içe aktar
pyspark.sql'den SparkSession'ı içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# 5 satır ve 5 sütunlu tarım verileri
ağrı =[{ 'Toprak tipi' : 'Siyah' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 2500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },
{ 'Toprak tipi' : 'Siyah' , 'Sulama_kullanılabilirliği' : 'Evet' , 'Dönüm' : 3500 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'Hindistan' },
{ 'Toprak tipi' : 'Kırmızı' , 'Sulama_kullanılabilirliği' : 'Evet' , 'Dönüm' : 210 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'İngiltere' },
{ 'Toprak tipi' : 'Diğer' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 1000 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },
{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'Hindistan' }]
# yukarıdaki verilerden veri çerçevesini oluştur
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Yukarıdaki DataFrame'i tabloya yazın.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Çıktı:
Önceki PySpark Verileri ile bir parke dosyasının oluşturulduğunu görebiliriz.
Örnek 2:
Önceki DataFrame'i göz önünde bulundurun ve 'Ülke' sütunundaki değerlere göre kayıtları bölümlendirerek tabloya 'Agri_Table2' yazın.
# Yukarıdaki DataFrame'i partitionBy parametresiyle tabloya yazınagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Ülke' ])
Çıktı:
'Ülke' sütununda üç benzersiz değer vardır - 'Hindistan', 'İngiltere' ve 'ABD'. Böylece, üç bölüm oluşturulur. Her bölüm, parke dosyalarını tutar.
Pyspark.sql.DataFrameReader.table()
Spark.read.table() işlevini kullanarak tabloyu PySpark DataFrame'e yükleyelim. Yol/tablo adı olan yalnızca bir parametre alır. Tabloyu doğrudan PySpark DataFrame'e yükler ve PySpark DataFrame'e uygulanan tüm SQL işlevleri bu yüklü DataFrame'e de uygulanabilir.
Sözdizimi:
spark_app.read.table(yol/'Table_name')Bu senaryoda, PySpark DataFrame'den oluşturulan önceki tabloyu kullanıyoruz. Önceki senaryo kod parçacıklarını ortamınıza uygulamanız gerektiğinden emin olun.
Örnek:
'Agri_Table1' tablosunu 'loaded_data' adlı DataFrame'e yükleyin.
load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )load_data.show()
Çıktı:
Tablonun PySpark DataFrame'e yüklendiğini görebiliriz.
SQL Sorgularını Yürütme
Şimdi, Spark.sql() işlevini kullanarak yüklenen DataFrame üzerinde bazı SQL sorguları yürütüyoruz.
# Yukarıdaki tablodaki tüm sütunları görüntülemek için SELECT komutunu kullanın.linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN' ).göstermek()
# WHERE Maddesi
linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN WHERE Soil_status='Dry' ' ).göstermek()
linuxhint_spark_app.sql( 'Agri_Table1 NEREDE Acre > 2000'den * SEÇİN ' ).göstermek()
Çıktı:
- İlk sorgu, DataFrame'deki tüm sütunları ve kayıtları görüntüler.
- İkinci sorgu, 'Soil_status' sütununa dayalı olarak kayıtları görüntüler. 'Kuru' öğeye sahip yalnızca üç kayıt vardır.
- Son sorgu, 2000'den büyük 'Acre' değerine sahip iki kayıt döndürür.
Pyspark.sql.DataFrameWriter.insertInto()
insertInto() işlevini kullanarak DataFrame'i mevcut tabloya ekleyebiliriz. Sütun adlarını tanımlamak ve ardından tabloya eklemek için bu işlevi selectExpr() ile birlikte kullanabiliriz. Bu işlev aynı zamanda tabloAdı'nı parametre olarak alır.
Sözdizimi:
DataFrame_obj.write.insertInto('Tablo_adı')Bu senaryoda, PySpark DataFrame'den oluşturulan önceki tabloyu kullanıyoruz. Önceki senaryo kod parçacıklarını ortamınıza uygulamanız gerektiğinden emin olun.
Örnek:
İki kayıt içeren yeni bir DataFrame oluşturun ve bunları 'Agri_Table1' tablosuna ekleyin.
pyspark'ı içe aktarpyspark.sql'den SparkSession'ı içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# 2 satırlı tarım verileri
ağrı =[{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 2500 , 'Toprak_durumu' : 'Kuru' ,
'Ülke' : 'AMERİKA BİRLEŞİK DEVLETLERİ' },
{ 'Toprak tipi' : 'Kum' , 'Sulama_kullanılabilirliği' : 'HAYIR' , 'Dönüm' : 1200 , 'Toprak_durumu' : 'Islak' ,
'Ülke' : 'Japonya' }]
# yukarıdaki verilerden veri çerçevesini oluştur
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Dönüm' , 'Ülke' , 'Sulama_kullanılabilirliği' , 'Toprak tipi' ,
'Toprak_durumu' ).write.insert( 'Agri_Table1' )
# Son Agri_Table1'i göster
linuxhint_spark_app.sql( 'Agri_Table1'den * SEÇİN' ).göstermek()
Çıktı:
Şimdi, DataFrame'de bulunan toplam satır sayısı 7'dir.
Çözüm
Artık write.saveAsTable() işlevini kullanarak PySpark DataFrame'i tabloya nasıl yazacağınızı anlıyorsunuz. Tablo adını ve diğer isteğe bağlı parametreleri alır. Ardından, bu tabloyu spark.read.table() işlevini kullanarak PySpark DataFrame'e yükledik. Yol/tablo adı olan yalnızca bir parametre alır. Yeni DataFrame'i mevcut tabloya eklemek isterseniz, insertInto() işlevini kullanın.