Python'da Gerçek Zamanlı Veri Akışı Nasıl Uygulanır?

Python Da Gercek Zamanli Veri Akisi Nasil Uygulanir



Python'da gerçek zamanlı veri akışının uygulanmasında uzmanlaşmak, günümüzün veri odaklı dünyasında önemli bir beceridir. Bu kılavuz, Python'da gerçek zamanlı veri akışını özgünlükle kullanmak için temel adımları ve temel araçları araştırıyor. Apache Kafka veya Apache Pulsar gibi uygun bir çerçeve seçmekten, zahmetsiz veri tüketimi, işleme ve etkili görselleştirme için Python kodu yazmaya kadar, çevik ve verimli gerçek zamanlı veri kanallarını oluşturmak için gerekli becerileri edineceğiz.

Örnek 1: Python'da Gerçek Zamanlı Veri Akışının Uygulanması

Python'da gerçek zamanlı veri akışının uygulanması günümüzün veri odaklı çağında ve dünyasında çok önemlidir. Bu ayrıntılı örnekte, Google Colab'da Apache Kafka ve Python kullanarak gerçek zamanlı bir veri akış sistemi oluşturma sürecini adım adım anlatacağız.







Örneği kodlamaya başlamadan önce başlatmak için Google Colab'da belirli bir ortam oluşturmak çok önemlidir. Yapmamız gereken ilk şey gerekli kütüphaneleri kurmak. Kafka entegrasyonu için “kafka-python” kütüphanesini kullanıyoruz.



! pip düzenlemek kafka-python


Bu komut, Python işlevlerini ve Apache Kafka için bağlamaları sağlayan “kafka-python” kitaplığını yükler. Daha sonra projemiz için gerekli kütüphaneleri import ediyoruz. “KafkaProducer” ve “KafkaConsumer” dahil olmak üzere gerekli kütüphanelerin içe aktarılması, “kafka-python” kütüphanesindeki Kafka brokerleri ile etkileşime girmemizi sağlayan sınıflardır. JSON, mesajları serileştirmek ve seri durumdan çıkarmak için kullandığımız JSON verileriyle çalışan Python kütüphanesidir.



kafka'dan Kafka'yı içe aktarYapımcı, KafkaTüketici
json'u içe aktar


Bir Kafka Yapımcısının Yaratılışı





Bu önemlidir çünkü bir Kafka üreticisi verileri bir Kafka konusuna gönderir. Örneğimizde, 'gerçek zamanlı konu' adı verilen bir konuya simüle edilmiş gerçek zamanlı veri gönderecek bir yapımcı oluşturuyoruz.

Kafka brokerinin adresini “localhost:9092” olarak belirten bir “KafkaProducer” örneği oluşturuyoruz. Daha sonra veriyi Kafka’ya göndermeden önce seri hale getiren “value_serializer” fonksiyonunu kullanıyoruz. Bizim durumumuzda bir lambda işlevi, verileri UTF-8 kodlu JSON olarak kodlar. Şimdi bazı gerçek zamanlı verileri simüle edip Kafka konusuna gönderelim.



yapımcı = KafkaYapımcısı ( bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda v: json.dumps ( içinde ) .kodlamak ( 'utf-8' ) )
# Simüle edilmiş gerçek zamanlı veriler
veri = { 'sensör_kimliği' : 1 , 'sıcaklık' : 25.5 , 'nem' : 60.2 }
# Konuya veri gönderme
yapımcı.send ( 'gerçek zamanlı konu' , veri )


Bu satırlarda simüle edilmiş bir sensör verisini temsil eden bir “veri” sözlüğü tanımlıyoruz. Daha sonra bu verileri “gerçek zamanlı konuya” yayınlamak için “send” yöntemini kullanıyoruz.

Daha sonra bir Kafka tüketicisi oluşturmak istiyoruz ve bir Kafka tüketicisi, bir Kafka konusundaki verileri okuyor. “Gerçek zamanlı konu” içerisindeki mesajları tüketecek ve işleyecek bir tüketici yaratıyoruz. Tüketmek istediğimiz konuyu, örneğin (gerçek zamanlı konu) ve Kafka komisyoncusunun adresini belirten bir 'KafkaConsumer' örneği oluşturuyoruz. O halde “value_deserializer”, Kafka'dan alınan verileri seri durumdan çıkaran bir fonksiyondur. Bizim durumumuzda, bir lambda işlevi verilerin kodunu UTF-8 kodlu JSON olarak çözer.

