PySpark Read.Parquet()

Pyspark Read Parquet



PySpark'ta write.parquet() işlevi, DataFrame'i parke dosyasına yazar ve read.parquet(), parke dosyasını PySpark DataFrame'e veya başka herhangi bir DataSource'a okur. Apache Spark'daki sütunları hızlı ve verimli bir şekilde işlemek için verileri sıkıştırmamız gerekiyor. Veri sıkıştırma, hafızamızı korur ve tüm sütunlar düz seviyeye dönüştürülür. Bu, düz sütun düzeyinde depolamanın mevcut olduğu anlamına gelir. Bunları saklayan dosya PARKE dosyası olarak bilinir.

Bu kılavuzda, esas olarak pyspark.sql.DataFrameReader sınıfında bulunan read.parquet() işlevini kullanarak parke dosyasını PySpark DataFrame/SQL'e okumaya/yüklemeye odaklanacağız.

İçindekiler Konusu:







Parke Dosyasını Alın



Parke Dosyasını PySpark DataFrame'e okuyun



Parke Dosyasını PySpark SQL'e okuyun





Pyspark.sql.DataFrameReader.parquet()

Bu işlev, parke dosyasını okumak ve onu PySpark DataFrame'e yüklemek için kullanılır. Parke dosyasının yol/dosya adını alır. Genel işlev bu olduğundan, basitçe read.parquet() işlevini kullanabiliriz.

Sözdizimi:



Şimdi read.parquet() sözdizimini görelim:

spark_app.read.parquet(dosya_adı.parquet/yol)

İlk olarak, pip komutunu kullanarak PySpark modülünü kurun:

pip kurulumu pyspark

Parke Dosyasını Alın

Bir parke dosyasını okumak için, o verilerden parke dosyasının oluşturulduğu verilere ihtiyacınız vardır. Bu bölümde, PySpark DataFrame'den bir parke dosyasının nasıl oluşturulacağını göreceğiz.

5 kayıt ile bir PySpark DataFrame oluşturalım ve bunu “industry_parquet” parke dosyasına yazalım.

pyspark'ı içe aktar

pyspark.sql'den SparkSession, Row'u içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# Endüstri ayrıntılarını saklayan veri çerçevesini oluşturun

Industry_df = linuxhint_spark_app.createDataFrame([Satır(Tür= 'Tarım' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,
Değerlendirme= 'Sıcak' ,Toplam_çalışanlar= 100 ),

Satır(Tür= 'Tarım' ,Alan= 'Hindistan' ,Derecelendirme= 'Sıcak' ,Toplam_çalışanlar= 200 ),

Satır(Tür= 'Gelişim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Ilık' ,Toplam_çalışanlar= 100 ),

Satır(Tür= 'Eğitim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Serin' ,Toplam_çalışanlar= 400 ),

Satır(Tür= 'Eğitim' ,Alan= 'AMERİKA BİRLEŞİK DEVLETLERİ' ,Derecelendirme= 'Ilık' ,Toplam_çalışanlar= yirmi )

])

# Gerçek Veri Çerçevesi

Industry_df.show()

# Industry_df'yi parke dosyasına yazın

Industry_df.coalesce( 1 ).write.parke( 'endüstri_parke' )

Çıktı:

Bu, 5 kaydı tutan DataFrame'dir.

Önceki DataFrame için bir parke dosyası oluşturulur. Burada uzantılı dosya adımız “part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” şeklindedir. Bu dosyayı öğreticinin tamamında kullanıyoruz.

Parke Dosyasını PySpark DataFrame'e okuyun

Parke dosyamız var. Bu dosyayı read.parquet() işlevini kullanarak okuyalım ve PySpark DataFrame içine yükleyelim.

pyspark'ı içe aktar

pyspark.sql'den SparkSession, Row'u içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# Parke dosyasını dataframe_from_parquet nesnesine oku.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# dataframe_from_parquet-DataFrame'i görüntüleyin

dataframe_from_parquet.show()

Çıktı:

DataFrame'i parquet dosyasından oluşturulan show() metodunu kullanarak gösteriyoruz.

Parke Dosyası ile SQL Sorguları

DataFrame'e yüklendikten sonra, SQL tablolarını oluşturmak ve DataFrame'de bulunan verileri görüntülemek mümkün olabilir. TEMPORARY VIEW oluşturmamız ve parke dosyasından oluşturulan DataFrame'den kayıtları döndürmek için SQL komutlarını kullanmamız gerekiyor.

