Conversion de PySpark DataFrame en CSV

Conversion De Pyspark Dataframe En Csv



Examinons les quatre scénarios différents de conversion de PySpark DataFrame en CSV. Directement, nous utilisons la méthode write.csv() pour convertir le PySpark DataFrame en CSV. À l'aide de la fonction to_csv(), nous convertissons le DataFrame PySpark Pandas en CSV. Cela peut également être possible en le convertissant en tableau NumPy.

Sujet du contenu :

Si vous voulez en savoir plus sur PySpark DataFrame et l'installation du module, passez par ce article .







PySpark DataFrame en CSV en convertissant en Pandas DataFrame

Le to_csv() est une méthode disponible dans le module Pandas qui convertit le Pandas DataFrame en CSV. Tout d'abord, nous devons convertir notre PySpark DataFrame en Pandas DataFrame. La méthode toPandas() est utilisée pour cela. Voyons la syntaxe de to_csv() avec ses paramètres.



Syntaxe:



pandas_dataframe_obj.to_csv(chemin/ 'nom_fichier.csv' , entête ,index,colonnes,mode...)
  1. Nous devons spécifier le nom de fichier du fichier CSV. Si vous souhaitez stocker le fichier CSV téléchargé à un emplacement particulier sur votre PC, vous pouvez également spécifier le chemin avec le nom du fichier.
  2. Les colonnes sont incluses si l'en-tête est défini sur 'True'. Si vous n'avez pas besoin de colonnes, définissez l'en-tête sur 'False'.
  3. Les index sont spécifiés si l'index est défini sur 'True'. Si vous n'avez pas besoin d'index, définissez l'index sur 'False'.
  4. Le paramètre Columns prend une liste de noms de colonnes dans laquelle nous pouvons spécifier quelles colonnes particulières sont extraites dans le fichier CSV.
  5. Nous pouvons ajouter les enregistrements au CSV en utilisant le paramètre mode. Append - 'a' est utilisé pour ce faire.

Exemple 1 : avec les paramètres d'en-tête et d'index

Créez le DataFrame PySpark 'skills_df' avec 3 lignes et 4 colonnes. Convertissez ce DataFrame en CSV en le convertissant d'abord en Pandas DataFrame.





importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de compétences avec 3 lignes et 4 colonnes

compétences =[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'compétence' : 'peinture' , 'prix' : 25000 },

{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'compétence' : 'danse' , 'prix' : 2000 },

