Comment implémenter le streaming de données en temps réel en Python

Comment Implementer Le Streaming De Donnees En Temps Reel En Python



Maîtriser la mise en œuvre du streaming de données en temps réel dans Python constitue une compétence essentielle dans le monde actuel des données. Ce guide explore les étapes principales et les outils essentiels pour utiliser le streaming de données en temps réel avec authenticité en Python. De la sélection d'un framework approprié comme Apache Kafka ou Apache Pulsar à l'écriture d'un code Python pour une consommation, un traitement et une visualisation efficace des données sans effort, nous acquerrons les compétences nécessaires pour construire des canaux de données en temps réel agiles et efficaces.

Exemple 1 : implémentation du streaming de données en temps réel en Python

La mise en œuvre d’un streaming de données en temps réel dans Python est cruciale à l’ère et dans le monde d’aujourd’hui axés sur les données. Dans cet exemple détaillé, nous allons parcourir le processus de création d'un système de streaming de données en temps réel à l'aide d'Apache Kafka et Python dans Google Colab.







Pour initialiser l'exemple avant de commencer le codage, la création d'un environnement spécifique dans Google Colab est essentielle. La première chose à faire est d’installer les bibliothèques nécessaires. Nous utilisons la bibliothèque « kafka-python » pour l'intégration de Kafka.



! pépin installer kafka-python


Cette commande installe la bibliothèque « kafka-python » qui fournit les fonctions Python et les liaisons pour Apache Kafka. Ensuite, nous importons les bibliothèques requises pour notre projet. L'importation des bibliothèques requises, notamment « KafkaProducer » et « KafkaConsumer », sont les classes de la bibliothèque « kafka-python » qui nous permettent d'interagir avec les courtiers Kafka. JSON est la bibliothèque Python permettant de travailler avec les données JSON que nous utilisons pour sérialiser et désérialiser les messages.



à partir de l'importation de kafka KafkaProducer, KafkaConsumer
importer json


Création d'un producteur Kafka





Ceci est important car un producteur Kafka envoie les données à un sujet Kafka. Dans notre exemple, nous créons un producteur pour envoyer des données simulées en temps réel à un sujet appelé « sujet en temps réel ».

Nous créons une instance « KafkaProducer » qui spécifie l'adresse du courtier Kafka comme « localhost:9092 ». Ensuite, nous utilisons le « value_serializer », une fonction qui sérialise les données avant de les envoyer à Kafka. Dans notre cas, une fonction lambda code les données au format JSON codé en UTF-8. Maintenant, simulons quelques données en temps réel et envoyons-les au sujet Kafka.



producteur = KafkaProducteur ( serveurs_amorçage = 'hôte local : 9092' ,
valeur_sérialiseur =lambda v : json.dumps ( dans ) .encoder ( 'utf-8' ) )
# Données simulées en temps réel
données = { 'identifiant_capteur' : 1 , 'température' : 25,5 , 'humidité' : 60,2 }
# Envoi de données au sujet
producteur.envoyer ( 'sujet en temps réel' , données )


Dans ces lignes, nous définissons un dictionnaire de « données » qui représente les données d'un capteur simulé. Nous utilisons ensuite la méthode « envoyer » pour publier ces données sur le « sujet en temps réel ».

Ensuite, nous voulons créer un consommateur Kafka, et un consommateur Kafka lit les données d'un sujet Kafka. Nous créons un consommateur pour consommer et traiter les messages dans le « sujet en temps réel ». Nous créons une instance « KafkaConsumer », spécifiant le sujet que nous souhaitons consommer, par exemple (sujet en temps réel) et l'adresse du courtier Kafka. Ensuite, le « value_deserializer » est une fonction qui désérialise les données reçues de Kafka. Dans notre cas, une fonction lambda décode les données au format JSON codé en UTF-8.

consommateur = KafkaConsommateur ( 'sujet en temps réel' ,
serveurs_amorçage = 'hôte local : 9092' ,
value_deserializer =lambda x : json.loads ( x.decode ( 'utf-8' ) ) )


Nous utilisons une boucle itérative pour consommer et traiter en continu les messages du sujet.

# Lecture et traitement des données en temps réel
pour message dans consommateur:
données = message.valeur
imprimer ( F 'Données reçues : {data}' )


Nous récupérons la valeur de chaque message et les données de nos capteurs simulés à l'intérieur de la boucle et les imprimons sur la console. L'exécution du producteur et du consommateur Kafka implique l'exécution de ce code dans Google Colab et l'exécution des cellules de code individuellement. Le producteur envoie les données simulées au sujet Kafka, et le consommateur lit et imprime les données reçues.


Analyse de la sortie pendant l'exécution du code

Nous observerons en temps réel les données produites et consommées. Le format des données peut varier en fonction de notre simulation ou de la source de données réelle. Dans cet exemple détaillé, nous couvrons l'ensemble du processus de configuration d'un système de streaming de données en temps réel à l'aide d'Apache Kafka et Python dans Google Colab. Nous expliquerons chaque ligne de code et sa signification dans la construction de ce système. Le streaming de données en temps réel est une fonctionnalité puissante, et cet exemple sert de base à des applications réelles plus complexes.

Exemple 2 : Implémentation d'un streaming de données en temps réel en Python à l'aide de données boursières

