PySpark Pandas_Udf()

Pyspark Pandas Udf



La transformation du PySpark DataFrame est possible à l'aide de la fonction pandas_udf(). Il s'agit d'une fonction définie par l'utilisateur qui est appliquée sur le PySpark DataFrame avec une flèche. Nous pouvons effectuer les opérations vectorisées à l'aide de pandas_udf(). Il peut être implémenté en passant cette fonction en tant que décorateur. Plongeons-nous dans ce guide pour connaître la syntaxe, les paramètres et différents exemples.

Sujet du contenu :

Si vous voulez en savoir plus sur l'installation de PySpark DataFrame et du module, passez par ce article .







Pyspark.sql.functions.pandas_udf()

Le pandas_udf () est disponible dans le module sql.functions de PySpark qui peut être importé à l'aide du mot-clé 'from'. Il est utilisé pour effectuer les opérations vectorisées sur notre PySpark DataFrame. Cette fonction est implémentée comme un décorateur en passant trois paramètres. Après cela, nous pouvons créer une fonction définie par l'utilisateur qui renvoie les données au format vectoriel (comme nous utilisons series/NumPy pour cela) à l'aide d'une flèche. Dans cette fonction, nous sommes en mesure de retourner le résultat.



Structure et syntaxe :



Examinons d'abord la structure et la syntaxe de cette fonction :

@pandas_udf(type de données)
def nom_fonction(opération) -> convert_format :
déclaration de retour

Ici, le nom_fonction est le nom de notre fonction définie. Le type de données spécifie le type de données renvoyé par cette fonction. Nous pouvons retourner le résultat en utilisant le mot-clé 'return'. Toutes les opérations sont effectuées à l'intérieur de la fonction avec l'affectation des flèches.





Pandas_udf (Fonction et Type de retour)

  1. Le premier paramètre est la fonction définie par l'utilisateur qui lui est transmise.
  2. Le deuxième paramètre est utilisé pour spécifier le type de données de retour de la fonction.

Données:

Dans l'ensemble de ce guide, nous n'utilisons qu'un seul PySpark DataFrame pour la démonstration. Toutes les fonctions définies par l'utilisateur que nous définissons sont appliquées sur ce PySpark DataFrame. Assurez-vous de créer ce DataFrame dans votre environnement après l'installation de PySpark.



importer pyspark

depuis pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Indice Linux' ).getOrCreate()

depuis pyspark.sql.functions importer pandas_udf

à partir de l'importation de pyspark.sql.types *

importer des pandas en tant que panda

# détails sur les légumes

légume =[{ 'taper' : 'légume' , 'nom' : 'tomate' , 'localiser_pays' : 'ETATS-UNIS' , 'quantité' : 800 },

{ 'taper' : 'fruit' , 'nom' : 'banane' , 'localiser_pays' : 'CHINE' , 'quantité' : vingt },

{ 'taper' : 'légume' , 'nom' : 'tomate' , 'localiser_pays' : 'ETATS-UNIS' , 'quantité' : 800 },

{ 'taper' : 'légume' , 'nom' : 'Mangue' , 'localiser_pays' : 'JAPON' , 'quantité' : 0 },

{ 'taper' : 'fruit' , 'nom' : 'citron' , 'localiser_pays' : 'INDE' , 'quantité' : 1700 },

{ 'taper' : 'légume' , 'nom' : 'tomate' , 'localiser_pays' : 'ETATS-UNIS' , 'quantité' : 1200 },

{ 'taper' : 'légume' , 'nom' : 'Mangue' , 'localiser_pays' : 'JAPON' , 'quantité' : 0 },

{ 'taper' : 'fruit' , 'nom' : 'citron' , 'localiser_pays' : 'INDE' , 'quantité' : 0 }

]

# créer le dataframe du marché à partir des données ci-dessus

market_df = linuxhint_spark_app.createDataFrame(légumes)

market_df.show()

Sortir:

Ici, nous créons ce DataFrame avec 4 colonnes et 8 lignes. Maintenant, nous utilisons pandas_udf() pour créer les fonctions définies par l'utilisateur et les appliquer à ces colonnes.

Pandas_udf() avec différents types de données

Dans ce scénario, nous créons des fonctions définies par l'utilisateur avec pandas_udf() et les appliquons sur des colonnes et affichons les résultats à l'aide de la méthode select(). Dans chaque cas, nous utilisons pandas.Series lorsque nous effectuons les opérations vectorisées. Cela considère les valeurs de colonne comme un tableau unidimensionnel et l'opération est appliquée sur la colonne. Dans le décorateur lui-même, nous spécifions le type de retour de la fonction.

Exemple 1 : Pandas_udf() avec le type de chaîne

Ici, nous créons deux fonctions définies par l'utilisateur avec le type de retour de chaîne pour convertir les valeurs de colonne de type chaîne en majuscules et minuscules. Enfin, nous appliquons ces fonctions sur les colonnes 'type' et 'locate_country'.

# Convertir la colonne de type en majuscule avec pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series :

return i.str.upper()

# Convertir la colonne locate_country en minuscule avec pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series :

retourner i.str.lower()

# Afficher les colonnes en utilisant select()

market_df.select( 'taper' ,type_majuscule( 'taper' ), 'localiser_pays' ,
country_lower_case( 'localiser_pays' )).montrer()

