PySpark DataFrame'i JSON'a dönüştürme

Pyspark Dataframe I Json A Donusturme



Yapılandırılmış bir veriyi JSON kullanarak iletmek mümkündür ve ayrıca düşük bellek tüketir. PySpark RDD veya PySpark DataFrame ile karşılaştırıldığında JSON, JSON ile mümkün olan düşük bellek ve serileştirme kullanır. PySpark DataFrame'i pyspark.sql.DataFrameWriter.json() yöntemini kullanarak JSON'a dönüştürebiliriz. Bunun dışında DataFrame'i JSON'a dönüştürmenin iki yolu daha vardır.

İçindekiler Konusu:

Tüm örneklerde basit bir PySpark DataFrame düşünelim ve belirtilen fonksiyonları kullanarak JSON'a dönüştürelim.







Gerekli Modül:

Henüz kurulu değilse, PySpark kütüphanesini ortamınıza kurun. Yüklemek için aşağıdaki komuta başvurabilirsiniz:



pip kurulumu pyspark

To_json() ile ToPandas() Kullanarak JSON'a PySpark DataFrame

to_json() yöntemi, Pandas DataFrame'i JSON'a dönüştüren Pandas modülünde mevcuttur. PySpark DataFrame'imizi Pandas DataFrame'e dönüştürürsek bu yöntemi kullanabiliriz. PySpark DataFrame'i Pandas DataFrame'e dönüştürmek için toPandas() yöntemi kullanılır. Parametreleriyle birlikte to_json() sözdizimini görelim.



Sözdizimi:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient, dönüştürülen JSON'u istenen formatta görüntülemek için kullanılır. “Kayıtlar”, “tablo”, “değerler”, “sütunlar”, “dizin”, “bölme” alır.
  2. Dizin, dönüştürülen JSON dizesine dizini eklemek/kaldırmak için kullanılır. 'True' olarak ayarlanırsa, indeksler görüntülenir. Aksi takdirde, oryantasyon 'bölünmüş' veya 'tablo' ise endeksler görüntülenmez.

Örnek 1: 'Kayıtlar' Olarak Yönlendirin

3 satır ve 4 sütun içeren bir 'skills_df' PySpark DataFrame oluşturun. orient parametresini 'records' olarak belirterek bu DataFrame'i JSON'a dönüştürün.

pyspark'ı içe aktar

pandaları ithal et

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# 3 satır ve 4 sütun içeren beceri verileri

beceriler =[{ 'İD' : 123 , 'kişi' : 'Bal' , 'yetenek' : 'tablo' , 'ödül' : 25000 },

{ 'İD' : 112 , 'kişi' : 'Mouni' , 'yetenek' : 'dans' , 'ödül' : 2000 },

{ 'İD' : 153 , 'kişi' : 'Tulası' , 'yetenek' : 'okuma' , 'ödül' : 1200 }

]

# yukarıdaki verilerden beceri veri çerçevesini oluşturun

beceriler_df = linuxhint_spark_app.createDataFrame(beceriler)

# Gerçek beceri verileri

Skills_df.show()

# 'kayıt' olarak yönlendirerek to_json() kullanarak JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'kayıtlar' )

print(json_skills_data)

Çıktı:



+---+------+-----+--------+

| kimlik|kişi|ödül| beceri|

+---+------+-----+--------+

| 123 | tatlım| 25000 |resim|

| 112 | Mouni| 2000 | dans|

| 153 |Tülası| 1200 | okuma|

+---+------+-----+--------+

[{ 'İD' : 123 , 'kişi' : 'Bal' , 'ödül' : 25000 , 'yetenek' : 'tablo' },{ 'İD' : 112 , 'kişi' : 'Mouni' , 'ödül' : 2000 , 'yetenek' : 'dans' },{ 'İD' : 153 , 'kişi' : 'Tulası' , 'ödül' : 1200 , 'yetenek' : 'okuma' }]

PySpark DataFrame'in bir değerler sözlüğü ile JSON dizisine dönüştürüldüğünü görebiliriz. Burada, anahtarlar sütun adını temsil eder ve değer, PySpark DataFrame'deki satır/hücre değerini temsil eder.