{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'compétence' : 'en lisant' , 'prix' : 1200 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

skills_df.show()

# Convertir skills_df en pandas DataFrame

pandas_skills_df= skills_df.toPandas()

impression(pandas_skills_df)

# Convertissez ce DataFrame en csv avec en-tête et index

pandas_skills_df.to_csv( 'pandas_skills1.csv' , entête =Vrai, indice=Vrai)

Sortir:



Nous pouvons voir que le PySpark DataFrame est converti en Pandas DataFrame. Voyons s'il est converti en CSV avec les noms de colonnes et les indices :

Exemple 2 : Ajouter les données au CSV

Créez un autre PySpark DataFrame avec 1 enregistrement et ajoutez-le au CSV qui est créé dans le cadre de notre premier exemple. Assurez-vous que nous devons définir l'en-tête sur 'Faux' avec le paramètre de mode. Sinon, les noms de colonne sont également ajoutés sous forme de ligne.

importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

compétences =[{ 'identifiant' : 90 , 'personne' : 'Bhargav' , 'compétence' : 'en lisant' , 'prix' : 12000 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

# Convertir skills_df en pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Ajoutez ce DataFrame au fichier pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , mode= 'un' , entête =Faux)

Sortie CSV :

Nous pouvons voir qu'une nouvelle ligne est ajoutée au fichier CSV.

Exemple 3 : Avec le paramètre Columns

Prenons le même DataFrame et convertissons-le en CSV avec deux colonnes : 'personne' et 'prix'.

importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de compétences avec 3 lignes et 4 colonnes

compétences =[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'compétence' : 'peinture' , 'prix' : 25000 },

{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'compétence' : 'danse' , 'prix' : 2000 },

{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'compétence' : 'en lisant' , 'prix' : 1200 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

# Convertir skills_df en pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Convertir ce DataFrame en csv avec des colonnes spécifiques

pandas_skills_df.to_csv( 'pandas_skills2.csv' , colonnes=[ 'personne' , 'prix' ])

Sortie CSV :

Nous pouvons voir que seules les colonnes 'personne' et 'prix' existent dans le fichier CSV.

PySpark Pandas DataFrame vers CSV à l'aide de la méthode To_Csv()

Le to_csv() est une méthode disponible dans le module Pandas qui convertit le Pandas DataFrame en CSV. Tout d'abord, nous devons convertir notre PySpark DataFrame en Pandas DataFrame. La méthode toPandas() est utilisée pour cela. Voyons la syntaxe de to_csv() avec ses paramètres :

Syntaxe:

pyspark_pandas_dataframe_obj.to_csv(chemin/ 'nom_fichier.csv' , entête ,index,colonnes,...)
  1. Nous devons spécifier le nom de fichier du fichier CSV. Si vous souhaitez stocker le fichier CSV téléchargé à un emplacement particulier sur votre PC, vous pouvez également spécifier le chemin avec le nom du fichier.
  2. Les colonnes sont incluses si l'en-tête est défini sur 'True'. Si vous n'avez pas besoin de colonnes, définissez l'en-tête sur 'False'.
  3. Les index sont spécifiés si l'index est défini sur 'True'. Si vous n'avez pas besoin d'index, définissez l'index sur 'False'.
  4. Le paramètre columns prend une liste de noms de colonnes dans laquelle nous pouvons spécifier quelles colonnes particulières sont extraites dans le fichier CSV.

Exemple 1 : Avec le paramètre Columns

Créez un DataFrame PySpark Pandas avec 3 colonnes et convertissez-le en CSV en utilisant to_csv() avec les colonnes 'person' et 'prize'.

de pyspark importer des pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'identifiant' :[ 90 , 78 , 90 , 57 ], 'personne' :[ 'Chéri' , 'Mouni' , 'lui-même' , 'raha' ], 'prix' :[ 1 , 2 , 3 , 4 ]})

imprimer (pyspark_pandas_dataframe)

# Convertir ce DataFrame en csv avec des colonnes spécifiques

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , colonnes=[ 'personne' , 'prix' ])

Sortir:

Nous pouvons voir que le DataFrame PySpark Pandas est converti en CSV avec deux partitions. Chaque partition contient 2 enregistrements. De plus, les colonnes du CSV sont « personne » et « prix » uniquement.

Fichier de partition 1 :

Fichier de partition 2 :

Exemple 2 : avec le paramètre d'en-tête

Utilisez le DataFrame précédent et spécifiez le paramètre d'en-tête en le définissant sur 'True'.

de pyspark importer des pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'identifiant' :[ 90 , 78 , 90 , 57 ], 'personne' :[ 'Chéri' , 'Mouni' , 'lui-même' , 'raha' ], 'prix' :[ 1 , 2 , 3 , 4 ]})

# Convertissez ce DataFrame en csv avec en-tête.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , entête = Vrai)

Sortie CSV :

Nous pouvons voir que le DataFrame PySpark Pandas est converti en CSV avec deux partitions. Chaque partition contient 2 enregistrements avec des noms de colonne.

Fichier de partition 1 :

Fichier de partition 2 :

PySpark Pandas DataFrame en CSV en convertissant en tableau NumPy

Nous avons la possibilité de convertir le DataFrame PySpark Pandas en CSV en le convertissant dans le tableau Numpy. Le to_numpy() est une méthode disponible dans le module PySpark Pandas qui convertit le DataFrame PySpark Pandas en tableau NumPy.

Syntaxe:

pyspark_pandas_dataframe_obj.to_numpy()