tüketici = KafkaTüketici ( 'gerçek zamanlı konu' ,
bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda x: json.loads ( x.kod çözme ( 'utf-8' ) ) )


Konudaki mesajları sürekli olarak tüketmek ve işlemek için yinelenen bir döngü kullanıyoruz.

# Gerçek zamanlı verileri okuma ve işleme
için İleti içinde tüketici:
veri = mesaj.değer
Yazdır ( F 'Alınan Veriler: {data}' )


Döngü içindeki her mesajın değerini ve simüle edilmiş sensör verilerimizi alıp konsola yazdırıyoruz. Kafka üreticisini ve tüketicisini çalıştırmak, bu kodun Google Colab'da çalıştırılmasını ve kod hücrelerinin ayrı ayrı yürütülmesini içerir. Üretici simüle edilmiş verileri Kafka konusuna gönderir ve tüketici alınan verileri okuyup yazdırır.


Kod Çalışırken Çıktının Analizi

Üretilen ve tüketilen gerçek zamanlı bir veriyi gözlemleyeceğiz. Veri formatı simülasyonumuza veya gerçek veri kaynağımıza bağlı olarak değişebilir. Bu ayrıntılı örnekte, Google Colab'da Apache Kafka ve Python kullanarak gerçek zamanlı bir veri akışı sistemi kurma sürecinin tamamını ele alıyoruz. Her kod satırını ve bu sistemin oluşturulmasındaki önemini açıklayacağız. Gerçek zamanlı veri akışı güçlü bir yetenektir ve bu örnek, daha karmaşık gerçek dünya uygulamaları için bir temel görevi görmektedir.

Örnek 2: Borsa Verilerini Kullanarak Python'da Gerçek Zamanlı Veri Akışını Uygulama

Farklı bir senaryo kullanarak Python'da gerçek zamanlı veri akışının uygulanmasına ilişkin benzersiz bir örnek daha yapalım; Bu sefer borsa verilerine odaklanacağız. Hisse senedi fiyat değişikliklerini yakalayan ve bunları Google Colab'da Apache Kafka ve Python kullanarak işleyen gerçek zamanlı bir veri akış sistemi oluşturuyoruz. Önceki örnekte gösterildiği gibi Google Colab'da ortamımızı yapılandırarak başlıyoruz. Öncelikle gerekli kütüphaneleri kuruyoruz:

! pip düzenlemek kafka-python yfinance


Buraya gerçek zamanlı borsa verisi almamızı sağlayan “yfinance” kütüphanesini ekliyoruz. Daha sonra gerekli kütüphaneleri import ediyoruz. Kafka etkileşimi için “kafka-python” kütüphanesinden “KafkaProducer” ve “KafkaConsumer” sınıflarını kullanmaya devam ediyoruz. JSON verileriyle çalışmak için JSON'u içe aktarıyoruz. Gerçek zamanlı borsa verilerini elde etmek için de “yfinance”ı kullanıyoruz. Ayrıca gerçek zamanlı güncellemeleri simüle etmek amacıyla bir zaman gecikmesi eklemek için 'zaman' kitaplığını da içe aktarıyoruz.

kafka'dan Kafka'yı içe aktarYapımcı, KafkaTüketici
json'u içe aktar
ithalat yfinance gibi yf
içe aktarmak zaman


Şimdi stok veriler için bir Kafka üreticisi oluşturuyoruz. Kafka yapımcımız gerçek zamanlı bir hisse senedi verisi alıyor ve bunu “hisse senedi fiyatı” isimli Kafka konusuna gönderiyor.

yapımcı = KafkaYapımcısı ( bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda v: json.dumps ( içinde ) .kodlamak ( 'utf-8' ) )