Sortir:

Explication:

La fonction StringType() est disponible dans le module pyspark.sql.types. Nous avons déjà importé ce module lors de la création du PySpark DataFrame.

  1. Tout d'abord, UDF (fonction définie par l'utilisateur) renvoie les chaînes en majuscules à l'aide de la fonction str.upper(). Le str.upper() est disponible dans la structure de données de la série (car nous convertissons en série avec une flèche à l'intérieur de la fonction) qui convertit la chaîne donnée en majuscule. Enfin, cette fonction est appliquée à la colonne 'type' qui est spécifiée dans la méthode select(). Auparavant, toutes les chaînes de la colonne type étaient en minuscules. Maintenant, ils sont changés en majuscules.
  2. Deuxièmement, UDF renvoie les chaînes en majuscules à l'aide de la fonction str.lower(). Le str.lower() est disponible dans la structure de données de la série qui convertit la chaîne donnée en minuscules. Enfin, cette fonction est appliquée à la colonne 'type' qui est spécifiée dans la méthode select(). Auparavant, toutes les chaînes de la colonne type étaient en majuscules. Maintenant, ils sont changés en minuscules.

Exemple 2 : Pandas_udf() avec un type entier

Créons une UDF qui convertit la colonne d'entiers PySpark DataFrame en série Pandas et ajoutons 100 à chaque valeur. Passez la colonne 'quantité' à cette fonction dans la méthode select().

# Ajouter 100

@pandas_udf(TypeEntier())

def add_100(i: panda.Series) -> panda.Series :

retour i+ 100

# Passez la colonne de quantité à la fonction ci-dessus et affichez.

market_df.select( 'quantité' ,add_100( 'quantité' )).montrer()

Sortir:

Explication:

À l'intérieur de l'UDF, nous itérons toutes les valeurs et les convertissons en séries. Après cela, nous ajoutons 100 à chaque valeur de la série. Enfin, nous passons la colonne 'quantité' à cette fonction et nous pouvons voir que 100 est ajouté à toutes les valeurs.

Pandas_udf() avec différents types de données à l'aide de Groupby() et Agg()

Regardons les exemples pour passer l'UDF aux colonnes agrégées. Ici, les valeurs des colonnes sont d'abord regroupées à l'aide de la fonction groupby() et l'agrégation est effectuée à l'aide de la fonction agg(). Nous passons notre UDF à l'intérieur de cette fonction d'agrégation.

Syntaxe:

pyspark_dataframe_object.groupby( 'grouping_column' ).agg(UDF
(pyspark_dataframe_object[ 'colonne' ]))

Ici, les valeurs de la colonne de regroupement sont regroupées en premier. Ensuite, l'agrégation se fait sur chaque donnée groupée par rapport à notre UDF.

Exemple 1 : Pandas_udf() avec la moyenne agrégée()

Ici, nous créons une fonction définie par l'utilisateur avec un type de retour float. À l'intérieur de la fonction, nous calculons la moyenne à l'aide de la fonction mean(). Cette UDF est transmise à la colonne 'quantité' pour obtenir la quantité moyenne pour chaque type.

# renvoie la moyenne/moyenne

@pandas_udf( 'flotter' )

def fonction_moyenne(i: panda.Series) -> float :

retourner je.moyenne()

# Passez la colonne de quantité à la fonction en regroupant la colonne de type.

market_df.groupby( 'taper' ).agg(fonction_moyenne(market_df[ 'quantité' ])).montrer()

Sortir:

Nous regroupons en fonction des éléments dans la colonne 'type'. Deux groupes sont formés – « fruit » et « légume ». Pour chaque groupe, la moyenne est calculée et renvoyée.

Exemple 2 : Pandas_udf() avec Aggregate Max() et Min()

Ici, nous créons deux fonctions définies par l'utilisateur avec le type de retour entier (int). La première UDF renvoie la valeur minimale et la seconde UDF renvoie la valeur maximale.

# pandas_udf qui renvoie la valeur minimale

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int :

retourner i.min()

# pandas_udf qui renvoie la valeur maximale

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int :

retourner i.max()

# Passez la colonne de quantité au min_ pandas_udf en regroupant locate_country.

market_df.groupby( 'localiser_pays' ).agg(min_(market_df[ 'quantité' ])).montrer()

# Passez la colonne de quantité au max_ pandas_udf en regroupant locate_country.

market_df.groupby( 'localiser_pays' ).agg(max_(market_df[ 'quantité' ])).montrer()

Sortir:

Pour renvoyer les valeurs minimales et maximales, nous utilisons les fonctions min() et max() dans le type de retour des UDF. Maintenant, nous regroupons les données dans la colonne « locate_country ». Quatre groupes sont formés (« CHINE », « INDE », « JAPON », « USA »). Pour chaque groupe, nous retournons la quantité maximale. De même, nous renvoyons la quantité minimale.

Conclusion

Fondamentalement, le pandas_udf () est utilisé pour effectuer les opérations vectorisées sur notre PySpark DataFrame. Nous avons vu comment créer le pandas_udf() et l'appliquer au PySpark DataFrame. Pour une meilleure compréhension, nous avons discuté des différents exemples en considérant tous les types de données (string, float et integer). Il peut être possible d'utiliser pandas_udf() avec groupby() via la fonction agg().