Aiuta lo sviluppo del sito, condividendo l'articolo con gli amici!

Introduzione a Spark Transformations

Una trasformazione è una funzione che restituisce un nuovo RDD modificando gli RDD esistenti. L'RDD di input non viene modificato poiché gli RDD non sono modificabili. Tutte le trasformazioni vengono eseguite da Spark in modo pigro: i risultati non vengono calcolati immediatamente. Il calcolo delle trasformazioni avviene solo quando una determinata azione viene eseguita sull'RDD.

Tipi di trasformazioni in Spark

Sono classificati in due tipi:

  • Trasformazione stretta: Tutti i dati richiesti per calcolare i record in una partizione risiedono in una partizione dell'RDD padre. Si verifica nel caso dei seguenti metodi:

map(), flatMap(), filter(), sample(), union() ecc.

  • Wide Transformation: Tutti i dati richiesti per calcolare i record in una partizione risiedono in più di una partizione negli RDD padre. Si verifica nel caso dei seguenti metodi:

distinct(), groupByKey(), reduceByKey(), join() , repartition() ecc.

Esempi di trasformazioni Spark

Qui discutiamo degli esempi menzionati di seguito.

1. Trasformazioni strette

  • map(): questa funzione prende una funzione come parametro e applica questa funzione a ogni elemento dell'RDD.

Codice:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERRORE)
"val rdd=sc.parallelize(Array(10,15,50,100))
"println(L&39;RDD di base è:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD dopo aver applicato il metodo MAP:)
""rddNew.foreach(x=>print(x+ ))

Uscita:

Nel metodo MAP sopra, aggiungiamo ogni elemento di 10 e questo si riflette nell'output.

  • FlatMap(): è simile alla mappa ma può generare più elementi di output corrispondenti a un elemento di input. Pertanto, la funzione deve restituire una sequenza anziché un singolo elemento.

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split())
"rddNew.foreach(x=>print(x+ ))

Uscita:

Questa funzione passata come parametro divide ogni input per ":" e restituisce un array e il metodo FlatMap appiattisce l'array.

  • filter(): Prende una funzione come parametro e restituisce tutti gli elementi dell'RDD per i quali la funzione restituisce true.

Codice:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERRORE)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(L&39;RDD di base è:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD dopo aver applicato il metodo MAP:)
""rddNew.foreach(x=>print(x+ ))

Uscita:

Nel codice sopra prendiamo stringhe che non contengono la parola “test”.

  • sample(): Restituisce una frazione dei dati, con o senza sostituzione, utilizzando un determinato seme generatore di numeri casuali (questo è però opzionale).

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))

Uscita:

Nel codice sopra, stiamo ricevendo campioni casuali senza sostituzione.

  • union(): Restituisce l'unione dell'RDD sorgente e dell'RDD passato come parametro.

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5))
val rdd2=sc.parallelize(Array(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Uscita:

Il risultante RDD rddUnion contiene tutti gli elementi di rdd e rdd2.

2. Ampie trasformazioni

  • distinct(): Questo metodo restituisce gli elementi distinti dell'RDD.

Codice:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERRORE)
"val rdd=sc.parallelize(Array(1,1,3,4,5,5,5))
"println(L&39;RDD di base è:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD dopo aver applicato il metodo MAP:)
""rddNew.foreach(x=>print(x+ ))

Uscita:

stiamo ottenendo gli elementi distinti 4,1,3,5 nell'output.

  • groupByKey(): Questa funzione è applicabile agli RDD a coppie. Un RDD a coppie è uno in cui ogni elemento è una tupla in cui il primo elemento è la chiave e il secondo elemento è il valore. Questa funzione raggruppa tutti i valori corrispondenti a un tasto.

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))

Uscita:

Come previsto tutti i valori per i tasti “a” e “b” sono raggruppati insieme.

  • reduceByKey(): Questa operazione è applicabile anche agli RDD a coppie. Aggrega i valori per ogni chiave secondo un metodo di riduzione fornito che deve essere del tipo (v,v)=v.

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))

Uscita:

Nel caso precedente, stiamo sommando tutti i valori di una chiave.

  • join(): L'operazione di unione è applicabile agli RDD a coppie. Il metodo di unione combina due set di dati in base alla chiave.

Codice:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERRORE)
""val rdd1=sc.parallelize(Array((key1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelize(Array((key2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD dopo l&39;unione:)
""rddJoined.foreach(x=>print(x+ ))

Uscita:

  • repartition(): Rimescola i dati nell'RDD in modo casuale nel numero di partizioni passate come parametro. Può sia aumentare che diminuire le partizioni.

Codice:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,10,15,18,243,50),10)
"println(Partizioni prima di: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
"println(Partizioni dopo: +rddNew.getNumPartitions)"

Uscita:

Nel caso precedente, stiamo aumentando le partizioni da 10 a 15.

Aiuta lo sviluppo del sito, condividendo l'articolo con gli amici!