Ajutați dezvoltarea site -ului, împărtășind articolul cu prietenii!

Introducere în Spark Transformations

O transformare este o funcție care returnează un nou RDD prin modificarea RDD-urilor existente. RDD de intrare nu este modificat deoarece RDD-urile sunt imuabile. Toate transformările sunt executate de Spark într-o manieră leneșă - Rezultatele nu sunt calculate imediat. Calculul transformărilor are loc numai atunci când o anumită acțiune este efectuată pe RDD.

Tipuri de transformări în Spark

Sunt clasificate în general în două tipuri:

  • Transformare restrânsă: Toate datele necesare pentru a calcula înregistrările dintr-o partiție se află într-o partiție a RDD-ului părinte. Apare în cazul următoarelor metode:

map(), flatMap(), filter(), sample(), union() etc.

  • Wide Transformation: Toate datele necesare pentru a calcula înregistrările dintr-o partiție se află în mai multe partiții din RDD-urile părinte. Apare în cazul următoarelor metode:

distinct(), groupByKey(), reduceByKey(), join() , repartition() etc.

Exemple de transformări Spark

Aici discutăm exemplele menționate mai jos.

1. Transformări înguste

  • map(): Această funcție ia o funcție ca parametru și aplică această funcție fiecărui element al RDD.

Cod:

"val conf=nou SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(EROARE)
"val rdd=sc.parallelize(Matrice(10,15,50,100))
"println(DDR de bază este:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD după aplicarea metodei MAP:)
""rddNew.foreach(x=>print(x+ ))

Ieșire:

În metoda MAP de mai sus, adăugăm fiecare element cu 10, iar acest lucru se reflectă în rezultat.

  • FlatMap(): Este similar cu harta, dar poate genera mai multe elemente de ieșire corespunzătoare unui articol de intrare. Astfel, funcția trebuie să returneze o secvență în loc de un singur articol.

Cod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Matrice(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split(:))
"rddNew.foreach(x=>print(x+ ))

Ieșire:

Această funcție transmisă ca parametru împarte fiecare intrare cu „:” și returnează o matrice, iar metoda FlatMap aplatizează matricea.

  • filter(): Ia o funcție ca parametru și returnează toate elementele RDD pentru care funcția returnează adevărat.

Cod:

"val conf=nou SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(EROARE)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(DDR de bază este:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD după aplicarea metodei MAP:)
""rddNew.foreach(x=>print(x+ ))

Ieșire:

În codul de mai sus luăm șiruri care nu au cuvântul „test”.

  • sample(): Returnează o fracțiune din date, cu sau fără înlocuire, folosind un generator de numere aleatorii (Acesta este totuși opțional).

Cod:

"val conf=nou SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Matrice(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))

Ieșire:

În codul de mai sus, primim mostre aleatorii fără înlocuire.

  • union(): Returnează uniunea RDD sursă și RDD transmis ca parametru.

Cod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Matrice(1,2,3,4,5))
val rdd2=sc.parallelize(Matrice(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Ieșire:

RDD rddUnion rezultată conține toate elementele din rdd și rdd2.

2. Transformări ample

  • distinct(): Această metodă returnează elementele distincte ale RDD.

Cod:

"val conf=nou SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(EROARE)
"val rdd=sc.parallelize(Matrice(1,1,3,4,5,5,5))
"println(DDR de bază este:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD după aplicarea metodei MAP:)
""rddNew.foreach(x=>print(x+ ))

Ieșire:

primim elementele distincte 4,1,3,5 în ieșire.

  • groupByKey(): Această funcție este aplicabilă RDD-urilor perechi. Un RDD în pereche este unul în care fiecare element este un tuplu unde primul element este cheia și al doilea element este valoarea. Această funcție grupează toate valorile corespunzătoare unei taste.

Cod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Matrice((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))

Ieșire:

După cum era de așteptat, toate valorile pentru cheile „a” și „b” sunt grupate împreună.

  • reduceByKey(): Această operație se aplică și RDD-urilor în funcție de perechi. Acesta adună valorile pentru fiecare cheie conform unei metode de reducere furnizată, care trebuie să fie de tipul (v,v)=v.

Cod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Matrice((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))

Ieșire:

În cazul de mai sus, însumăm toate valorile unei chei.

  • join(): Operația de îmbinare este aplicabilă RDD-urilor în funcție de perechi. Metoda de îmbinare combină două seturi de date bazate pe cheie.

Cod:

"val conf=nou SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(EROARE)
""val rdd1=sc.parallelize(Matrice((key1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelize(Matrice((key2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD după alăturare:)
""rddJoined.foreach(x=>print(x+ ))

Ieșire:

  • repartition(): Regrupează aleatoriu datele din RDD în numărul de partiții trecute ca parametru. Poate crește și micșora partițiile.

Cod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Matrice(1,2,3,4,5,10,15,18,243,50),10)
"println(Partiții înainte de: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
"println(Partiții după: +rddNew.getNumPartitions)"

Ieșire:

În cazul de mai sus, creștem partițiile de la 10 la 15.

Ajutați dezvoltarea site -ului, împărtășind articolul cu prietenii!