Conversion de PySpark DataFrame en JSON

Conversion De Pyspark Dataframe En Json



La transmission d'une donnée structurée à l'aide de JSON est possible et consomme également peu de mémoire. Comparé à PySpark RDD ou PySpark DataFrame, JSON consomme peu de mémoire et de sérialisation, ce qui est possible avec JSON. Nous sommes en mesure de convertir le PySpark DataFrame en JSON en utilisant la méthode pyspark.sql.DataFrameWriter.json(). En dehors de cela, il existe deux autres façons de convertir le DataFrame en JSON.

Sujet du contenu :

Considérons un simple PySpark DataFrame dans tous les exemples et convertissons-le en JSON à l'aide des fonctions mentionnées.







Module requis :

Installez la bibliothèque PySpark dans votre environnement si elle n'est pas encore installée. Vous pouvez vous référer à la commande suivante pour l'installer :



pip installer pyspark

PySpark DataFrame vers JSON en utilisant To_json() avec ToPandas()

La méthode to_json() est disponible dans le module Pandas qui convertit le Pandas DataFrame en JSON. Nous pouvons utiliser cette méthode si nous convertissons notre PySpark DataFrame en Pandas DataFrame. Afin de convertir le PySpark DataFrame en Pandas DataFrame, la méthode toPandas() est utilisée. Voyons la syntaxe de to_json() avec ses paramètres.



Syntaxe:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient est utilisé pour afficher le JSON converti au format souhaité. Il prend 'enregistrements', 'table', 'valeurs', 'colonnes', 'index', 'split'.
  2. Index est utilisé pour inclure/supprimer l'index de la chaîne JSON convertie. S'il est défini sur 'True', les indices sont affichés. Sinon, les indices ne seront pas affichés si l'orient est 'split' ou 'table'.

Exemple 1 : Orienter en tant que 'Records'

Créez un DataFrame PySpark 'skills_df' avec 3 lignes et 4 colonnes. Convertissez ce DataFrame en JSON en spécifiant le paramètre orient comme 'records'.

importer pyspark

importer des pandas

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de compétences avec 3 lignes et 4 colonnes

compétences =[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'compétence' : 'peinture' , 'prix' : 25000 },

{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'compétence' : 'danse' , 'prix' : 2000 },

{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'compétence' : 'en lisant' , 'prix' : 1200 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

# Données réelles sur les compétences

skills_df.show()

# Convertir en JSON en utilisant to_json() avec orient comme 'records'

json_skills_data = skills_df.toPandas().to_json(orient= 'enregistrements' )

impression(json_skills_data)

Sortir:



+---+------+-----+--------+

| identifiant|personne|prix| compétence|

+---+------+-----+--------+

| 123 | Miel| 25000 |peinture|

| 112 | Mouni| 2000 | danse |

| 153 |Tulassi| 1200 | lecture |

+---+------+-----+--------+

[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'prix' : 25000 , 'compétence' : 'peinture' },{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'prix' : 2000 , 'compétence' : 'danse' },{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'prix' : 1200 , 'compétence' : 'en lisant' }]

Nous pouvons voir que le PySpark DataFrame est converti en tableau JSON avec un dictionnaire de valeurs. Ici, les clés représentent le nom de la colonne et la valeur représente la valeur de la ligne/cellule dans le PySpark DataFrame.

Exemple 2 : Orienter en tant que 'Split'

Le format JSON renvoyé par l'orient 'split' inclut les noms de colonne qui ont une liste de colonnes, une liste d'index et une liste de données. Voici le format de l'orient 'split'.

# Convertir en JSON en utilisant to_json() avec orient comme 'split'

json_skills_data = skills_df.toPandas().to_json(orient= 'diviser' )

impression(json_skills_data)

Sortir:

{ 'Colonnes' :[ 'identifiant' , 'personne' , 'prix' , 'compétence' ], 'indice' :[ 0 , 1 , 2 ], 'données' :[[ 123 , 'Chéri' , 25000 , 'peinture' ],[ 112 , 'Mouni' , 2000 , 'danse' ],[ 153 , 'Tulasi' , 1200 , 'en lisant' ]]}

Exemple 3 : Orienter comme 'Index'

Ici, chaque ligne du PySpark DataFrame est retirée sous la forme d'un dictionnaire avec la clé comme nom de colonne. Pour chaque dictionnaire, la position d'index est spécifiée sous forme de clé.

# Convertir en JSON en utilisant to_json() avec orient comme 'index'

json_skills_data = skills_df.toPandas().to_json(orient= 'indice' )

impression(json_skills_data)

Sortir:

{ '0' :{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'prix' : 25000 , 'compétence' : 'peinture' }, '1' :{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'prix' : 2000 , 'compétence' : 'danse' }, '2' :{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'prix' : 1200 , 'compétence' : 'en lisant' }}

Exemple 4 : Orienter en tant que 'Colonnes'

