Comment lire et écrire des données de table dans PySpark

Comment Lire Et Ecrire Des Donnees De Table Dans Pyspark



Le traitement des données dans PySpark est plus rapide si les données sont chargées sous forme de tableau. Avec cela, en utilisant les expressions SQl, le traitement sera rapide. Ainsi, convertir le PySpark DataFrame/RDD en une table avant de l'envoyer pour traitement est la meilleure approche. Aujourd'hui, nous verrons comment lire les données de la table dans le PySpark DataFrame, écrire le PySpark DataFrame dans la table et insérer un nouveau DataFrame dans la table existante à l'aide des fonctions intégrées. Allons-y!

Pyspark.sql.DataFrameWriter.saveAsTable()

Tout d'abord, nous verrons comment écrire le DataFrame PySpark existant dans la table à l'aide de la fonction write.saveAsTable(). Il prend le nom de la table et d'autres paramètres facultatifs tels que modes, partionBy, etc., pour écrire le DataFrame dans la table. Il est stocké sous forme de fichier parquet.

Syntaxe:







dataframe_obj.write.saveAsTable(chemin/nom_table,mode,partitionBy,…)
  1. Table_name est le nom de la table créée à partir de dataframe_obj.
  2. Nous pouvons ajouter/écraser les données de la table en utilisant le paramètre mode.
  3. Le partitionBy prend les colonnes simples/multiples pour créer des partitions basées sur les valeurs de ces colonnes fournies.

Exemple 1:

Créez un DataFrame PySpark avec 5 lignes et 4 colonnes. Écrivez ce Dataframe dans une table nommée 'Agri_Table1'.



importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données agricoles avec 5 lignes et 5 colonnes

agricole =[{ 'Le type de sol' : 'Noir' , 'Irrigation_disponibilité' : 'Non' , 'Acres' : 2500 , 'Statut_du_sol' : 'Sec' ,
'Pays' : 'ETATS-UNIS' },

{ 'Le type de sol' : 'Noir' , 'Irrigation_disponibilité' : 'Oui' , 'Acres' : 3500 , 'Statut_du_sol' : 'Humide' ,
'Pays' : 'Inde' },

{ 'Le type de sol' : 'Rouge' , 'Irrigation_disponibilité' : 'Oui' , 'Acres' : 210 , 'Statut_du_sol' : 'Sec' ,
'Pays' : 'ROYAUME-UNI' },

{ 'Le type de sol' : 'Autre' , 'Irrigation_disponibilité' : 'Non' , 'Acres' : 1000 , 'Statut_du_sol' : 'Humide' ,
'Pays' : 'ETATS-UNIS' },

{ 'Le type de sol' : 'Sable' , 'Irrigation_disponibilité' : 'Non' , 'Acres' : 500 , 'Statut_du_sol' : 'Sec' ,
'Pays' : 'Inde' }]



# créer le dataframe à partir des données ci-dessus

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Écrivez le DataFrame ci-dessus dans la table.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Sortir:







Nous pouvons voir qu'un fichier parquet est créé avec les données PySpark précédentes.



Exemple 2 :

Considérez le DataFrame précédent et écrivez le « Agri_Table2 » dans la table en partitionnant les enregistrements en fonction des valeurs de la colonne « Pays ».

# Écrivez le DataFrame ci-dessus dans la table avec le paramètre partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Pays' ])

Sortir:

Il y a trois valeurs uniques dans la colonne 'Pays' : 'Inde', 'Royaume-Uni' et 'États-Unis'. Ainsi, trois partitions sont créées. Chaque partition contient les dossiers du parquet.

Pyspark.sql.DataFrameReader.table()

Chargeons la table dans le PySpark DataFrame à l'aide de la fonction spark.read.table(). Il ne prend qu'un seul paramètre qui est le chemin/nom de la table. Il charge directement la table dans le PySpark DataFrame et toutes les fonctions SQL qui sont appliquées au PySpark DataFrame peuvent également être appliquées sur ce DataFrame chargé.

Syntaxe:

spark_app.read.table(chemin/'nom_table')

Dans ce scénario, nous utilisons la table précédente qui a été créée à partir de PySpark DataFrame. Assurez-vous que vous devez implémenter les extraits de code du scénario précédent dans votre environnement.

Exemple:

Chargez la table 'Agri_Table1' dans le DataFrame nommé 'loaded_data'.

données_chargées = linuxhint_spark_app.read.table( 'Agri_Table1' )

données_chargées.show()

Sortir:

Nous pouvons voir que la table est chargée dans le PySpark DataFrame.

Exécution des requêtes SQL

Maintenant, nous exécutons des requêtes SQL sur le DataFrame chargé à l'aide de la fonction spark.sql().

# Utilisez la commande SELECT pour afficher toutes les colonnes du tableau ci-dessus.

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).montrer()

# Clause OÙ

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).montrer()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).montrer()

Sortir:

  1. La première requête affiche toutes les colonnes et tous les enregistrements du DataFrame.
  2. La deuxième requête affiche les enregistrements basés sur la colonne 'Soil_status'. Il n'y a que trois enregistrements avec l'élément « Dry ».
  3. La dernière requête renvoie deux enregistrements avec 'Acres' supérieurs à 2 000.

Pyspark.sql.DataFrameWriter.insertInto()

En utilisant la fonction insertInto(), nous pouvons ajouter le DataFrame dans la table existante. Nous pouvons utiliser cette fonction avec le selectExpr () pour définir les noms de colonne, puis l'insérer dans la table. Cette fonction prend également le tableName comme paramètre.

Syntaxe:

DataFrame_obj.write.insertInto('Table_name')

Dans ce scénario, nous utilisons la table précédente qui a été créée à partir de PySpark DataFrame. Assurez-vous que vous devez implémenter les extraits de code du scénario précédent dans votre environnement.

Exemple:

Créez un nouveau DataFrame avec deux enregistrements et insérez-les dans la table 'Agri_Table1'.

importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données agricoles avec 2 lignes

agricole =[{ 'Le type de sol' : 'Sable' , 'Irrigation_disponibilité' : 'Non' , 'Acres' : 2500 , 'Statut_du_sol' : 'Sec' ,
'Pays' : 'ETATS-UNIS' },

{ 'Le type de sol' : 'Sable' , 'Irrigation_disponibilité' : 'Non' , 'Acres' : 1200 , 'Statut_du_sol' : 'Humide' ,
'Pays' : 'Japon' }]

# créer le dataframe à partir des données ci-dessus

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# écriture.insertInto()

agri_df2.selectExpr( 'Acres' , 'Pays' , « Irrigation_disponibilité » , 'Le type de sol' ,
'Statut_du_sol' ).write.insertInto( 'Agri_Table1' )

# Afficher l'Agri_Table1 final

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).montrer()

Sortir:

Maintenant, le nombre total de lignes présentes dans le DataFrame est de 7.

Conclusion

Vous comprenez maintenant comment écrire le DataFrame PySpark dans la table à l'aide de la fonction write.saveAsTable(). Il prend le nom de la table et d'autres paramètres facultatifs. Ensuite, nous avons chargé cette table dans le PySpark DataFrame à l'aide de la fonction spark.read.table(). Il ne prend qu'un seul paramètre qui est le chemin/nom de la table. Si vous souhaitez ajouter le nouveau DataFrame dans la table existante, utilisez la fonction insertInto().