Showing posts with label python. Show all posts

Feb 18, 2016

Set up Spyder for Spark: a step-by-step tutorial

Although there are many good online tutorials about Spark coding in Scala, Java, or Python for beginners, when beginners start to put all the pieces together for their first "Hello World" Spark application, they often find that another important piece of the puzzle is missing, which is very frustrating. So why not another tutorial, even if it is a trivial one?

I prefer Spyder as my Python IDE over the popular Jupyter Notebook because I come from a Matlab background and Spyder offers a very Matlab-like interface. Most of the steps in this post do not depend on which IDE you use, so you can still follow the critical ones with a different IDE.

Spark and Python Environment


For a beginner, I would recommend starting with a virtual machine or a Docker image that has the major components already set up, such as the Cloudera VM, which is the one I use for this post. The Cloudera VM is essentially a Linux system with Python 2.6. You can update the Python ecosystem using Anaconda, which installs Spyder by default.

Launch Spyder for Spark

Set up environment variables in your .bashrc

Edit your .bashrc in your terminal:

[cloudera@quickstart ~]$ gedit ~/.bashrc

Add the following lines to the file: 

export SPARK_HOME="/usr/lib/spark"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
 


where $SPARK_HOME points to your Spark folder (/usr/lib/spark in my case), and $PYTHONPATH tells the Python interpreter where to find the Spark libraries. The py4j version in the file name depends on your Spark release, so check $SPARK_HOME/python/lib for the exact name. Save, exit, and reload .bashrc in the terminal to make the changes take effect:

source ~/.bashrc
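
To confirm that the variables took effect, a small check like the one below can be run from a Python prompt opened in the same terminal. It is only a minimal sketch: find_py4j_zip is a hypothetical helper name (not part of Spark), and the py4j-*-src.zip pattern is assumed from the PYTHONPATH line above.

```python
import glob
import os


def find_py4j_zip(spark_home):
    """Return the py4j source zips under spark_home/python/lib (hypothetical helper)."""
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    return sorted(glob.glob(pattern))


spark_home = os.environ.get("SPARK_HOME")
if spark_home is None:
    print("SPARK_HOME is not set; did you run 'source ~/.bashrc'?")
else:
    # An empty list here suggests the PYTHONPATH entry points at the wrong file.
    print("SPARK_HOME = %s" % spark_home)
    print("py4j zips found: %s" % find_py4j_zip(spark_home))
```

If the list is empty, the py4j entry in PYTHONPATH probably does not match the file shipped with your Spark version.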

Launch Spyder for Spark coding

First find your Spyder launcher script and make a copy with a .py extension, so that spark-submit recognizes it as a Python application:

[cloudera@quickstart ~]$ cp ~/anaconda2/bin/spyder ~/anaconda2/bin/spyder.py

~/anaconda2/bin/spyder is the Spyder launcher in my case, which is the default Anaconda location. If you do not use the default configuration or do not know where your Spyder is, just type which spyder in the terminal.
Now launch Spyder for Spark by typing the following line in the terminal:

spark-submit ~/anaconda2/bin/spyder.py

Test spark application

Run the following code in Spyder to test your Spark setup:

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

testFile = "file:///one/of/your/text_file.txt"
testData = sc.textFile(testFile).cache()
numAs = testData.filter(lambda s: 'a' in s).count()
numBs = testData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))



Make sure testFile points to a local file on your machine (not on HDFS).
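
If you want to double-check the numbers that Spark prints, the same counts can be computed without Spark on a small file. This is only a cross-checking sketch: count_lines_containing is a hypothetical helper, and it takes a plain local path rather than a file:// URL.

```python
def count_lines_containing(path, letter):
    """Count the lines of a local text file that contain `letter`."""
    with open(path) as handle:
        return sum(1 for line in handle if letter in line)


# Example call (replace the placeholder with a real local path, no file:// prefix):
# print(count_lines_containing("/one/of/your/text_file.txt", "a"))
```

The result should match numAs (or numBs for 'b') from the Spark script when both read the same file.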

Load third-party packages for your Spark application

We often need third-party packages for Spark applications. One good example is spark-csv, which allows you to read a CSV file into a Spark DataFrame. I will use spark-csv as the example here.

Download the jar files. spark-csv needs two jar files: spark-csv_2.10-1.3.0.jar and commons-csv-1.2.jar.

Launch Spyder with the two jars from the terminal:

[cloudera@quickstart ~]$ spark-submit --jars /home/cloudera/lib/Spark/commons-csv-1.2.jar,/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar ~/anaconda2/bin/spyder.py

Change the jar paths to the location of your jar files, separated by a comma (no space). Try the following script to test the package:

from pyspark import SparkContext, SQLContext

sc = SparkContext("local", "Simple App")
testFile = "file:///one/of/your/csv_file.csv"
sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header = 'true', inferSchema = 'true',path = testFile)
df.show()


You should be able to see the first few rows of the CSV data.

Load the third-party packages by default: It can be very tedious to load external packages from the command line every time. You can add them to the default configuration so that they are loaded automatically every time you launch spark-submit. Edit the Spark default configuration file from the terminal:

[cloudera@quickstart ~]$ sudo gedit $SPARK_HOME/conf/spark-defaults.conf

Add the following line to the file:

spark.driver.extraClassPath /home/cloudera/lib/Spark/commons-csv-1.2.jar:/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar

The value lists the jar files you would like to load by default, separated by ":".
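
The two settings use different separators (a comma for --jars, a colon for spark.driver.extraClassPath), which is easy to mix up. The sketch below builds both strings from one list of paths; jars_argument and classpath_value are hypothetical helper names, and the paths are the ones from my setup.

```python
jars = [
    "/home/cloudera/lib/Spark/commons-csv-1.2.jar",
    "/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar",
]


def jars_argument(paths):
    """Comma-separated list with no spaces, as passed to spark-submit --jars."""
    return ",".join(paths)


def classpath_value(paths):
    """Colon-separated list, as used for spark.driver.extraClassPath on Linux."""
    return ":".join(paths)


# Print the command line and the configuration line for copy and paste.
print("spark-submit --jars " + jars_argument(jars) + " ~/anaconda2/bin/spyder.py")
print("spark.driver.extraClassPath " + classpath_value(jars))
```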

Enjoy Spark!




Jan 28, 2016

Simple aggregation performance comparison: R data.table vs Python pandas

Motivation

Both R and Python are popular tools for data scientists and statisticians. The well-known packages data.table in R and pandas in Python make data manipulation much easier and more efficient. Both are adopted by more and more statistics and data science tools and packages, and most popular machine learning algorithms are available in both R and Python.

There is no truly fair way to compare these two tools, because the choice depends on our own projects, coding environment, coding habits, tool availability, and even personal preference.
This comparison is just for my own curiosity.

In particular, I am interested in their aggregation performance, because I often have to run a model that produces a prediction for each individual, based on that individual's data. Grouping and aggregation are therefore typical operations for me:
  1. Split the big data set into multiple pieces, each of which represents an individual.
  2. Run my own model against each small piece of data.
  3. Put the predictions for all individuals together in one large data set.
Both data.table and pandas make this job easy, but I observe a significant performance difference between the two tools on such tasks. Here I provide a very simple comparison to highlight the difference.
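
The split, model, and combine pattern described above can be sketched in pandas with an explicit loop. This is only an illustration on toy data: predict_one is a hypothetical stand-in for a real model, and the column names ID and value are assumptions.

```python
import pandas as pd


def predict_one(group):
    """Hypothetical stand-in for a per-individual model: returns summary statistics."""
    return {"n_obs": len(group), "mean_value": group["value"].mean()}


data = pd.DataFrame({
    "ID": [1, 1, 2, 2, 2],
    "value": [1.0, 3.0, 2.0, 4.0, 6.0],
})

rows = []
for ident, group in data.groupby("ID"):  # 1. split the data by individual
    row = predict_one(group)             # 2. run the model on each piece
    row["ID"] = ident
    rows.append(row)

# 3. combine the per-individual results into one data set
predictions = pd.DataFrame(rows).set_index("ID")
print(predictions)
```

The explicit loop keeps the three steps visible; groupby followed by apply expresses the same idea more compactly.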

Test

The test procedure is pretty straightforward:
  1. Create a table with 100K rows, where each row has a unique ID (this is data preparation, not part of the test).
  2. Expand each record to 100 rows (10M rows in total), and create a second column (called Seq).
  3. Calculate the average value of Seq for each ID.
The third operation is a typical aggregation over grouped data. I include the second step because, quite often, the output of an individual prediction is not just a single value or flag but a series of statistics, such as scores, p-values, confidence intervals, or even a predicted time series. In other words, the output is another data frame, and the program should be able to concatenate these data frames into one large data set efficiently.
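
To make the data layout concrete, here is a tiny version of the procedure (5 IDs, 3 rows each) with Seq defined as ID*100 + 1..k, the same definition the R code below uses. Note that this sketch builds the expanded table with np.repeat and np.tile, which is not the per-group approach timed in this post; expand_and_aggregate is a hypothetical helper name.

```python
import numpy as np
import pandas as pd


def expand_and_aggregate(n_ids, k):
    """Expand each of n_ids IDs to k rows, then average Seq per ID (illustration only)."""
    ids = np.arange(1, n_ids + 1)
    repeated_ids = np.repeat(ids, k)                 # 1,1,1,2,2,2,...
    offsets = np.tile(np.arange(1, k + 1), n_ids)    # 1,2,3,1,2,3,...
    expanded = pd.DataFrame({"ID": repeated_ids,
                             "Seq": repeated_ids * 100 + offsets})
    means = expanded.groupby("ID")["Seq"].mean()
    return expanded, means


expanded, means = expand_and_aggregate(n_ids=5, k=3)
print(expanded.head(6))
print(means)
```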

Here is the testing environment:
  • data.table 1.9.6 with R 3.2.2 (64 bits)
  • pandas 0.16.2 with Python 2.7.10 |Anaconda 2.3.0 (64-bit)

Code and Result

R/data.table:
> a = data.table(ID = 1:100e3)
> 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.28    0.08    0.36 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.27    0.03    0.29 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.20    0.01    0.21 
> 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.02    0.11 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.09    0.00    0.09 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.00    0.09 

Python/pandas:

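import pandas as pd
import numpy as np

def f(d, k):
    # Expand one group into a k-row data frame with a single column, Seq
    return pd.DataFrame({'Seq': np.arange(k)})

# Data preparation: 100K rows, one unique ID per row
a = pd.DataFrame({'ID': np.arange(100e3)})
# Step 2: expand each ID to 100 rows (10M rows in total)
b = a.groupby(a['ID']).apply(f, 100).reset_index()
# Step 3: average Seq within each ID
c = b.groupby(b['ID'])[['Seq']].mean()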



In [25]: %timeit b = a.groupby(a['ID']).apply(f, 100).reset_index()
...: %timeit c = b.groupby(b['ID'])[['Seq']].mean()
1 loops, best of 3: 26.7 s per loop

1 loops, best of 3: 421 ms per loop


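On this test, data.table is clearly faster: the expansion step takes about 0.2 to 0.4 s versus 26.7 s in pandas, and the aggregation takes about 0.1 s versus 421 ms.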

One interesting detail: in the third test step (c = b.groupby(b['ID'])[['Seq']].mean()), I group by ID as a regular column (a pandas Series) rather than by the index. I found that, as the number of rows grows, grouping by a column is more efficient than grouping by the index. Here is the same test using the index:

In [28]: b = a.groupby(a['ID']).apply(f, 100)
In [29]: %timeit c = b.groupby(b.index).mean()
1 loops, best of 3: 29 s per loop
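
One possible explanation for the slow index timing, offered as an assumption rather than something verified against pandas 0.16.2: without reset_index, b carries a two-level index (ID plus the row number within each group), and grouping by the whole index treats every (ID, row) pair as its own group. The toy example below builds such an index directly and compares that with grouping by the ID level only; the level name "row" is made up for the illustration.

```python
import numpy as np
import pandas as pd

# Toy stand-in for b: 3 IDs with 4 rows each, indexed by (ID, row).
index = pd.MultiIndex.from_product([[1, 2, 3], range(4)], names=["ID", "row"])
b = pd.DataFrame({"Seq": np.arange(12)}, index=index)

by_full_index = b.groupby(b.index).mean()   # one group per (ID, row) pair
by_id_level = b.groupby(level="ID").mean()  # one group per ID

print(len(by_full_index))
print(len(by_id_level))
```

If this is what happens in the timed test, b.groupby(level='ID').mean() would be the fairer index-based comparison.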



Other Notes

When coding the second test step in Python, we can instead return a 1D Series for each group and reshape the result with stack. This is more efficient than the code above:

def f2(d, k):
    return pd.Series(np.arange(k))
In [32]: %timeit b = a.groupby(a['ID']).apply(f2, 100).stack()
1 loops, best of 3: 11 s per loop


However, this only works if the output for each individual can be written as a 1D array during the aggregation. If we have to use a data frame to store the output for each piece of the data, the efficiency suffers significantly.
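
To see what the Series approach produces, here is a toy run with 3 IDs and 4 values each. It is a sketch under assumptions: the extra column x is made up so that each group has some data, and the grouping uses the column name instead of a['ID'].

```python
import numpy as np
import pandas as pd


def f2(d, k):
    # Same idea as f2 above: one 1D Series of length k per group.
    return pd.Series(np.arange(k))


a = pd.DataFrame({"ID": np.arange(3), "x": [10.0, 20.0, 30.0]})

wide = a.groupby("ID").apply(f2, 4)  # one row per ID, one column per position
long = wide.stack()                  # reshape to one row per (ID, position)

print(wide.shape)
print(len(long))
```

The wide table has one row per ID, and stack turns it into a long Series indexed by (ID, position), which is the layout the data frame version builds row by row.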