Les colonnes sont la clé de chaque enregistrement. Chaque colonne contient un dictionnaire qui prend les valeurs de colonne avec des numéros d'index.

# Convertir en JSON en utilisant to_json() avec orient comme 'columns'

json_skills_data = skills_df.toPandas().to_json(orient= 'Colonnes' )

impression(json_skills_data)

Sortir:

{ 'identifiant' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'personne' :{ '0' : 'Chéri' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'prix' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'compétence' :{ '0' : 'peinture' , '1' : 'danse' , '2' : 'en lisant' }}

Exemple 5 : Orienter en tant que 'Valeurs'

Si vous n'avez besoin que des valeurs en JSON, vous pouvez opter pour l'orientation 'valeurs'. Il affiche chaque ligne dans une liste. Enfin, toutes les listes sont stockées dans une liste. Ce JSON est du type liste imbriquée.

# Convertir en JSON en utilisant to_json() avec orient comme 'values'

json_skills_data = skills_df.toPandas().to_json(orient= 'valeurs' )

impression(json_skills_data)

Sortir:

[[ 123 , 'Chéri' , 25000 , 'peinture' ],[ 112 , 'Mouni' , 2000 , 'danse' ],[ 153 , 'Tulasi' , 1200 , 'en lisant' ]]

Exemple 6 : Orienter comme 'Table'

L'orient 'table' renvoie le JSON qui inclut le schéma avec les noms de champ ainsi que les types de données de colonne, l'index en tant que clé primaire et la version de Pandas. Les noms de colonne avec des valeurs sont affichés en tant que 'données'.

# Convertir en JSON en utilisant to_json() avec orient comme 'table'

json_skills_data = skills_df.toPandas().to_json(orient= 'tableau' )

impression(json_skills_data)

Sortir:

{ 'schéma' :{ 'des champs' :[{ 'nom' : 'indice' , 'taper' : 'entier' },{ 'nom' : 'identifiant' , 'taper' : 'entier' },{ 'nom' : 'personne' , 'taper' : 'chaîne' },{ 'nom' : 'prix' , 'taper' : 'entier' },{ 'nom' : 'compétence' , 'taper' : 'chaîne' }], 'clé primaire' :[ 'indice' ], 'version_pandas' : '1.4.0' }, 'données' :[{ 'indice' : 0 , 'identifiant' : 123 , 'personne' : 'Chéri' , 'prix' : 25000 , 'compétence' : 'peinture' },{ 'indice' : 1 , 'identifiant' : 112 , 'personne' : 'Mouni' , 'prix' : 2000 , 'compétence' : 'danse' },{ 'indice' : 2 , 'identifiant' : 153 , 'personne' : 'Tulasi' , 'prix' : 1200 , 'compétence' : 'en lisant' }]}

Exemple 7 : avec paramètre d'index

Tout d'abord, nous passons le paramètre index en le définissant sur 'True'. Vous verrez pour chaque valeur de colonne que la position d'index est renvoyée sous forme de clé dans un dictionnaire.

Dans la deuxième sortie, seuls les noms de colonne (« colonnes ») et les enregistrements (« données ») sont renvoyés sans les positions d'index puisque l'index est défini sur « False ».

# Convertir en JSON en utilisant to_json() avec index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

impression(json_skills_data, ' \n ' )

# Convertir en JSON en utilisant to_json() avec index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'diviser' )

impression(json_skills_data)

Sortir:

{ 'identifiant' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'personne' :{ '0' : 'Chéri' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'prix' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'compétence' :{ '0' : 'peinture' , '1' : 'danse' , '2' : 'en lisant' }}

{ 'Colonnes' :[ 'identifiant' , 'personne' , 'prix' , 'compétence' ], 'données' :[[ 123 , 'Chéri' , 25000 , 'peinture' ],[ 112 , 'Mouni' , 2000 , 'danse' ],[ 153 , 'Tulasi' , 1200 , 'en lisant' ]]

PySpark DataFrame en JSON à l'aide de ToJSON()

La méthode toJSON() est utilisée pour convertir le DataFrame PySpark en un objet JSON. Fondamentalement, il renvoie une chaîne JSON qui est entourée d'une liste. Le ['{colonne:valeur,…}',…. ] est le format renvoyé par cette fonction. Ici, chaque ligne du PySpark DataFrame est renvoyée sous forme de dictionnaire avec le nom de la colonne comme clé.

Syntaxe:

dataframe_object.toJSON()

Il peut être possible de transmettre des paramètres tels que l'index, les étiquettes de colonne et le type de données.

Exemple:

Créez un DataFrame PySpark 'skills_df' avec 5 lignes et 4 colonnes. Convertissez ce DataFrame en JSON à l'aide de la méthode toJSON().

importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de compétences avec 5 lignes et 4 colonnes

compétences =[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'compétence' : 'peinture' , 'prix' : 25000 },

{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'compétence' : 'musique/danse' , 'prix' : 2000 },

{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'compétence' : 'en lisant' , 'prix' : 1200 },

{ 'identifiant' : 173 , 'personne' : 'Couru' , 'compétence' : 'musique' , 'prix' : 2000 },

{ 'identifiant' : 43 , 'personne' : 'Kamala' , 'compétence' : 'en lisant' , 'prix' : 10000 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

# Données réelles sur les compétences

skills_df.show()

# Convertir en tableau JSON

json_skills_data = skills_df.toJSON().collect()

impression(json_skills_data)

Sortir:

+---+------+-----+-----------+

| identifiant|personne|prix| compétence|

+---+------+-----+-----------+

| 123 | Miel| 25000 | peinture |

| 112 | Mouni| 2000 |musique/danse|

| 153 |Tulassi| 1200 | lecture |

| 173 | Couru| 2000 | musique |

| 43 |Kamala| 10000 | lecture |

+---+------+-----+-----------+

[ '{'id':123,'person':'Honey','prize':25000,'skill':'peinture'}' , '{'id':112,'person':'Mouni','prix':2000,'skill':'musique/danse'}' , '{'id':153,'person':'Tulasi','prize':1200,'skill':'lecture'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'musique'}' , '{'id':43,'person':'Kamala','prize':10000,'skill':'lecture'}' ]

Il y a 5 lignes dans le PySpark DataFrame. Toutes ces 5 lignes sont renvoyées sous la forme d'un dictionnaire de chaînes séparées par des virgules.

PySpark DataFrame en JSON à l'aide de Write.json()

La méthode write.json() est disponible dans PySpark qui écrit/enregistre le PySpark DataFrame dans un fichier JSON. Il prend le nom/chemin du fichier comme paramètre. Fondamentalement, il renvoie le JSON dans plusieurs fichiers (fichiers partitionnés). Pour les fusionner tous dans un seul fichier, nous pouvons utiliser la méthode coalesce().

Syntaxe:

dataframe_object.coalesce( 1 ).write.json('nom_fichier')
  1. Mode d'ajout - dataframe_object.write.mode(‘append’).json(‘file_name’)
  2. Mode d'écrasement – dataframe_object.write.mode('overwrite').json('file_name')

Il peut être possible d'ajouter/de remplacer le JSON existant. En utilisant le write.mode (), nous pouvons ajouter les données en passant 'append' ou écraser les données JSON existantes en passant 'overwrite' à cette fonction.

Exemple 1:

Créez un DataFrame PySpark 'skills_df' avec 3 lignes et 4 colonnes. Écrivez ce DataFrame en JSON.

importer pyspark

importer des pandas

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

# données de compétences avec 3 lignes et 4 colonnes

compétences =[{ 'identifiant' : 123 , 'personne' : 'Chéri' , 'compétence' : 'peinture' , 'prix' : 25000 },

{ 'identifiant' : 112 , 'personne' : 'Mouni' , 'compétence' : 'danse' , 'prix' : 2000 },

{ 'identifiant' : 153 , 'personne' : 'Tulasi' , 'compétence' : 'en lisant' , 'prix' : 1200 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills_df = linuxhint_spark_app.createDataFrame(compétences)

# écriture.json()

compétences_df.coalesce( 1 ).write.json( 'données_compétences' )

Fichier JSON :

Nous pouvons voir que le dossier skills_data inclut les données JSON partitionnées.

Ouvrons le fichier JSON. Nous pouvons voir que toutes les lignes du PySpark DataFrame sont converties en JSON.

Il y a 5 lignes dans le PySpark DataFrame. Toutes ces 5 lignes sont renvoyées sous la forme d'un dictionnaire de chaînes séparées par des virgules.

Exemple 2 :

Créez un DataFrame PySpark 'skills2_df' avec une ligne. Ajoutez une ligne au fichier JSON précédent en spécifiant le mode 'append'.

importer pyspark

importer des pandas

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

compétences2 =[{ 'identifiant' : 78 , 'personne' : 'Marie' , 'compétence' : 'équitation' , 'prix' : 8960 }

]

# créer le cadre de données des compétences à partir des données ci-dessus

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() avec le mode ajout.

skills2_df.write.mode( 'ajouter' ).json( 'données_compétences' )

Fichier JSON :

Nous pouvons voir les fichiers JSON partitionnés. Le premier fichier contient les premiers enregistrements DataFrame et le deuxième fichier contient le deuxième enregistrement DataFrame.

Conclusion

Il existe trois façons différentes de convertir le DataFrame PySpark en JSON. Tout d'abord, nous avons discuté de la méthode to_json() qui convertit en JSON en convertissant le PySpark DataFrame en Pandas DataFrame avec différents exemples en considérant différents paramètres. Ensuite, nous avons utilisé la méthode toJSON(). Enfin, nous avons appris à utiliser la fonction write.json() pour écrire le PySpark DataFrame en JSON. L'ajout et l'écrasement sont possibles avec cette fonction.