Bu kılavuzda, esas olarak pyspark.sql.DataFrameReader sınıfında bulunan read.parquet() işlevini kullanarak parke dosyasını PySpark DataFrame/SQL'e okumaya/yüklemeye odaklanacağız.
İçindekiler Konusu:
Parke Dosyasını PySpark DataFrame'e okuyun
Parke Dosyasını PySpark SQL'e okuyun
Pyspark.sql.DataFrameReader.parquet()
Bu işlev, parke dosyasını okumak ve onu PySpark DataFrame'e yüklemek için kullanılır. Parke dosyasının yol/dosya adını alır. Genel işlev bu olduğundan, basitçe read.parquet() işlevini kullanabiliriz.
Sözdizimi:
Şimdi read.parquet() sözdizimini görelim:
spark_app.read.parquet(dosya_adı.parquet/yol)İlk olarak, pip komutunu kullanarak PySpark modülünü kurun:
pip kurulumu pyspark
Parke Dosyasını Alın
Bir parke dosyasını okumak için, o verilerden parke dosyasının oluşturulduğu verilere ihtiyacınız vardır. Bu bölümde, PySpark DataFrame'den bir parke dosyasının nasıl oluşturulacağını göreceğiz.
5 kayıt ile bir PySpark DataFrame oluşturalım ve bunu “industry_parquet” parke dosyasına yazalım.
pyspark'ı içe aktarpyspark.sql'den SparkSession, Row'u içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# Endüstri ayrıntılarını saklayan veri çerçevesini oluşturun
Industry_df = linuxhint_spark_app.createDataFrame([Satır(Tür= 'Tarım' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,
Değerlendirme= 'Sıcak' ,Toplam_çalışanlar= 100 ),
Satır(Tür= 'Tarım' ,Alan= 'Hindistan' ,Derecelendirme= 'Sıcak' ,Toplam_çalışanlar= 200 ),
Satır(Tür= 'Gelişim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Ilık' ,Toplam_çalışanlar= 100 ),
Satır(Tür= 'Eğitim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Serin' ,Toplam_çalışanlar= 400 ),
Satır(Tür= 'Eğitim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Ilık' ,Toplam_çalışanlar= yirmi )
])
# Gerçek Veri Çerçevesi
Industry_df.show()
# Industry_df'yi parke dosyasına yazın
Industry_df.coalesce( 1 ).write.parke( 'endüstri_parke' )
Çıktı:
Bu, 5 kaydı tutan DataFrame'dir.
Önceki DataFrame için bir parke dosyası oluşturulur. Burada uzantılı dosya adımız “part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” şeklindedir. Bu dosyayı öğreticinin tamamında kullanıyoruz.
Parke Dosyasını PySpark DataFrame'e okuyun
Parke dosyamız var. Bu dosyayı read.parquet() işlevini kullanarak okuyalım ve PySpark DataFrame içine yükleyelim.
pyspark'ı içe aktarpyspark.sql'den SparkSession, Row'u içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# Parke dosyasını dataframe_from_parquet nesnesine oku.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# dataframe_from_parquet-DataFrame'i görüntüleyin
dataframe_from_parquet.show()
Çıktı:
DataFrame'i parquet dosyasından oluşturulan show() metodunu kullanarak gösteriyoruz.
Parke Dosyası ile SQL Sorguları
DataFrame'e yüklendikten sonra, SQL tablolarını oluşturmak ve DataFrame'de bulunan verileri görüntülemek mümkün olabilir. TEMPORARY VIEW oluşturmamız ve parke dosyasından oluşturulan DataFrame'den kayıtları döndürmek için SQL komutlarını kullanmamız gerekiyor.
Örnek 1:
'Sectors' adlı geçici bir görünüm oluşturun ve DataFrame'deki kayıtları görüntülemek için SELECT komutunu kullanın. buna başvurabilirsiniz öğretici bu, Spark – SQL'de bir GÖRÜNÜMün nasıl oluşturulacağını açıklar.
pyspark'ı içe aktarpyspark.sql'den SparkSession, Row'u içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# Parke dosyasını dataframe_from_parquet nesnesine oku.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# - 'Sectors' adlı yukarıdaki parke dosyasından Görünüm oluşturun
dataframe_from_parquet.createOrReplaceTempView( 'sektörler' )
# Sektörlerdeki tüm kayıtları görüntülemek için sorgu
linuxhint_spark_app.sql( 'Sektörlerden * seçin' ).göstermek()
Çıktı:
Örnek 2:
Önceki GÖRÜNÜMÜ kullanarak SQL sorgusunu yazın:
- “Hindistan” a ait Sektörlerdeki tüm kayıtları görüntülemek için.
- 100'den fazla çalışanı olan Sektörlerdeki tüm kayıtları görüntülemek için.
linuxhint_spark_app.sql( 'Alan = 'Hindistan' olan Sektörlerden * seçin' ).göstermek()
# 100'den fazla çalışanı olan Sektörlerdeki tüm kayıtları görüntülemek için sorgula
linuxhint_spark_app.sql( 'Toplam_çalışanların>100 olduğu Sektörlerden * seçin' ).göstermek()
Çıktı:
Alanı “Hindistan” olan tek bir kayıt ve 100'den büyük çalışanı olan iki kayıt var.
Parke Dosyasını PySpark SQL'e okuyun
Öncelikle CREATE komutunu kullanarak bir VIEW oluşturmamız gerekiyor. SQL sorgusu içerisindeki “path” anahtar kelimesini kullanarak parke dosyasını Spark SQL'e okuyabiliriz. Yoldan sonra dosyanın adını/konumunu belirtmemiz gerekiyor.
Sözdizimi:
kıvılcım_uygulama.sql( 'PARKE SEÇENEKLERİNİ KULLANARAK GEÇİCİ GÖRÜNÜM view_name OLUŞTURUN (yol ' dosya_adı.parke ')' )Örnek 1:
“Sector2” adlı geçici bir görünüm oluşturun ve parke dosyasını içine okuyun. sql() işlevini kullanarak, görünümde bulunan tüm kayıtları görüntülemek için seçme sorgusunu yazın.
pyspark'ı içe aktarpyspark.sql'den SparkSession, Row'u içe aktarın
linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()
# Parke dosyasını Spark-SQL'e okuyun
linuxhint_spark_app.sql( 'PARKE SEÇENEKLERİNİ KULLANARAK Sector2 GEÇİCİ GÖRÜNÜM OLUŞTURUN (yol ' parça-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parke ')' )
# Sektör2'deki tüm kayıtları görüntülemek için sorgu
linuxhint_spark_app.sql( 'Sector2'den * seçin' ).göstermek()
Çıktı:
Örnek 2:
Önceki GÖRÜNÜMÜ kullanın ve 'Sıcak' veya 'Soğuk' derecelendirmesine sahip tüm kayıtları görüntülemek için sorguyu yazın.
# Derecelendirme- Sıcak veya Soğuk ile Sektör2'deki tüm kayıtları görüntülemek için sorgulayın.linuxhint_spark_app.sql( 'Sektör 2'den *'yi seçin, burada Rating='Hot' OR Rating='Cool'' ).göstermek()
Çıktı:
Derecelendirmesi 'Sıcak' veya 'Soğuk' olan üç kayıt vardır.
Çözüm
PySpark'ta write.parquet() işlevi, DataFrame'i parke dosyasına yazar. read.parquet() işlevi, parke dosyasını PySpark DataFrame'e veya başka herhangi bir DataSource'a okur. Parke dosyasını PySpark DataFrame'e ve PySpark tablosuna nasıl okuyacağımızı öğrendik. Bu öğreticinin bir parçası olarak, PySpark DataFrame'den tabloların nasıl oluşturulacağını ve WHERE yan tümcesini kullanarak verilerin nasıl filtreleneceğini de tartıştık.