Örnek 1:

'Sectors' adlı geçici bir görünüm oluşturun ve DataFrame'deki kayıtları görüntülemek için SELECT komutunu kullanın. buna başvurabilirsiniz öğretici bu, Spark – SQL'de bir GÖRÜNÜMün nasıl oluşturulacağını açıklar.

pyspark'ı içe aktar

pyspark.sql'den SparkSession, Row'u içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# Parke dosyasını dataframe_from_parquet nesnesine oku.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# - 'Sectors' adlı yukarıdaki parke dosyasından Görünüm oluşturun

dataframe_from_parquet.createOrReplaceTempView( 'sektörler' )

# Sektörlerdeki tüm kayıtları görüntülemek için sorgu

linuxhint_spark_app.sql( 'Sektörlerden * seçin' ).göstermek()

Çıktı:

Örnek 2:

Önceki GÖRÜNÜMÜ kullanarak SQL sorgusunu yazın:

  1. “Hindistan” a ait Sektörlerdeki tüm kayıtları görüntülemek için.
  2. 100'den fazla çalışanı olan Sektörlerdeki tüm kayıtları görüntülemek için.
# 'Hindistan' a ait Sektörlerdeki tüm kayıtları görüntülemek için sorgulayın.

linuxhint_spark_app.sql( 'Alan = 'Hindistan' olan Sektörlerden * seçin' ).göstermek()

# 100'den fazla çalışanı olan Sektörlerdeki tüm kayıtları görüntülemek için sorgula

linuxhint_spark_app.sql( 'Toplam_çalışanların>100 olduğu Sektörlerden * seçin' ).göstermek()

Çıktı:

Alanı “Hindistan” olan tek bir kayıt ve 100'den büyük çalışanı olan iki kayıt var.

Parke Dosyasını PySpark SQL'e okuyun

Öncelikle CREATE komutunu kullanarak bir VIEW oluşturmamız gerekiyor. SQL sorgusu içerisindeki “path” anahtar kelimesini kullanarak parke dosyasını Spark SQL'e okuyabiliriz. Yoldan sonra dosyanın adını/konumunu belirtmemiz gerekiyor.

Sözdizimi:

kıvılcım_uygulama.sql( 'PARKE SEÇENEKLERİNİ KULLANARAK GEÇİCİ GÖRÜNÜM view_name OLUŞTURUN (yol ' dosya_adı.parke ')' )

Örnek 1:

“Sector2” adlı geçici bir görünüm oluşturun ve parke dosyasını içine okuyun. sql() işlevini kullanarak, görünümde bulunan tüm kayıtları görüntülemek için seçme sorgusunu yazın.

pyspark'ı içe aktar

pyspark.sql'den SparkSession, Row'u içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# Parke dosyasını Spark-SQL'e okuyun

linuxhint_spark_app.sql( 'PARKE SEÇENEKLERİNİ KULLANARAK Sector2 GEÇİCİ GÖRÜNÜM OLUŞTURUN (yol ' parça-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parke ')' )

# Sektör2'deki tüm kayıtları görüntülemek için sorgu

linuxhint_spark_app.sql( 'Sector2'den * seçin' ).göstermek()

Çıktı:

Örnek 2:

Önceki GÖRÜNÜMÜ kullanın ve 'Sıcak' veya 'Soğuk' derecelendirmesine sahip tüm kayıtları görüntülemek için sorguyu yazın.

# Derecelendirme- Sıcak veya Soğuk ile Sektör2'deki tüm kayıtları görüntülemek için sorgulayın.

linuxhint_spark_app.sql( 'Sektör 2'den *'yi seçin, burada Rating='Hot' OR Rating='Cool'' ).göstermek()

Çıktı:

Derecelendirmesi 'Sıcak' veya 'Soğuk' olan üç kayıt vardır.

Çözüm

PySpark'ta write.parquet() işlevi, DataFrame'i parke dosyasına yazar. read.parquet() işlevi, parke dosyasını PySpark DataFrame'e veya başka herhangi bir DataSource'a okur. Parke dosyasını PySpark DataFrame'e ve PySpark tablosuna nasıl okuyacağımızı öğrendik. Bu öğreticinin bir parçası olarak, PySpark DataFrame'den tabloların nasıl oluşturulacağını ve WHERE yan tümcesini kullanarak verilerin nasıl filtreleneceğini de tartıştık.