Dans ce guide, nous nous concentrerons principalement sur la lecture/le chargement du fichier parquet dans le PySpark DataFrame/SQL à l'aide de la fonction read.parquet() qui est disponible dans la classe pyspark.sql.DataFrameReader.
Sujet du contenu :
Lire le fichier Parquet dans le DataFrame PySpark
Lire le fichier Parquet dans PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Cette fonction est utilisée pour lire le fichier parquet et le charger dans le PySpark DataFrame. Il prend le chemin/nom de fichier du fichier parquet. Nous pouvons simplement utiliser la fonction read.parquet() puisqu'il s'agit de la fonction générique.
Syntaxe:
Voyons la syntaxe de read.parquet() :
spark_app.read.parquet(file_name.parquet/path)Commencez par installer le module PySpark à l'aide de la commande pip :
pip installer pyspark
Obtenir le fichier parquet
Pour lire un fichier parquet, vous avez besoin des données dans lesquelles le fichier parquet est généré à partir de ces données. Dans cette partie, nous verrons comment générer un fichier parquet à partir du PySpark DataFrame.
Créons un DataFrame PySpark avec 5 enregistrements et écrivons-le dans le fichier parquet 'industry_parquet'.
importer pysparkdepuis pyspark.sql importer SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()
# créer la base de données qui stocke les détails de l'industrie
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agriculture' ,Zone= 'ETATS-UNIS' ,
Note= 'Chaud' ,Total_employés= 100 ),
Ligne(Type= 'Agriculture' ,Zone= 'Inde' ,Note= 'Chaud' ,Total_employés= 200 ),
Ligne(Type= 'Développement' ,Zone= 'ETATS-UNIS' ,Note= 'Chaud' ,Total_employés= 100 ),
Ligne(Type= 'Éducation' ,Zone= 'ETATS-UNIS' ,Note= 'Cool' ,Total_employés= 400 ),
Ligne(Type= 'Éducation' ,Zone= 'ETATS-UNIS' ,Note= 'Chaud' ,Total_employés= vingt )
])
# Frame de données réelle
industrie_df.show()
# Écrivez le fichier industry_df dans le fichier parquet
industrie_df.coalesce( 1 ).écrire.parquet( 'industrie_parquet' )
Sortir:
C'est le DataFrame qui contient 5 enregistrements.
Un fichier parquet est créé pour le DataFrame précédent. Ici, notre nom de fichier avec une extension est 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Nous utilisons ce fichier dans tout le didacticiel.
Lire le fichier Parquet dans le DataFrame PySpark
Nous avons le dossier parquet. Lisons ce fichier à l'aide de la fonction read.parquet() et chargeons-le dans le PySpark DataFrame.
importer pysparkdepuis pyspark.sql importer SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()
# Lit le fichier parquet dans l'objet dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Afficher le dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Sortir:
Nous affichons le DataFrame en utilisant la méthode show() qui a été créée à partir du fichier parquet.
Requêtes SQL avec le fichier Parquet
Après le chargement dans le DataFrame, il peut être possible de créer les tables SQL et d'afficher les données présentes dans le DataFrame. Nous devons créer une VUE TEMPORAIRE et utiliser les commandes SQL pour renvoyer les enregistrements du DataFrame créé à partir du fichier parquet.
Exemple 1:
Créez une vue temporaire nommée 'Sectors' et utilisez la commande SELECT pour afficher les enregistrements dans le DataFrame. Vous pouvez vous référer à ceci Didacticiel qui explique comment créer une VIEW dans Spark – SQL.
importer pysparkdepuis pyspark.sql importer SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()
# Lit le fichier parquet dans l'objet dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Créer une vue à partir du fichier de parquet ci-dessus nommé - 'Sectors'
dataframe_from_parquet.createOrReplaceTempView( 'Secteurs' )
# Requête pour afficher tous les enregistrements des secteurs
linuxhint_spark_app.sql( 'sélectionnez * parmi les secteurs' ).montrer()
Sortir:
Exemple 2 :
En utilisant la VUE précédente, écrivez la requête SQL :
- Pour afficher tous les enregistrements des secteurs qui appartiennent à 'Inde'.
- Pour afficher tous les enregistrements des secteurs avec un employé supérieur à 100.
linuxhint_spark_app.sql( 'select * from Sectors where Area='India'' ).montrer()
# Requête pour afficher tous les enregistrements des secteurs avec un employé supérieur à 100
linuxhint_spark_app.sql( 'select * from Sectors where Total_employees>100' ).montrer()
Sortir:
Il n'y a qu'un seul enregistrement avec une zone qui est 'Inde' et deux enregistrements avec des employés supérieurs à 100.
Lire le fichier Parquet dans PySpark SQL
Tout d'abord, nous devons créer une VUE à l'aide de la commande CREATE. En utilisant le mot-clé 'path' dans la requête SQL, nous pouvons lire le fichier parquet dans Spark SQL. Après le chemin, nous devons spécifier le nom de fichier/l'emplacement du fichier.
Syntaxe:
spark_app.sql( 'CREATE TEMPORARY VIEW view_name USING parquet OPTIONS (path ' nom_fichier.parquet ')' )Exemple 1:
Créez une vue temporaire nommée 'Secteur2' et lisez-y le fichier parquet. À l'aide de la fonction sql(), écrivez la requête de sélection pour afficher tous les enregistrements présents dans la vue.
importer pysparkdepuis pyspark.sql importer SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()
# Lire le fichier parquet dans Spark-SQL
linuxhint_spark_app.sql( 'CRÉER UNE VUE TEMPORAIRE Secteur2 À L'AIDE DES OPTIONS parquet (chemin ' partie-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Requête pour afficher tous les enregistrements du Secteur2
linuxhint_spark_app.sql( 'sélectionner * du secteur2' ).montrer()
Sortir:
Exemple 2 :
Utilisez la VUE précédente et écrivez la requête pour afficher tous les enregistrements avec la note « Chaud » ou « Froid ».
# Requête pour afficher tous les enregistrements du Sector2 avec Rating- Hot ou Cool.linuxhint_spark_app.sql( 'select * from Sector2 where Rating='Hot' OR Rating='Cool'' ).montrer()
Sortir:
Il y a trois enregistrements avec la cote 'Hot' ou 'Cool'.
Conclusion
Dans PySpark, la fonction write.parquet() écrit le DataFrame dans le fichier parquet. La fonction read.parquet() lit le fichier parquet dans le PySpark DataFrame ou tout autre DataSource. Nous avons appris à lire le fichier parquet dans le PySpark DataFrame et dans la table PySpark. Dans le cadre de ce didacticiel, nous avons également expliqué comment créer les tables à partir de PySpark DataFrame et filtrer les données à l'aide de la clause WHERE.