Apr 21, 2016

Repartition Spark Dataframes

When using spark dataframes, we may have one common question:

Do we need to repartition the data frame by some field(s) commonly used in later joins or aggregations?

This is really not a simple question, because we have to weigh the communication cost, the partition shuffle cost, and so on. My experience is that, when joining large data, YOU SHOULD!

HOW?
Repartitioning a Spark data frame by some fields is not that straightforward, because the data frame object does not come with a .partitionBy() method. However, a data frame is essentially an RDD with a structured type mapping, so we can repartition the underlying RDD and create a new data frame from it. Here is the code in pyspark:

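from pyspark.sql import SQLContext

# sc is an existing SparkContext; df_old is the data frame to repartition
sqlsc = SQLContext(sc)
df_new = sqlsc.createDataFrame(
    # key each row by its first field, partition into 5, then drop the key
    df_old.rdd.map(lambda x: (x[0], x)).partitionBy(5).values(),
    df_old.schema)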

In the above code, df_new is created via partitioning df_old into 5 partitions by the first field as the key.
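To see what partitioning by a key buys us, here is a minimal plain-Python sketch (no Spark needed). It assumes a simple hash(key) % num_partitions rule as a stand-in for Spark's own partitioner, which uses a different hash function; the function name and the sample rows are made up for illustration.

```python
# Plain-Python sketch of key-based partitioning (no Spark required).
# Assumption: hash(key) % num_partitions stands in for Spark's partitioner.

def partition_by_key(rows, num_partitions, key_index=0):
    """Group rows into num_partitions buckets by the field at key_index."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        bucket = hash(row[key_index]) % num_partitions
        partitions[bucket].append(row)
    return partitions

# Hypothetical sample rows: (user_id, value)
rows = [(1, "a"), (6, "b"), (2, "c"), (1, "d"), (7, "e"), (6, "f")]
parts = partition_by_key(rows, 5)

for i, part in enumerate(parts):
    print("partition %d: %s" % (i, part))
```

Rows sharing a key always land in the same partition, which is why a later join or aggregation on that key can avoid moving those rows again.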


I still have to emphasize: weigh the benefit and cost carefully before you repartition, because the repartition itself is expensive. Other than when joining large data frames, I don't repartition in my practice. When the data is not that large, another way to speed up the join is to try the dataframe.coalesce(numPartitions) function (since v1.4) to reduce the number of partitions. The coalesce operation typically does not involve shuffling the data.

Mar 2, 2016

Difference between SVM and Logistic Regression -- actually not that much different

It is always a very hot topic: what is the difference between SVM and logistic regression? The answer is actually too easy to list, isn't it?

The two algorithms are developed from very different intuitions: SVM aims to separate the positives and negatives with the maximal margin in the high dimensional linear space, while logistic regression tries to estimate the underlying probability for a positive observation given the independent variables.

The approaches to estimating the coefficients are also very different: SVM is solved by quadratic programming, while logistic regression is typically estimated by maximum likelihood estimation (MLE).

SVM typically works with the kernel trick, which enables it to fit non-linear decision boundaries. Logistic regression can also work with kernels, but this is less popular.

On the other hand, logistic regression can produce probabilistic output, i.e., the probability of a positive given the independent variables. This can be very meaningful in later analysis in many business cases. Moreover, because of its deep roots in statistics research, its significance testing and feature selection methods are well studied. Actually, many ideas and formulas from these methods can be adapted to SVM as well. For example, forward-backward selection can be easily adapted to SVM, and both models can be regularized via an \(L_1\) norm penalty. So when the results require more statistical soundness, I would prefer logistic regression. This does not mean we have no such results for SVM; they are just less well known.


Similarities between SVM and logistic regression

But now I would like to talk about the similarity between them: they actually belong to the same family of optimization problems, but with different loss functions.

For SVM (with soft margin), the optimization problem is to minimize the following:
\begin{equation}J_{\mathrm{SVM}}=\frac{1}{n}\sum_{i=1}^n\bigl[1-y_i(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)\bigr]_{+}+\lambda\|\boldsymbol\beta\|^2, \text{where } y_i\in\{1, -1\}.\end{equation}
The part in the summation is the hinge loss (\([\ ]_+\) means flooring at 0). So SVM is actually minimizing the average hinge loss plus an \(L_2\) penalty on \(\boldsymbol\beta\).
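The soft-margin objective can be evaluated directly for a given coefficient vector. The following is a minimal plain-Python sketch of that formula only (it does not solve the optimization); the function names and the tiny data set are hypothetical.

```python
# Evaluate the soft-margin SVM objective: mean hinge loss + lam * ||beta||^2.
# Labels must be coded as +1 / -1.

def hinge(margin):
    """Hinge loss [1 - margin]_+ for a margin y * (beta . x + beta0)."""
    return max(0.0, 1.0 - margin)

def svm_objective(beta, beta0, X, y, lam):
    total = 0.0
    for x_i, y_i in zip(X, y):
        score = sum(b * v for b, v in zip(beta, x_i)) + beta0
        total += hinge(y_i * score)
    penalty = lam * sum(b * b for b in beta)
    return total / len(X) + penalty

# Hypothetical toy data: three points in 2D
X = [[2.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
y = [1, 1, -1]
j_svm = svm_objective([1.0, 0.5], 0.0, X, y, lam=0.1)
print("J_SVM = %.6f" % j_svm)
```

Only the second point has a margin below 1, so it is the only one that contributes to the hinge term here.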

For logistic regression, the optimization problem is to minimize the following:
\begin{equation}\label{opt-lr-1}J_{\mathrm{LR}}=\frac{1}{n}\sum_{i=1}^n\ln\left[1+e^{-y_i(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)}\right], \text{where } y_i\in\{1, -1\}.\end{equation}
The loss function here is known as the (negative) binomial log-likelihood or simply logistic loss.
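To make the "same family, different loss" point concrete, here is a small plain-Python sketch that evaluates the logistic loss and prints it next to the hinge loss for a few margins \(y_i(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)\). The function names, the chosen margins, and the toy data are assumptions for illustration.

```python
import math

def logistic_loss(margin):
    """Logistic loss ln(1 + exp(-margin)) for labels coded as +1 / -1."""
    return math.log(1.0 + math.exp(-margin))

def logistic_objective(beta, beta0, X, y):
    """Mean logistic loss over the data set."""
    total = 0.0
    for x_i, y_i in zip(X, y):
        score = sum(b * v for b, v in zip(beta, x_i)) + beta0
        total += logistic_loss(y_i * score)
    return total / len(X)

# Compare the two losses as functions of the margin
for margin in [-1.0, 0.0, 1.0, 2.0]:
    hinge_value = max(0.0, 1.0 - margin)
    print("margin %5.1f  hinge %.4f  logistic %.4f"
          % (margin, hinge_value, logistic_loss(margin)))

# Hypothetical toy data: three points in 2D
X = [[2.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
y = [1, 1, -1]
j_lr = logistic_objective([1.0, 0.5], 0.0, X, y)
```

Both losses shrink as the margin grows; the hinge loss reaches exactly zero once the margin is at least 1, while the logistic loss only approaches zero.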

Remark: the coding of the label \(y_i\) is critical to the form of the loss function, though it does not change the mathematical meaning. In the formula above, as in most cases, I used \(\{1, -1\}\) to indicate the positive and negative observations. Using \(\{1, 0\}\) to indicate the positives and negatives is also popular, especially for logistic regression. If we use \(y_i\in\{1, 0\}\), the optimization in \eqref{opt-lr-1} becomes maximizing the following:
\begin{equation}\label{opt-lr-2}\frac{1}{n}\sum_{i=1}^n\left[y_i\ln\left(\frac{1}{1+e^{-(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)}}\right)+(1-y_i)\ln\left(\frac{1}{1+e^{\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0}}\right)\right], \text{where } y_i\in\{1, 0\}.\end{equation}
Maximizing \eqref{opt-lr-2}, the average log-likelihood, is exactly the MLE for logistic regression.
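The equivalence of the two label codings is easy to check numerically: the loss with \(y_i\in\{1,-1\}\) should be the negative of the average log-likelihood with \(y_i\in\{1,0\}\). Below is a minimal plain-Python check; the function names, coefficients, and data are hypothetical.

```python
import math

def score(beta, beta0, x):
    return sum(b * v for b, v in zip(beta, x)) + beta0

def loss_pm1(beta, beta0, X, y_pm1):
    """Mean logistic loss with labels in {1, -1}."""
    return sum(math.log(1.0 + math.exp(-y * score(beta, beta0, x)))
               for x, y in zip(X, y_pm1)) / len(X)

def loglik_01(beta, beta0, X, y_01):
    """Mean binomial log-likelihood with labels in {1, 0}."""
    total = 0.0
    for x, y in zip(X, y_01):
        s = score(beta, beta0, x)
        total += (y * math.log(1.0 / (1.0 + math.exp(-s)))
                  + (1 - y) * math.log(1.0 / (1.0 + math.exp(s))))
    return total / len(X)

# Hypothetical toy data and coefficients
X = [[2.0, 0.0], [0.0, 1.0], [-1.0, -1.0], [0.5, -2.0]]
y_pm1 = [1, 1, -1, -1]
y_01 = [(y + 1) // 2 for y in y_pm1]   # map -1 -> 0, 1 -> 1
beta, beta0 = [0.8, -0.3], 0.2

loss = loss_pm1(beta, beta0, X, y_pm1)
loglik = loglik_01(beta, beta0, X, y_01)
print("loss = %.6f, -loglik = %.6f" % (loss, -loglik))
```

So minimizing the first form and maximizing the second pick the same coefficients.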

Feb 18, 2016

Set up Spyder for Spark -- a step-by-step tutorial

Although there are many good online tutorials about Spark coding in Scala, Java, or Python for beginners, when a beginner starts to put all the pieces together for a "Hello World" Spark application, he or she can always find another important piece of the puzzle missing, which is very frustrating. So why not another tutorial, even if a trivial one?

I particularly like Spyder as my Python IDE instead of the popular Jupyter Notebook, because I come from a Matlab background and Spyder offers an interface much like Matlab's. Actually, most of the steps in this post do not depend on which IDE you use, so you can still follow the most critical ones.

Spark and Python Environment


For a beginner, I would recommend starting with a virtual machine or a Docker image with the major components already set up, such as the Cloudera one, which is what I use for this post. The Cloudera VM is essentially a Linux system with Python 2.6. You can update the Python ecosystem using Anaconda, which installs Spyder by default.

Launch Spyder for Spark

Set up environment variables in your .bashrc

Edit your .bashrc in your terminal:

[cloudera@quickstart ~]$ gedit ~/.bashrc

Add the following lines to the file: 

export SPARK_HOME="/usr/lib/spark"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
 


where $SPARK_HOME points to your Spark folder; /usr/lib/spark is the folder in my case. $PYTHONPATH is where the Python interpreter later finds the Spark library. Save, exit, and reload .bashrc in the terminal to make the changes take effect:

source ~/.bashrc
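Before launching anything, it can help to confirm that the new variables are actually visible to Python. Here is a small sketch of such a check; the function name check_spark_env and its messages are made up for this post, and it only looks at the two variables set above.

```python
import os

def check_spark_env(env=None):
    """Return a list of problems found in the Spark-related variables."""
    env = os.environ if env is None else env
    problems = []
    spark_home = env.get("SPARK_HOME", "")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
        return problems
    entries = env.get("PYTHONPATH", "").split(os.pathsep)
    if os.path.join(spark_home, "python") not in entries:
        problems.append("PYTHONPATH lacks $SPARK_HOME/python")
    if not any("py4j" in entry for entry in entries):
        problems.append("PYTHONPATH lacks a py4j entry")
    return problems

# Check the current shell environment and report anything missing
for problem in check_spark_env():
    print(problem)
```

If nothing is printed, both variables look the way the .bashrc lines above set them.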

Launch Spyder for Spark coding

First find your Spyder program and make a copy:

[cloudera@quickstart ~]$ cp ~/anaconda2/bin/spyder ~/anaconda2/bin/spyder.py

~/anaconda2/bin/spyder is my Spyder, in the default Anaconda location. If you do not use the default configuration or don't know where your Spyder is, just type which spyder in the terminal.
Now launch Spyder for Spark by typing the following line in the terminal:

spark-submit ~/anaconda2/bin/spyder.py

Test spark application

Run the following code in Spyder to test your Spark setup:

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

testFile = "file:///one/of/your/text_file.txt"
testData = sc.textFile(testFile).cache()
numAs = testData.filter(lambda s: 'a' in s).count()
numBs = testData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)



Make sure testFile points to a local file in your folder (not HDFS).
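To know what numbers to expect from the Spark run, the same counts can be computed in plain Python on the same file. This is only a cross-check sketch; the helper name and the temporary sample file are made up, so point it at your own text file instead.

```python
import os
import tempfile

def count_lines_containing(path, letter):
    """Count the lines of a local text file that contain the given letter."""
    with open(path) as handle:
        return sum(1 for line in handle if letter in line)

# Hypothetical sample file standing in for your own text file
fd, sample_path = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as handle:
    handle.write("apple\nbanana\ncherry\nkiwi\n")

num_as = count_lines_containing(sample_path, "a")
num_bs = count_lines_containing(sample_path, "b")
print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))

os.remove(sample_path)
```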

Load third party package for your Spark application

We usually need third party packages for a Spark application. One good example is spark-csv, which allows you to read a csv file into a Spark data frame. I will just use spark-csv as an example.

Download the jar files. spark-csv needs two jar files: spark-csv_2.10-1.3.0.jar and commons-csv-1.2.jar.

Launch Spyder with the two jars from the terminal:

[cloudera@quickstart ~]$ spark-submit --jars /home/cloudera/lib/Spark/commons-csv-1.2.jar,/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar ~/anaconda2/bin/spyder.py

Change the paths after --jars to the location of your jar files, separated by a comma (no space). Try the following script to test the package:

from pyspark import SparkContext, SQLContext

sc = SparkContext("local", "Simple App")
testFile = "file:///one/of/your/csv_file.csv"
sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header='true', inferSchema='true', path=testFile)
df.show()


You should be able to see the first few rows of the csv data.

Load the third party packages by default: It can be very tedious to load the external packages from the command line every time. You can add them to the default list, and they will be loaded automatically every time you launch spark-submit. Edit the Spark default configuration file via the terminal:

[cloudera@quickstart ~]$ sudo gedit $SPARK_HOME/conf/spark-defaults.conf

Add the following line to the file:

spark.driver.extraClassPath /home/cloudera/lib/Spark/commons-csv-1.2.jar:/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar

The value is the list of jar files you would like to load by default, separated by ":".
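The two separators are easy to mix up: a comma for --jars on the command line and ":" for the class path in spark-defaults.conf. Here is a small sketch that builds both strings from one list of jar paths; the helper names are made up, and the paths are the ones from my setup.

```python
def jars_argument(jar_paths):
    """Join jar paths with commas and no spaces, for spark-submit --jars."""
    for path in jar_paths:
        if "," in path or " " in path:
            raise ValueError("jar path must not contain commas or spaces: %r" % path)
    return ",".join(jar_paths)

def extra_classpath(jar_paths):
    """Join jar paths with ':' for spark.driver.extraClassPath (Linux)."""
    return ":".join(jar_paths)

jars = [
    "/home/cloudera/lib/Spark/commons-csv-1.2.jar",
    "/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar",
]

print("spark-submit --jars %s ~/anaconda2/bin/spyder.py" % jars_argument(jars))
print("spark.driver.extraClassPath %s" % extra_classpath(jars))
```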

Enjoy Spark!




Jan 28, 2016

Simple aggregation performance comparison: R data.table vs Python pandas

Motivation

Both R and Python are popular tools for data scientists and statisticians. The well-known packages data.table in R and pandas in Python make data manipulation tasks much easier and more efficient. Both are nowadays adopted by more and more statistics and data science tools and packages. Most popular machine learning algorithms are available in both R and Python versions.

There is really no fair way to compare these two tools, because the choice really depends on our own projects, coding environment, coding habits, tool availability, and even personal preference.
This comparison is just for my own curiosity.

Particularly, I am more interested in their aggregation performance. This is because I often have to run some model to provide predictions for each individual, and the prediction is typically based on the data of that individual. So grouping and aggregation are typical operations for me:
  1. Split the big data into multiple pieces, each of which represents an individual
  2. Run my own model against each small piece of data
  3. Put the predictions for all individuals together in a large data set.
Both data.table and pandas make such a job easy, and I observe a significant performance difference between the two tools when performing such a task. Here I just provide a very simple comparison to highlight the difference.

Test

The test procedure is pretty straightforward:
  1. Create a table with 100K rows, each with a unique ID (this is not part of the test but the data preparation).
  2. Expand each record to 100 rows (10M rows in total), and create the 2nd column (called seq)
  3. Calculate the average value of seq for each ID.
The 3rd operation is a typical aggregation on grouped data. The reason I include the 2nd test step is that, quite often, the output of an individual prediction is not only a value or a flag, but also a series of statistics, such as scores, p-values, confidence intervals, and even predicted time series. In other words, the output is another data frame, and the program should be able to concatenate them into a large data set efficiently.
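The three steps can be sketched at toy scale in plain Python, just to pin down what is being computed (this is not the benchmark itself). The function names and the tiny sizes are assumptions; the seq formula ID*100 + 1..k follows the R code in the Code and Result section.

```python
def expand(ids, k):
    """Step 2: turn each ID into k rows (ID, seq) with seq = ID*100 + 1..k."""
    return [(i, i * 100 + s) for i in ids for s in range(1, k + 1)]

def mean_by_id(rows):
    """Step 3: average seq within each ID."""
    sums, counts = {}, {}
    for i, seq in rows:
        sums[i] = sums.get(i, 0) + seq
        counts[i] = counts.get(i, 0) + 1
    return {i: sums[i] / float(counts[i]) for i in sums}

ids = [1, 2, 3]            # step 1: a tiny table of unique IDs
rows = expand(ids, 4)      # 3 IDs x 4 rows each
means = mean_by_id(rows)

for i in sorted(means):
    print("ID %d: mean seq = %.1f" % (i, means[i]))
```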

Here is the testing environment:
  • data.table 1.9.6 with R 3.2.2 (64-bit)
  • pandas 0.16.2 with Python 2.7.10 |Anaconda 2.3.0 (64-bit)

Code and Result

R/data.table:
> a = data.table(ID = 1:100e3)
> 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.28    0.08    0.36 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.27    0.03    0.29 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.20    0.01    0.21 
> 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.02    0.11 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.09    0.00    0.09 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.00    0.09 

Python/pandas:

import pandas as pd
import numpy as np

def f(d, k):
    # return a k-row data frame for one group (d is the group's data)
    return pd.DataFrame({'Seq': np.arange(k)})

a = pd.DataFrame({'ID': np.arange(100e3)})
b = a.groupby(a['ID']).apply(f, 100).reset_index()
c = b.groupby(b['ID'])[['Seq']].mean()



In [25]: %timeit b = a.groupby(a['ID']).apply(f, 100).reset_index()
...: %timeit c = b.groupby(b['ID'])[['Seq']].mean()
1 loops, best of 3: 26.7 s per loop

1 loops, best of 3: 421 ms per loop


Clearly, data.table is more efficient.

One interesting thing is that when I do the 3rd test step (c = b.groupby(b['ID'])[['Seq']].mean()), I use ID as a pandas Series column instead of as an index object. This is because I found that, as the number of rows increases, grouping by a column/Series is more efficient than grouping by the index. Here is the same test using the index:

In [28]: b = a.groupby(a['ID']).apply(f, 100)
In [29]: %timeit c = b.groupby(b.index).mean()
1 loops, best of 3: 29 s per loop



Other Notes

If coding the 2nd testing step in python, we can actually output a 1D series, and do the transpose (stack). This is more efficient than the code above:

def f2(d, k):
    return pd.Series(np.arange(k))
In [32]: %timeit b = a.groupby(a['ID']).apply(f2, 100).stack()
1 loops, best of 3: 11 s per loop


However, we can do this only if the output for each individual record can be written as a 1D array object during the aggregation. If we have to use a data frame to store the output for each split of data, the efficiency is significantly impacted.