Örnek 2: 'Bölünmüş' Olarak Yönlendirin

'Bölünmüş' yön tarafından döndürülen JSON biçimi, bir sütun listesi, dizin listesi ve veri listesi içeren sütun adlarını içerir. Aşağıda “bölünmüş” yönün biçimi verilmiştir.

# 'split' olarak yönlendirerek to_json() kullanarak JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'bölmek' )

print(json_skills_data)

Çıktı:

{ 'sütunlar' :[ 'İD' , 'kişi' , 'ödül' , 'yetenek' ], 'dizin' :[ 0 , 1 , 2 ], 'veri' :[[ 123 , 'Bal' , 25000 , 'tablo' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulası' , 1200 , 'okuma' ]]}

Örnek 3: 'Dizin' olarak yönlendirme

Burada, PySpark DataFrame'deki her satır, sütun adı olarak anahtarı olan bir sözlük biçiminde kullanımdan kaldırılmıştır. Her sözlük için dizin konumu bir anahtar olarak belirtilir.

# Orient as 'index' ile to_json() kullanarak JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'dizin' )

print(json_skills_data)

Çıktı:

{ '0' :{ 'İD' : 123 , 'kişi' : 'Bal' , 'ödül' : 25000 , 'yetenek' : 'tablo' }, '1' :{ 'İD' : 112 , 'kişi' : 'Mouni' , 'ödül' : 2000 , 'yetenek' : 'dans' }, '2' :{ 'İD' : 153 , 'kişi' : 'Tulası' , 'ödül' : 1200 , 'yetenek' : 'okuma' }}

Örnek 4: 'Sütunlar' Olarak Yönlendirin

Sütunlar her kayıt için anahtardır. Her sütun, dizin numaralarıyla sütun değerlerini alan bir sözlüğe sahiptir.

# to_json() kullanarak 'columns' olarak yönlendirerek JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'sütunlar' )

print(json_skills_data)

Çıktı:

{ 'İD' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'kişi' :{ '0' : 'Bal' , '1' : 'Mouni' , '2' : 'Tulası' }, 'ödül' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'yetenek' :{ '0' : 'tablo' , '1' : 'dans' , '2' : 'okuma' }}

Örnek 5: 'Değerler' Olarak Yönlendirin

Yalnızca JSON'daki değerlere ihtiyacınız varsa, 'değerler' yönüne gidebilirsiniz. Her satırı bir listede görüntüler. Son olarak, tüm listeler bir listede saklanır. Bu JSON, iç içe geçmiş liste türündedir.

# 'değerler' olarak yönlendirme ile to_json() kullanarak JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'değerler' )

print(json_skills_data)

Çıktı:

[[ 123 , 'Bal' , 25000 , 'tablo' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulası' , 1200 , 'okuma' ]]

Örnek 6: “Tablo” Olarak Yönlendirin

'Tablo' yönü, sütun veri türleri, birincil anahtar olarak dizin ve Pandas sürümü ile birlikte alan adlarına sahip şemayı içeren JSON'u döndürür. Değer içeren sütun adları “veri” olarak görüntülenir.

# to_json() kullanarak 'tablo' olarak yönlendirerek JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(orient= 'masa' )

print(json_skills_data)

Çıktı:

{ 'şema' :{ 'alanlar' :[{ 'isim' : 'dizin' , 'tip' : 'tamsayı' },{ 'isim' : 'İD' , 'tip' : 'tamsayı' },{ 'isim' : 'kişi' , 'tip' : 'sicim' },{ 'isim' : 'ödül' , 'tip' : 'tamsayı' },{ 'isim' : 'yetenek' , 'tip' : 'sicim' }], 'birincil anahtar' :[ 'dizin' ], 'pandalar_versiyonu' : '1.4.0' }, 'veri' :[{ 'dizin' : 0 , 'İD' : 123 , 'kişi' : 'Bal' , 'ödül' : 25000 , 'yetenek' : 'tablo' },{ 'dizin' : 1 , 'İD' : 112 , 'kişi' : 'Mouni' , 'ödül' : 2000 , 'yetenek' : 'dans' },{ 'dizin' : 2 , 'İD' : 153 , 'kişi' : 'Tulası' , 'ödül' : 1200 , 'yetenek' : 'okuma' }]}

Örnek 7: İndeks Parametresiyle

Öncelikle index parametresini “True” yaparak geçiyoruz. Dizin konumunun bir sözlükte anahtar olarak döndürüldüğünü her sütun değeri için göreceksiniz.

İkinci çıktıda, dizin 'False' olarak ayarlandığından, dizin konumları olmadan yalnızca sütun adları ('sütunlar') ve kayıtlar ('veriler') döndürülür.

# To_json() ile index=True kullanarak JSON'a dönüştürün

json_skills_data = skill_df.toPandas().to_json(index=Doğru)

print(json_skills_data, ' \N ' )

# to_json() ile index=False kullanarak JSON'a dönüştürün

json_skills_data= skill_df.toPandas().to_json(index=False,orient= 'bölmek' )

print(json_skills_data)

Çıktı:

{ 'İD' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'kişi' :{ '0' : 'Bal' , '1' : 'Mouni' , '2' : 'Tulası' }, 'ödül' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'yetenek' :{ '0' : 'tablo' , '1' : 'dans' , '2' : 'okuma' }}

{ 'sütunlar' :[ 'İD' , 'kişi' , 'ödül' , 'yetenek' ], 'veri' :[[ 123 , 'Bal' , 25000 , 'tablo' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulası' , 1200 , 'okuma' ]]

ToJSON() Kullanarak JSON'a PySpark DataFrame

toJSON() yöntemi, PySpark DataFrame'i bir JSON nesnesine dönüştürmek için kullanılır. Temel olarak, bir listeyle çevrili bir JSON dizesi döndürür. bu ['{sütun:değer,…}',…. ] bu işlev tarafından döndürülen biçimdir. Burada, PySpark DataFrame'den gelen her satır, anahtar olarak sütun adı ile bir sözlük olarak döndürülür.

Sözdizimi:

dataframe_object.toJSON()

Dizin, sütun etiketleri ve veri türü gibi parametreleri geçirmek mümkün olabilir.

Örnek:

5 satır ve 4 sütun içeren bir 'skills_df' PySpark DataFrame oluşturun. toJSON() yöntemini kullanarak bu DataFrame'i JSON'a dönüştürün.

pyspark'ı içe aktar

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# 5 satır ve 4 sütun içeren beceri verileri

beceriler =[{ 'İD' : 123 , 'kişi' : 'Bal' , 'yetenek' : 'tablo' , 'ödül' : 25000 },

{ 'İD' : 112 , 'kişi' : 'Mouni' , 'yetenek' : 'müzik/dans' , 'ödül' : 2000 },

{ 'İD' : 153 , 'kişi' : 'Tulası' , 'yetenek' : 'okuma' , 'ödül' : 1200 },

{ 'İD' : 173 , 'kişi' : 'Koştu' , 'yetenek' : 'müzik' , 'ödül' : 2000 },

{ 'İD' : 43 , 'kişi' : 'Kamala' , 'yetenek' : 'okuma' , 'ödül' : 10000 }

]

# yukarıdaki verilerden beceri veri çerçevesini oluşturun

beceriler_df = linuxhint_spark_app.createDataFrame(beceriler)

# Gerçek beceri verileri

Skills_df.show()

# JSON dizisine dönüştür

json_skills_data = skill_df.toJSON().collect()

print(json_skills_data)

Çıktı:

+---+------+-----+-------------+

| kimlik|kişi|ödül| beceri|

+---+------+-----+-------------+

| 123 | tatlım| 25000 | boyama|

| 112 | Mouni| 2000 |müzik/dans|

| 153 |Tülası| 1200 | okuma|

| 173 | koştu| 2000 | müzik|

| 43 |Kamala| 10000 | okuma|

+---+------+-----+-------------+

[ '{'id':123,'person':'Honey','prize':25000,'skill':'resim'}' , '{'id':112,'person':'Mouni','prize':2000,'skill':'müzik/dans'}' , '{'id':153,'person':'Tulasi','prize':1200,'skill':'okuma'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'music'}' , '{'id':43,'person':'Kamala','prize':10000,'skill':'okuma'}' ]

PySpark DataFrame'de 5 satır vardır. Tüm bu 5 satır, virgülle ayrılmış dizelerin bir sözlüğü olarak döndürülür.

Write.json() Kullanarak JSON'a PySpark DataFrame

PySpark DataFrame'i bir JSON dosyasına yazan/kaydeden write.json() yöntemi PySpark'ta mevcuttur. Dosya adını/yolunu parametre olarak alır. Temel olarak, JSON'u birden çok dosyada (bölümlenmiş dosyalar) döndürür. Hepsini tek bir dosyada birleştirmek için birleştirme() yöntemini kullanabiliriz.

Sözdizimi:

dataframe_object.coalesce( 1 ).write.json('dosya_adı')
  1. Ekleme Modu – dataframe_object.write.mode('append').json('file_name')
  2. Üzerine Yazma Modu – dataframe_object.write.mode('üzerine yaz').json('dosya_adı')

Mevcut JSON'u eklemek/üzerine yazmak mümkün olabilir. write.mode() kullanarak, bu fonksiyona “append” ileterek veriyi ekleyebilir veya “overwrite” ileterek mevcut JSON verisinin üzerine yazabiliriz.

Örnek 1:

3 satır ve 4 sütun içeren bir 'skills_df' PySpark DataFrame oluşturun. Bu DataFrame'i JSON'a yazın.

pyspark'ı içe aktar

pandaları ithal et

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

# 3 satır ve 4 sütun içeren beceri verileri

beceriler =[{ 'İD' : 123 , 'kişi' : 'Bal' , 'yetenek' : 'tablo' , 'ödül' : 25000 },

{ 'İD' : 112 , 'kişi' : 'Mouni' , 'yetenek' : 'dans' , 'ödül' : 2000 },

{ 'İD' : 153 , 'kişi' : 'Tulası' , 'yetenek' : 'okuma' , 'ödül' : 1200 }

]

# yukarıdaki verilerden beceri veri çerçevesini oluşturun

becerileri_df = linuxhint_spark_app.createDataFrame(beceriler)

# yaz.json()

skill_df.coalesce( 1 ).write.json( 'beceri_verisi' )

JSON Dosyası:

Skills_data klasörünün bölümlenmiş JSON verilerini içerdiğini görebiliriz.

JSON dosyasını açalım. PySpark DataFrame'deki tüm satırların JSON'a dönüştürüldüğünü görebiliriz.

PySpark DataFrame'de 5 satır vardır. Tüm bu 5 satır, virgülle ayrılmış dizelerin bir sözlüğü olarak döndürülür.

Örnek 2:

Tek satırlı bir 'skills2_df' PySpark DataFrame oluşturun. Modu 'ekleme' olarak belirterek önceki JSON dosyasına bir satır ekleyin.

pyspark'ı içe aktar

pandaları ithal et

pyspark.sql'den SparkSession'ı içe aktarın

linuxhint_spark_app = SparkSession.builder.appName( 'Linux İpucu' ).getOrCreate()

beceriler2 =[{ 'İD' : 78 , 'kişi' : 'Mary' , 'yetenek' : 'binme' , 'ödül' : 8960 }

]

# yukarıdaki verilerden beceri veri çerçevesini oluşturun

beceriler2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() ile ekleme modu.

skill2_df.write.mode( 'ekle' ).json( 'beceri_verisi' )

JSON Dosyası:

Bölümlenmiş JSON dosyalarını görebiliriz. İlk dosya, ilk DataFrame kayıtlarını tutar ve ikinci dosya, ikinci DataFrame kaydını tutar.

Çözüm

PySpark DataFrame'i JSON'a dönüştürmenin üç farklı yolu vardır. İlk olarak PySpark DataFrame'i Pandas DataFrame'e çevirerek JSON'a dönüşen to_json() yöntemini farklı parametreler göz önünde bulundurarak farklı örneklerle ele aldık. Ardından, toJSON() yöntemini kullandık. Son olarak, PySpark DataFrame'i JSON'a yazmak için write.json() işlevini nasıl kullanacağımızı öğrendik. Bu işlevle ekleme ve üzerine yazma mümkündür.