sırasında Doğru:
hisse senedi = yf.Ticker ( 'AAPL' ) # Örnek: Apple Inc. hisse senedi
stok_verisi = stok.geçmiş ( dönem = '1 gün' )
son_fiyat = stok_verisi [ 'Kapalı' ] .iloc [ - 1 ]
veri = { 'sembol' : 'AAPL' , 'fiyat' : son fiyat }
yapımcı.send ( 'hisse senedi fiyatı' , veri )
Uyku zamanı ( 10 ) # Her 10 saniyede bir gerçek zamanlı güncellemeleri simüle edin


Bu kod içerisinde Kafka brokerinin adresi ile “KafkaProducer” örneğini oluşturuyoruz. Döngünün içinde, Apple Inc.'in ('AAPL') en son hisse senedi fiyatını almak için 'yfinance'ı kullanırız. Daha sonra son kapanış fiyatını çıkartıp “hisse senedi fiyatı” konusuna gönderiyoruz. Sonunda, her 10 saniyede bir gerçek zamanlı güncellemeleri simüle etmek için bir zaman gecikmesi sunuyoruz.

“Hisse senedi fiyatı” konusundan hisse senedi fiyatı verilerini okuyup işleyecek bir Kafka tüketicisi oluşturalım.

tüketici = KafkaTüketici ( 'hisse senedi fiyatı' ,
bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda x: json.loads ( x.kod çözme ( 'utf-8' ) ) )

için İleti içinde tüketici:
stok_verisi = mesaj.değer
Yazdır ( F 'Alınan Hisse Senedi Verileri: {stock_data['symbol']} - Fiyat: {stock_data['price']}' )


Bu kod önceki örneğin tüketici kurulumuna benzer. Sürekli olarak “hisse senedi fiyatı” konusundan gelen mesajları okuyup işleyerek hisse senedi sembolünü ve fiyatını konsola yazdırır. Üreticiyi ve tüketiciyi çalıştırmak için kod hücrelerini sırayla, örneğin Google Colab'da tek tek çalıştırıyoruz. Üretici gerçek zamanlı hisse senedi fiyat güncellemelerini alıp gönderirken tüketici de bu verileri okuyup görüntüler.

! pip düzenlemek kafka-python yfinance
kafka'dan Kafka'yı içe aktarYapımcı, KafkaTüketici
json'u içe aktar
ithalat yfinance gibi yf
içe aktarmak zaman
yapımcı = KafkaYapımcısı ( bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda v: json.dumps ( içinde ) .kodlamak ( 'utf-8' ) )

sırasında Doğru:
hisse senedi = yf.Ticker ( 'AAPL' ) # Apple Inc. hisse senedi
stok_verisi = stok.geçmiş ( dönem = '1 gün' )
son_fiyat = stok_verisi [ 'Kapalı' ] .iloc [ - 1 ]

veri = { 'sembol' : 'AAPL' , 'fiyat' : son fiyat }

yapımcı.send ( 'hisse senedi fiyatı' , veri )

Uyku zamanı ( 10 ) # Her 10 saniyede bir gerçek zamanlı güncellemeleri simüle edin
tüketici = KafkaTüketici ( 'hisse senedi fiyatı' ,
bootstrap_servers = 'yerel ana makine:9092' ,
value_serializer =lambda x: json.loads ( x.kod çözme ( 'utf-8' ) ) )

için İleti içinde tüketici:
stok_verisi = mesaj.değer
Yazdır ( F 'Alınan Hisse Senedi Verileri: {stock_data['symbol']} - Fiyat: {stock_data['price']}' )


Kod çalıştırıldıktan sonraki çıktının analizinde, Apple Inc.'in üretilip tüketildiği gerçek zamanlı hisse senedi fiyat güncellemelerini gözlemleyeceğiz.

Çözüm

Bu benzersiz örnekte, borsa verilerini yakalamak ve işlemek için Apache Kafka ve 'yfinance' kütüphanesini kullanarak Python'da gerçek zamanlı veri akışının uygulanmasını gösterdik. Kodun her satırını detaylı bir şekilde açıkladık. Gerçek zamanlı veri akışı, finans, Nesnelerin İnterneti ve daha birçok alanda gerçek dünya uygulamalarını oluşturmak için çeşitli alanlara uygulanabilir.