Il ne prendra aucun paramètre.

Utilisation de la méthode Tofile()

Après la conversion en tableau NumPy, nous pouvons utiliser la méthode tofile() pour convertir NumPy en CSV. Ici, il stocke chaque enregistrement dans une nouvelle cellule en colonnes dans le fichier CSV.

Syntaxe:

array_obj.to_numpy(filename/path,sep=' ')

Il prend le nom de fichier ou le chemin d'un CSV et un séparateur.

Exemple:

Créez PySpark Pandas DataFrame avec 3 colonnes et 4 enregistrements et convertissez-le en CSV en le convertissant d'abord en un tableau NumPy.

de pyspark importer des pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'identifiant' :[ 90 , 78 , 90 , 57 ], 'personne' :[ 'Chéri' , 'Mouni' , 'lui-même' , 'raha' ], 'prix' :[ 1 , 2 , 3 , 4 ]})

# Convertir le DataFrame ci-dessus en tableau numpy

converti = pyspark_pandas_dataframe.to_numpy()

imprimer (converti)

# Utilisation de fichier()

converti.tofile( 'converti1.csv' , sep = ',' )

Sortir:

[[ 90 'Chéri' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'lui-même' 3 ]

[ 57 'raha' 4 ]]

Nous pouvons voir que le DataFrame PySpark Pandas est converti en un tableau NumPy (12 valeurs). Si vous pouvez voir les données CSV, il stocke chaque valeur de cellule dans une nouvelle colonne.

PySpark DataFrame vers CSV à l'aide de la méthode Write.Csv()

La méthode write.csv() prend le nom/chemin du fichier où nous devons enregistrer le fichier CSV en tant que paramètre.

Syntaxe:

dataframe_object.coalesce( 1 ).write.csv( 'nom de fichier' )

En fait, le CSV est enregistré sous forme de partitions (plus d'une). Afin de s'en débarrasser, nous fusionnons tous les fichiers CSV partitionnés en un seul. Dans ce scénario, nous utilisons la fonction coalesce(). Maintenant, nous ne pouvons voir qu'un seul fichier CSV avec toutes les lignes du PySpark DataFrame.

Exemple:

Considérez le PySpark DataFrame avec 4 enregistrements ayant 4 colonnes. Écrivez ce DataFrame au format CSV avec le fichier nommé 'market_details'.

importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de marché avec 4 lignes et 4 colonnes

marché =[{ 'milieu' : 'mz-001' , 'm_name' : 'ABC' , 'm_ville' : 'Delhi' , 'm_état' : 'Delhi' },

{ 'milieu' : 'mz-002' , 'm_name' : 'XYZ' , 'm_ville' : 'patna' , 'm_état' : 'chance maintenant' },

{ 'milieu' : 'mz-003' , 'm_name' : 'PQR' , 'm_ville' : 'Floride' , 'm_état' : 'un' },

{ 'milieu' : 'mz-004' , 'm_name' : 'ABC' , 'm_ville' : 'Delhi' , 'm_état' : 'chance maintenant' }

]



# créer le dataframe du marché à partir des données ci-dessus

market_df = linuxhint_spark_app.createDataFrame(marché)

# Données réelles du marché

market_df.show()

# écriture.csv()

market_df.coalesce( 1 ).write.csv( 'détails_du_marché' )

Sortir:

Vérifions le fichier :

Ouvrez le dernier fichier pour voir les enregistrements.

Conclusion

Nous avons appris les quatre scénarios différents qui convertissent le PySpark DataFrame en CSV avec des exemples en considérant différents paramètres. Lorsque vous travaillez avec PySpark DataFrame, vous avez deux options pour convertir ce DataFrame en CSV : une méthode consiste à utiliser la méthode write() et une autre à utiliser la méthode to_csv() en convertissant en Pandas DataFrame. Si vous travaillez avec PySpark Pandas DataFrame, vous pouvez également utiliser to_csv() et tofile() en convertissant en tableau NumPy.