Faisons un autre exemple unique d'implémentation d'un streaming de données en temps réel en Python en utilisant un scénario différent ; cette fois, nous nous concentrerons sur les données boursières. Nous créons un système de streaming de données en temps réel qui capture les variations du cours des actions et les traite à l'aide d'Apache Kafka et Python dans Google Colab. Comme démontré dans l'exemple précédent, nous commençons par configurer notre environnement dans Google Colab. Tout d'abord, nous installons les bibliothèques requises :

! pépin installer kafka-python yfinance


Ici, nous ajoutons la bibliothèque « yfinance » qui nous permet d'obtenir une donnée boursière en temps réel. Ensuite, nous importons les bibliothèques nécessaires. Nous continuons à utiliser les classes « KafkaProducer » et « KafkaConsumer » de la bibliothèque « kafka-python » pour l'interaction Kafka. Nous importons JSON pour travailler avec les données JSON. Nous utilisons également « yfinance » pour obtenir des données boursières en temps réel. Nous importons également la bibliothèque « time » pour ajouter un délai afin de simuler les mises à jour en temps réel.

à partir de l'importation de kafka KafkaProducer, KafkaConsumer
importer json
importer du financement comme ouf
importer temps


Maintenant, nous créons un producteur Kafka pour les données boursières. Notre producteur Kafka obtient des données boursières en temps réel et les envoie à un sujet Kafka nommé « cours des actions ».

producteur = KafkaProducteur ( serveurs_amorçage = 'hôte local : 9092' ,
valeur_sérialiseur =lambda v : json.dumps ( dans ) .encoder ( 'utf-8' ) )

alors que Vrai:
stock = yf.Ticker ( 'AAPL' ) # Exemple : actions Apple Inc.
stock_data = stock.historique ( période = '1j' )
dernier_prix = stock_data [ 'Fermer' ] .iloc [ - 1 ]
données = { 'symbole' : 'AAPL' , 'prix' : dernier prix }
producteur.envoyer ( 'prix de l'action' , données )
le sommeil de temps ( dix ) # Simulez des mises à jour en temps réel toutes les 10 secondes


Nous créons une instance « KafkaProducer » avec l'adresse du courtier Kafka dans ce code. Dans la boucle, nous utilisons « yfinance » pour obtenir le dernier cours de l’action Apple Inc. (« AAPL »). Ensuite, nous extrayons le dernier cours de clôture et l'envoyons au sujet « cours de l'action ». Finalement, nous introduisons un délai pour simuler les mises à jour en temps réel toutes les 10 secondes.

Créons un consommateur Kafka pour lire et traiter les données boursières du sujet « cours des actions ».

consommateur = KafkaConsommateur ( 'prix de l'action' ,
serveurs_amorçage = 'hôte local : 9092' ,
value_deserializer =lambda x : json.loads ( x.decode ( 'utf-8' ) ) )

pour message dans consommateur:
stock_data = message.valeur
imprimer ( F 'Données boursières reçues : {stock_data['symbol']} - Prix : {stock_data['price']}' )


Ce code est similaire à la configuration consommateur de l’exemple précédent. Il lit et traite en permanence les messages du sujet « cours de l'action » et imprime le symbole boursier et le prix sur la console. Nous exécutons les cellules de code de manière séquentielle, par exemple une par une dans Google Colab pour exécuter le producteur et le consommateur. Le producteur reçoit et envoie les mises à jour du cours des actions en temps réel pendant que le consommateur lit et affiche ces données.

! pépin installer kafka-python yfinance
à partir de l'importation de kafka KafkaProducer, KafkaConsumer
importer json
importer du financement comme ouf
importer temps
producteur = KafkaProducteur ( serveurs_amorçage = 'hôte local : 9092' ,
valeur_sérialiseur =lambda v : json.dumps ( dans ) .encoder ( 'utf-8' ) )

alors que Vrai:
stock = yf.Ticker ( 'AAPL' ) # d'actions Apple Inc.
stock_data = stock.historique ( période = '1j' )
dernier_prix = stock_data [ 'Fermer' ] .iloc [ - 1 ]

données = { 'symbole' : 'AAPL' , 'prix' : dernier prix }

producteur.envoyer ( 'prix de l'action' , données )

le sommeil de temps ( dix ) # Simulez des mises à jour en temps réel toutes les 10 secondes
consommateur = KafkaConsommateur ( 'prix de l'action' ,
serveurs_amorçage = 'hôte local : 9092' ,
value_deserializer =lambda x : json.loads ( x.decode ( 'utf-8' ) ) )

pour message dans consommateur:
stock_data = message.valeur
imprimer ( F 'Données boursières reçues : {stock_data['symbol']} - Prix : {stock_data['price']}' )


Dans l'analyse du résultat après l'exécution du code, nous observerons les mises à jour du cours des actions en temps réel d'Apple Inc. produites et consommées.

Conclusion

Dans cet exemple unique, nous avons démontré la mise en œuvre du streaming de données en temps réel en Python à l'aide d'Apache Kafka et de la bibliothèque « yfinance » pour capturer et traiter les données boursières. Nous avons expliqué en détail chaque ligne du code. Le streaming de données en temps réel peut être appliqué à divers domaines pour créer des applications réelles dans les domaines de la finance, de l'IoT, etc.