When using Spark DataFrames, a common question arises:
Do we need to repartition the DataFrame by the field(s) commonly used in later joins or aggregations?
It is really not a simple question, because we have to weigh the communication cost, the cost of shuffling partitions, and so on. My experience is that, when joining large data, YOU SHOULD!
HOW?
Repartitioning a Spark DataFrame by some fields is not that straightforward, because the DataFrame object does not come with a .partitionBy() method. However, a DataFrame is essentially an RDD with a structured type mapping, so we can repartition the underlying RDD and create a new DataFrame from it. Here is the code in PySpark:
from pyspark.sql import SQLContext

sqlsc = SQLContext(sc)
# Key each row by its first field, hash-partition into 5 partitions,
# then drop the keys and rebuild a DataFrame with the original schema.
df_new = sqlsc.createDataFrame(
    df_old.rdd.map(lambda x: (x[0], x)).partitionBy(5).values(),
    df_old.schema)
In the code above, df_new is created by partitioning df_old into 5 partitions, using the first field of each row as the key.
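A quick sanity check (assuming df_new was built as above) is to ask the underlying RDD how many partitions it now holds:

print df_new.rdd.getNumPartitions()  # expected: 5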
I still have to emphasize: weigh the benefit and the cost carefully before you repartition, because repartitioning itself is expensive. Other than when joining large DataFrames, I do not repartition in my practice. Another way to speed up a join when the data is not that large is the dataframe.coalesce(numPartitions) method (available since v1.4), which reduces the number of partitions. Coalescing typically does not shuffle the data.
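As a minimal sketch (the DataFrame names df_left, df_right and the join key "id" are hypothetical), coalescing before a join looks like this:

# Reduce df_left to 8 partitions, typically without a full shuffle, then join.
df_small = df_left.coalesce(8)
joined = df_small.join(df_right, df_small.id == df_right.id, "inner")
joined.show()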
Feb 18, 2016
Set up Spyder for Spark -- a step-by-step tutorial
Although there are many good online tutorials about Spark coding in Scala, Java, or Python for beginners, when a beginner starts to put all the pieces together for their "Hello World" Spark application, he or she can always find another important piece of the puzzle missing, which is very frustrating. So, why not another tutorial, even if a trivial one.
I particularly like Spyder as my Python IDE instead of the popular Jupyter Notebook, because I came from a Matlab background and Spyder offers an interface much like Matlab's. That said, most of the steps in this post do not depend on which IDE you use, so you can still follow the critical steps with a different IDE.
Spark and Python Environment
For a beginner, I would recommend starting with a virtual machine or a Docker image with the major components already set up, such as the Cloudera QuickStart VM, which is what I use for this post. The Cloudera VM is essentially a Linux system with Python 2.6. You can update the Python ecosystem using Anaconda, which installs Spyder by default.
Launch Spyder for Spark
Set up environment variables in your .bashrc
Edit your .bashrc in your terminal:
[cloudera@quickstart ~]$ gedit ~/.bashrc
Add the following lines to the file:
export SPARK_HOME="/usr/lib/spark"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
where $SPARK_HOME points to your Spark folder (the path above is the one on my system), and $PYTHONPATH is where the Python interpreter will later find the Spark library. Save, exit, and reload .bashrc in the terminal to make it take effect:
source ~/.bashrc
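To confirm the environment variables took effect, a quick check (assuming the paths above) is to import pyspark from a plain Python shell:

[cloudera@quickstart ~]$ python -c "import pyspark; print 'pyspark found'"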
Launch Spyder for Spark coding
First find your Spyder program and make a copy of it with a .py extension:
[cloudera@quickstart ~]$ cp ~/anaconda2/bin/spyder ~/anaconda2/bin/spyder.py
The path above is your Spyder executable; mine is the default Anaconda location. If you do not use the default configuration or don't know where your Spyder is, just type which spyder in the terminal.
Now launch Spyder for Spark by typing the following line in the terminal:
spark-submit ~/anaconda2/bin/spyder.py
Test spark application
Run the following code in Spyder to test your Spark setup:
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
# Point this at a local text file on your machine.
testFile = "file:///one/of/your/text_file.txt"
testData = sc.textFile(testFile).cache()
# Count lines containing 'a' and lines containing 'b'.
numAs = testData.filter(lambda s: 'a' in s).count()
numBs = testData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Make sure the testFile path points to a local file in your folder (not on HDFS).
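If you do want to read from HDFS instead, only the path scheme changes; a sketch with a hypothetical path:

testData = sc.textFile("hdfs:///user/cloudera/text_file.txt")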
Load third-party packages for your Spark application
We usually need third-party packages for a Spark application; one good example is spark-csv, which allows you to read a CSV file into a Spark DataFrame. I will use spark-csv as the example here. First, download the jar files: spark-csv needs two of them, spark-csv_2.10-1.3.0.jar and commons-csv-1.2.jar.
Launch Spyder with the two jars from the terminal:
[cloudera@quickstart ~]$ spark-submit --jars /home/cloudera/lib/Spark/commons-csv-1.2.jar,/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar ~/anaconda2/bin/spyder.py
Change the jar paths to the location of your own jar files, separated by a comma (no space). Try the following script to test the package:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "Simple App")
testFile = "file:///one/of/your/csv_file.csv"
sqlContext = SQLContext(sc)
# Read the CSV through the spark-csv data source, inferring the schema from the data.
df = sqlContext.load(source="com.databricks.spark.csv", header='true', inferSchema='true', path=testFile)
df.show()
You should be able to see the first few rows of the CSV data.
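Note that sqlContext.load() was deprecated in Spark 1.4 in favor of the DataFrameReader API; if you are on 1.4 or later, an equivalent read looks like this sketch:

df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(testFile))
df.printSchema()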
Load the third-party packages by default: it can be tedious to pass the external packages on the command line every time. You can add them to the default configuration, and they will be loaded automatically every time you launch spark-submit. Edit the Spark defaults file from the terminal:
[cloudera@quickstart ~]$ sudo gedit $SPARK_HOME/conf/spark-defaults.conf
Add the following line to the file:
spark.driver.extraClassPath /home/cloudera/lib/Spark/commons-csv-1.2.jar:/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar
The value is the list of jar files you would like to load by default, separated by ":".
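With the jars on the default driver classpath, you can then launch Spyder without the --jars flag:

[cloudera@quickstart ~]$ spark-submit ~/anaconda2/bin/spyder.py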
Enjoy Spark!