PySpark Read.Parquet()

Pyspark Read Parquet



Dans PySpark, la fonction write.parquet() écrit le DataFrame dans le fichier parquet et read.parquet() lit le fichier parquet dans le PySpark DataFrame ou tout autre DataSource. Pour traiter les colonnes dans Apache Spark rapidement et efficacement, nous devons compresser les données. La compression des données économise notre mémoire et toutes les colonnes sont converties en niveau plat. Cela signifie que le stockage au niveau de la colonne plate existe. Le fichier qui les stocke est connu sous le nom de fichier PARQUET.

Dans ce guide, nous nous concentrerons principalement sur la lecture/le chargement du fichier parquet dans le PySpark DataFrame/SQL à l'aide de la fonction read.parquet() qui est disponible dans la classe pyspark.sql.DataFrameReader.

Sujet du contenu :







Obtenir le fichier parquet



Lire le fichier Parquet dans le DataFrame PySpark



Lire le fichier Parquet dans PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Cette fonction est utilisée pour lire le fichier parquet et le charger dans le PySpark DataFrame. Il prend le chemin/nom de fichier du fichier parquet. Nous pouvons simplement utiliser la fonction read.parquet() puisqu'il s'agit de la fonction générique.

Syntaxe:



Voyons la syntaxe de read.parquet() :

spark_app.read.parquet(file_name.parquet/path)

Commencez par installer le module PySpark à l'aide de la commande pip :

pip installer pyspark

Obtenir le fichier parquet

Pour lire un fichier parquet, vous avez besoin des données dans lesquelles le fichier parquet est généré à partir de ces données. Dans cette partie, nous verrons comment générer un fichier parquet à partir du PySpark DataFrame.

Créons un DataFrame PySpark avec 5 enregistrements et écrivons-le dans le fichier parquet 'industry_parquet'.

importer pyspark

depuis pyspark.sql importer SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# créer la base de données qui stocke les détails de l'industrie

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agriculture' ,Zone= 'ETATS-UNIS' ,
Note= 'Chaud' ,Total_employés= 100 ),

Ligne(Type= 'Agriculture' ,Zone= 'Inde' ,Note= 'Chaud' ,Total_employés= 200 ),

Ligne(Type= 'Développement' ,Zone= 'ETATS-UNIS' ,Note= 'Chaud' ,Total_employés= 100 ),

Ligne(Type= 'Éducation' ,Zone= 'ETATS-UNIS' ,Note= 'Cool' ,Total_employés= 400 ),

Ligne(Type= 'Éducation' ,Zone= 'ETATS-UNIS' ,Note= 'Chaud' ,Total_employés= vingt )

])

# Frame de données réelle

industrie_df.show()

# Écrivez le fichier industry_df dans le fichier parquet

industrie_df.coalesce( 1 ).écrire.parquet( 'industrie_parquet' )

Sortir:

C'est le DataFrame qui contient 5 enregistrements.

Un fichier parquet est créé pour le DataFrame précédent. Ici, notre nom de fichier avec une extension est 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Nous utilisons ce fichier dans tout le didacticiel.

Lire le fichier Parquet dans le DataFrame PySpark

Nous avons le dossier parquet. Lisons ce fichier à l'aide de la fonction read.parquet() et chargeons-le dans le PySpark DataFrame.

importer pyspark

depuis pyspark.sql importer SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# Lit le fichier parquet dans l'objet dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Afficher le dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Sortir:

Nous affichons le DataFrame en utilisant la méthode show() qui a été créée à partir du fichier parquet.

Requêtes SQL avec le fichier Parquet

Après le chargement dans le DataFrame, il peut être possible de créer les tables SQL et d'afficher les données présentes dans le DataFrame. Nous devons créer une VUE TEMPORAIRE et utiliser les commandes SQL pour renvoyer les enregistrements du DataFrame créé à partir du fichier parquet.

Exemple 1:

Créez une vue temporaire nommée 'Sectors' et utilisez la commande SELECT pour afficher les enregistrements dans le DataFrame. Vous pouvez vous référer à ceci Didacticiel qui explique comment créer une VIEW dans Spark – SQL.

importer pyspark

depuis pyspark.sql importer SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# Lit le fichier parquet dans l'objet dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Créer une vue à partir du fichier de parquet ci-dessus nommé - 'Sectors'

dataframe_from_parquet.createOrReplaceTempView( 'Secteurs' )

# Requête pour afficher tous les enregistrements des secteurs

linuxhint_spark_app.sql( 'sélectionnez * parmi les secteurs' ).montrer()

Sortir:

Exemple 2 :

En utilisant la VUE précédente, écrivez la requête SQL :

  1. Pour afficher tous les enregistrements des secteurs qui appartiennent à 'Inde'.
  2. Pour afficher tous les enregistrements des secteurs avec un employé supérieur à 100.
# Requête pour afficher tous les enregistrements des secteurs appartenant à 'Inde'.

linuxhint_spark_app.sql( 'select * from Sectors where Area='India'' ).montrer()

# Requête pour afficher tous les enregistrements des secteurs avec un employé supérieur à 100

linuxhint_spark_app.sql( 'select * from Sectors where Total_employees>100' ).montrer()

Sortir:

Il n'y a qu'un seul enregistrement avec une zone qui est 'Inde' et deux enregistrements avec des employés supérieurs à 100.

Lire le fichier Parquet dans PySpark SQL

Tout d'abord, nous devons créer une VUE à l'aide de la commande CREATE. En utilisant le mot-clé 'path' dans la requête SQL, nous pouvons lire le fichier parquet dans Spark SQL. Après le chemin, nous devons spécifier le nom de fichier/l'emplacement du fichier.

Syntaxe:

spark_app.sql( 'CREATE TEMPORARY VIEW view_name USING parquet OPTIONS (path ' nom_fichier.parquet ')' )

Exemple 1:

Créez une vue temporaire nommée 'Secteur2' et lisez-y le fichier parquet. À l'aide de la fonction sql(), écrivez la requête de sélection pour afficher tous les enregistrements présents dans la vue.

importer pyspark

depuis pyspark.sql importer SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# Lire le fichier parquet dans Spark-SQL

linuxhint_spark_app.sql( 'CRÉER UNE VUE TEMPORAIRE Secteur2 À L'AIDE DES OPTIONS parquet (chemin ' partie-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Requête pour afficher tous les enregistrements du Secteur2

linuxhint_spark_app.sql( 'sélectionner * du secteur2' ).montrer()

Sortir:

Exemple 2 :

Utilisez la VUE précédente et écrivez la requête pour afficher tous les enregistrements avec la note « Chaud » ou « Froid ».

# Requête pour afficher tous les enregistrements du Sector2 avec Rating- Hot ou Cool.

linuxhint_spark_app.sql( 'select * from Sector2 where Rating='Hot' OR Rating='Cool'' ).montrer()

Sortir:

Il y a trois enregistrements avec la cote 'Hot' ou 'Cool'.

Conclusion

Dans PySpark, la fonction write.parquet() écrit le DataFrame dans le fichier parquet. La fonction read.parquet() lit le fichier parquet dans le PySpark DataFrame ou tout autre DataSource. Nous avons appris à lire le fichier parquet dans le PySpark DataFrame et dans la table PySpark. Dans le cadre de ce didacticiel, nous avons également expliqué comment créer les tables à partir de PySpark DataFrame et filtrer les données à l'aide de la clause WHERE.