Apr 21, 2016

Repartition Spark Dataframes

When using spark dataframes, we may have one common question:

Do we need to repartition the data frame by some field(s) commonly used in later joins or aggregations?

It is really not a simple question for we have to calculate carefully on the communication cost, partition shuffle cost, etc. My experience is that, when joining large data, YOU SHOULD!

HOW?
Repartitioning the spark data frame by some fields is not that straightforward, because the data frame object does not come with .partitionBy() method. However, dataframe is essentially a RDD with structured type mapping, so we can repartition the underlying RDD and create a new data frame out of that. Here is the code in pyspark:

sqlsc = SQLContext(sc)
df_new = sqlsc.createDataFrame(
    df_old.rdd.map(lambda x: (x[0], x)).partitionBy(5).values(),
    df_old.schema)

In the above code, df_new is created via partitioning df_old into 5 partitions by the first field as the key.


I still have to emphasize that weigh the benefit and cost carefully before you do the repartition for repartition cost is also high. Other than joining large data frames, I don't do the repartition in my practice. Another way to speed up the join when data is not that large is to try dataframe.coalesce(numPartitions) function (since v1.4) to reduce the number of partitions. Shuffling the data is typically not involved in the coalesce operation.

Mar 2, 2016

Difference between SVM and Logistic Regression -- actually not that much different

It is always a very hot topic --  what is the difference between SVM and logistic. The answer is actually too easy too list, isn't it?

The two algorithms are developed from very different intuitions: SVM aims to separate the positives and negatives with the maximal margin in the high dimensional linear space, while logistic regression tries to estimate the underlying probability for a positive observation given the independent variables.

The approaches to estimate the coefficients are also very different, SVM is solved by quadratic programming, while logistic regression is typically estimated by maximal likelihood estimation (MLE).

SVM is typically working with kernel tricks, which enable it to fitting the non-linear hyperspace.  Though logistic regression can also work with kernel but is less popular.

On the other hand , logistic regression can produce the probabilistic output, i.e., the positive probability given independent variables. This could be very meaningful in later analysis in many business cases. Moreover, because of its deep roots in statistics research, the significance testing and feature selection methods are well-studied. Actually, many ideas and formulas in the significance testing and feature selection methods for the logistic regression can be modified to the SVM situation as well. For example, forward backward selection can be easily modified to the context of SVM, and both of them can be regularized via \(L_1\) norm penalty. So when the results require more statistical soundness, I would prefer logistic regression. It does not mean we don't have any results for SVM but just less "well-known".


Similarities between SVM and logistic regression

But now I would like to talk about the similarity between them: they actually belongs to the same family of optimization problem but with different loss function.

For SVM (with soft margin), the optimization problem isto minimize the follows
\begin{equation}J_{\mathrm{SVM}}=\frac{1}{n}\sum_{i=1}^n\bigl[(1-y_i(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)\bigr]_{+}+\lambda\|\boldsymbol\beta\|^2, \text{where } y_i\in\{1, -1\}.\end{equation}
The part in the summation is the hinge loss (\([\ ]_+\) means flooring at 0). So SVM is actually minimizing the total hinge loss.

For logistic regression, the optimization problem is to minimize the follows
\begin{equation}\label{opt-lr-1}J_{\mathrm{RL}}=\frac{1}{n}\sum_{i=1}^n\ln\left[1+e^{-y_i(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)}\right], \text{where } y_i\in\{1, -1\}.\end{equation}
The loss function here is known as the (negative) binomial log-likelihood or simply logistic loss.

Remark: the label \(y_i\) is very critical to the form of the loss function, though does not impact the mathematical meaning. In the formula above, as in the most of the cases, I used \(\{1, -1\}\) to indicate the positive and negative observations. Using \(\{1, 0\}\) to indicate the positives and negatives is also popular, especially for logistic regression. If using \(y_i\in\{1, 0\}\), the optimization in \eqref{opt-lr-1} would become to maximize the follows:
\begin{equation}\label{opt-lr-2}\frac{1}{n}\sum_{i=1}^n\left[y_i\ln\left(\frac{1}{1+e^{-(\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0)}}\right)+(1-y_i)\ln\left(\frac{1}{1+e^{\boldsymbol\beta\cdot \boldsymbol x_i+\beta_0}}\right)\right], \text{where } y_i\in\{1, 0\}.\end{equation}
\eqref{opt-lr-2} is exactly the MLE for logistic regression.

Feb 18, 2016

Setup spyder for Spark -- a step-by-step tutorial

Although there are many good online tutorials about spark coding in Scala, Java, or Python for the beginners, when a beginner start to put all the pieces together for their "Hello World" spark application, he or she can always find another important piece of the puzzle missing, which is very frustrating. So, why not another tutorial, even maybe a trivial one.

I particularly like Spyder as my python IDE instead of the popular Jupyter Notebook, because I came from a Matlab background and Spyder offer an interface really like Matlab. Actually, most the steps in this post won't impact which IDE you use, and you can still follow the most critical steps in this post.

Spark and Python Environment


For a beginner I would recommend to start with a virtual machine or a dock with major components already setup, such as Cloudera, which is the one I use for this post. Cloudera vm is essentially a Linux system with Python 2.6. You can update the python ecosystem using Anaconda, which would install Spyder by default.

Launch Spyder for Spark

setup environment variables in your .bashrc 

Edit your .bashrc in you terminal: 

[cloudera@quickstart ~]$ gedit ~/.bashrc

Add the following lines to the file: 

export SPARK_HOME="/usr/lib/spark"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
 


where $SPARK_HOME points to your spark folder. The highlighted folder is the folder in my case. $PYTHONPATH is the location where later python interpreter finds the spark library. Save, exit, and reload .bashrc in the terminal to make it effective:

source ~/.bashrc

Launch Spyder for Spark coding

First find your Spyder  program and make a copy:

[cloudera@quickstart ~]$ cp ~/anaconda2/bin/spyder ~/anaconda2/bin/spyder.py

The highlighted part is your Spyder. Mine is the default Anaconda configuration. If you do not use the default configuration or don't know where is your Spyder, just type which spyder in ternimal.
Now lanch Spyader for spark by typing the following line in the terminal:

spark-submit ~/anaconda2/bin/spyder.py

Test spark application

Run the following codes in Spyder to test your spark:

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

testFile = "file:///one/of/your/text_file.txt"
testData = sc.textFile(testFile).cache()
numAs = testData.filter(lambda s: 'a' in s).count()
numBs = testData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)



Make sure the highlighted part points to a local file in your folder (not HDFS).

Load third party package for your Spark application

We usually need third party packages for spark application, one good example is spark-csv, which allows you to read csv file into spark RDD. I will just use spark-csv as an example.

Download the jar files. spark-csv needs two jar files: spark-csv_2.10-1.3.0.jar and commons-csv-1.2.jar.

Lunch Spyder with the two jars in terminal:

[cloudera@quickstart ~]$ spark-submit --jars /home/cloudera/lib/Spark/commons-csv-1.2.jar,/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar ~/anaconda2/bin/spyder.py

Change the highlighted part to the location of your jar files, separated by comma (no space). Try the following script to test the package:

from pyspark import SparkContext, SQLContext

sc = SparkContext("local", "Simple App")
testFile = "file:///
one/of/your/csv_file.csv"
sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header = 'true', inferSchema = 'true',path = testFile)
df.show()


You sould be able to see first few rows of the csv data.

Load the third party packages by default: It could be very tedious to load the external package from the command line. You can add the package to the default list, and they will be loaded automatically every time you launch spark-submit. Edit the spark default configuration file via terminal:

[cloudera@quickstart ~]$ sudo gedit $SPARK_HOME/conf/spark-defaults.conf

Add the following line to the file:

spark.driver.extraClassPath /home/cloudera/lib/Spark/commons-csv-1.2.jar:/home/cloudera/lib/Spark/spark-csv_2.10-1.3.0.jar

The highlighted part is the jar files you would like to load by default, separated by ":".

Enjoy Spark!




Jan 28, 2016

Simple aggregation performance comparison: R data.tables vs Python pandas

Motivation

Both R and python are popular tools for data scientist and statisticians. The well-known package data.tables in R and pandas in Python make the data manipulation task much easier and more efficient. Both of them are nowadays well adopted by more and more statistics and data science tools and packages. Most of popular machine learning algorithms are available in both R version and python version.

There is really no way to compare these two tools because the choices that we make really depend our own projects, coding environment, coding habits, tools availability, and even personal preference.
This comparison is just for my own curiosity.

Particularly, I am more interested in their aggregation performance. This is because I often have to run some model to provide predictions for each individual, and the prediction is typically based on the data of each individual. So group and aggregation would be typical operations for me:
  1. Split the big data into multiple pieces each of which presents an individual
  2. Run my own model against each small piece of data
  3. Put the prediction for each individual together in a large data set.
 Both data.tables and pandas can make such job easy, and I observe there is a significant performance difference between the two tools when performing such task. Here I just provide a very simple comparison to highlight the difference.

Test

The test procedure is pretty straightforward:
  1. Create a table with 100K row, and each row has a unique ID (this is not part of test but the data preparation).
  2. Expend each record to 100 rows (totally 10m), and create the 2nd column (called seq)
  3. Calculate the average value of seq for each ID.
The 3rd operation is a typical aggregation to a grouped data. The reason I create the 2nd test step is that, quite often, the output of a individual prediction is not only a value or a flag, but also series of statistics, such as scores, p-values, confidence intervals, and even predicted time series. In other words, the output is another data frame, and the program should be able to concatenate them into a large data set efficiently.

Here is the testing environment:
  • data.tables 1.9.6 with R 3.2.2 (64 bits)
  • pandas 0.16.2 with Python 2.7.10 |Anaconda 2.3.0 (64-bit)

Code and Result

R/data.tables:
> a = data.table(ID = 1:100e3)
> 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.28    0.08    0.36 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.27    0.03    0.29 
> system.time({ b = a[, .(Seq = ID*100+1:100), keyby = ID] })
   user  system elapsed 
   0.20    0.01    0.21 
> 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.02    0.11 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.09    0.00    0.09 
> system.time({ c = b[, .(m = mean(Seq)), keyby = ID] })
   user  system elapsed 
   0.10    0.00    0.09 

Python/pandas:

import pandas as pd
import numpy as np
import math

def f(d, k):

    return(pd.DataFrame({'Seq': np.arange(k)}))

a = pd.DataFrame({'ID': np.arange(100e3)})
b = a.groupby(a['ID']).apply(f, 100).reset_index()
c = b.groupby(b['ID'])[['Seq']].mean()



In [25]: %timeit b = a.groupby(a['ID']).apply(f, 100).reset_index()
...: %timeit c = b.groupby(b['ID'])[['Seq']].mean()
1 loops, best of 3: 26.7 s per loop

1 loops, best of 3: 421 ms per loop


Clearly, data.tables is more efficient.

One interesting thing is that when I do the 3rd test step (c = b.groupby(b['ID'])[['Seq']].mean()), I use ID as a pandas Series column instead of as an index object. This is because, I found that as the number of operating rows increases, using columns/Series is more efficient than using index. Here is the same test using index:

In [28]: b = a.groupby(a['ID']).apply(f, 100)
In [29]: %timeit c = b.groupby(b.index).mean()
1 loops, best of 3: 29 s per loop



Other Notes

If coding the 2nd testing step in python, we can actually output a 1D series, and do the transpose (stack). This is more efficient than the code above:

def f2(d, k):
    return pd.Series(np.arange(k))
In [32]: %timeit b = a.groupby(a['ID']).apply(f2, 100).stack()
1 loops, best of 3: 11 s per loop


However, we can do this only if the output for each individual record can be written in a 1D array object during the aggregation. If we have to use data frame to store the output for each split of data, the efficiency would be significantly impacted.

Jul 12, 2015

A summary of dplyr and data.table

In R, Both dplyr package and data.table package are powerful tools in manipulating data. dplyr is an enhanced version of plyr package, and data.table is becoming more and more popular in handling the large datasets. Several comparisons can be found online, all indicate that, when there are a small number of groups in the data, dplyr and data.table achieve comparable efficiency; when there are a large number of groups, data.table outperforms dplyr functions.

Here, I would like to give a quick summary of the commonly used functions for the two packages. My personal feeling is that I like dplyr better, because it's interface is more consistent and provide some convenience functions to manipulate columns to which I want to apply functions (I will talk about this in details later).



dplyr
data.tables
Select columns
Select columns by name
data = select(mtcars, mpg, cyl, disp, wt)

var_name = c('mpg', 'cyl', 'disp', 'wt')
data = select_ (mtcars, .dots = var_name)

data = select(mtcars, starts_with("m"))
data = select(mtcars, ends_with("t"))
data = select(mtcars, contains("s"))
# select columns starts with, ends with, or contains some keywords. This is the part I like dplyr slightly better. You can do such things in data.tables as well, but need a little more coding effort.
mtcars.dt = data.table(mtcars)
var_name = c('mpg', 'cyl', 'disp', 'wt')

data.dt = mtcars.dt[, .(mpg, cyl, disp, wt)]
data.dt = mtcars.dt[, var_name, with = F]
# with=F would allow you to do normal data.frame operations in data.tables.
Select columns by index
data = mtcars[, 2:5]
data.dt = mtcars.dt[, 2:5, with = F]
Derived columns

data = mutate(mtcars,
x = mpg/cyl,
mpg_new = x*cyl,
mpg_mean = mean(mpg)
)
# newly added one can be used in later calculation
data.dt = mtcars.dt[, x := mpg/cyl]
# add a new column called x

data.dt = mtcars.dt[, `:=`(x = mpg/cyl, mpg_mean = mean(mpg))][, mpg_new := x*cyl]
# add two new columns, and use chain operator add the third column
Select rows
Select rows by condition

Comments: select rows by condition in data.tables sometimes may not be straightforward
data = filter(mtcars, cyl==4, mpg>=30) # and relation
data = filter(mtcars, cyl==4 | mpg>=30) # or relation
data = filter(mtcars, cyl %in% c(4, 6))
data.dt = mtcars.dt[cyl==4 & mpg>=30]
data.dt = mtcars.dt[cyl==4 & vs==1]
# you don't have to put a commar for the second dimension

setkey(mtcars.dt, cyl, vs)
data.dt = mtcars.dt[list(4, 1)]
data.dt = mtcars.dt[.(4, 1)]
# yo have to use list to select rows with cyl==4 and vs==1; .() is the same as list()

data.dt = mtcars.dt[cyl %in% c(4,6)]
data.dt2 = mtcars.dt[J(c(4,6))]
# the same as above, cyl is either 4 or 6, use must use J() or .(). without that it will select the 4th and 6th row (see below)

data.dt = mtcars.dt[cyl %in% c(4,6) & vs==1]
# cyl is 4 or 6, and vs==1
data.dt2 = mtcars.dt[.(c(6,4), 1)]
# the same as above, .() is the same as J()
Select rows by index
# straightforward
data.dt = mtcars.dt[c(6,4)]
# the sixth and 4th row, NOT cyl== 4 or 6
Aggregation functions
Aggregate n values to 1 value
myfun = function(x,y) return(mean(x/y))

data = mtcars %>% group_by(vs) %>%
summarise(
avg_mpg = mean(mpg),
avg_mpg_per_cyl = mean(mpg/cyl),
avg_mpg_per_cyl2 = myfun(mpg, cyl),
row_cnt = n()
)
myfun = function(x,y) return(mean(x/y))

data.dt = mtcars.dt[, list(
avg_mpg = mean(mpg),
avg_mpg_per_cyl = mean(mpg/cyl),
avg_mpg_per_cyl2 = myfun(mpg, cyl),
row_cnt = .N), by = "vs"]

# you have to use list() to wrap all the aggregation functions.
Window function (n values to n values)
data = mtcars %>% group_by(cyl) %>%
mutate(
lag_mpg = lag(mpg),
mpg_rnk = min_rank(desc(mpg))
) %>% arrange(cyl, mpg)


var_name = c('mpg', 'cyl', 'disp', 'wt')

data = mtcars %>% group_by(cyl) %>%
mutate_each_(funs(lag), var_name) %>%
arrange(cyl)
# apply 1 functions to multiple columns. this will replace the original column by the lag value.

data = mtcars %>% group_by(cyl) %>%
mutate_each_(funs(lag, mean), var_name) %>%arrange(cyl)

# apply 2 functions to multiple columns. this will CREATE additional columns for lag and mean, the origial column is also kept
mtcars.dt = data.table(mtcars)
setkey(mtcars.dt, cyl)
# you don't have to setkey before using 'by='

mtcars.dt = mtcars.dt[, mpg_lag := c(NA, mpg[-.N]), by = cyl]
# data.table way to write lag function

mtcars.dt = data.table(mtcars)
lag_var_name = paste(var_name, "_lag", sep="")
mtcars.dt = mtcars.dt[, (lag_var_name) := lapply(.SD, function(x){c(NA, x[-.N])}), by = cyl, .SDcols = var_name]
# you must use (lag_var_name) instead of lag_var_name. if you use lag_var_name := xxxx, it will create one column called "lag_var_name"
More general (n to m values)
# I don't think you have a direct way to do that. Use “mutate” or “summarise” function, the output has to have n or one variables respectively. You can write a function that returns one list, and dplyr will create a column, with each cell contains a list.

half_list = function(l, na.last = TRUE, ties.method = "first"){
    r = rank(l, na.last =na.last,
                  ties.method = ties.method)
    return(l[r<=length(r)/2])
}

data = mtcars %>% group_by(cyl) %>%
do(min.50pct.mpg = half_list(.$mpg)) %>%
mutate(list.len = length(min.50pct.mpg))
half_list = function(l, na.last = TRUE, ties.method = "first"){
    r = rank(l, na.last =na.last,
                  ties.method = ties.method)
    return(l[r&lt;=length(r)/2])
}

data.dt = mtcars.dt[,
    .(min.50pct.mpg = half_list(mpg),
      max.50pct.mpg = -half_list(-mpg)),
    by=cyl
]

# data.table is more "smart", it will merge the newly created list to the by-variables of the original data table. In the case that multiple lists are created, they must have the same length. Otherwise, it will repeat the shorter lists.

Jun 17, 2015

group_by() and grouped_df

package dplyr is really a must tool in manipulating data. It provide lots of functions close to traditional SQL. If you are a SQL person, you may like this package very much after getting familiar with these functions such as inner_join, left_join, semi_join, and anti_join.

group_by is another commonly used function in dplyr. Combining group_by with n_distinct(), n(), min(), max(), median(), would achieve the those group by functions in SQL.

One point worth to mention is that group_by() return a data type that is essentially a data frame (or data table, depending on your input) but behaves slightly different when combining with other dplyr functions. They are most likely to be grouped_df or grouped_dt. The difference is that when you apply a dplyr function to the grouped_df (or grouped_dt), the function applies to each group, i.e., within the group. For example, apply arrange() to a grouped_df would sort the rows within each group, instead of over the whole dataset. If you want to sort the whole dataset, you need to convert the grouped_df to a regular data frame, using ungroup().

You can find a good intro about dplyr here.

Jul 1, 2014

Aggregate() in R: what if a applied function returns a list?

In R, one of keys in improving the efficiency in data manipulation is to avoid for loops. When we need to apply the same function to all the lists in a data frame, functions like lapply, by, and aggregate are very useful to eliminate for loops. Here is a good summary on these different 'apply-like' functions.

Aggregate function calculates the user-specified summary statistics to each list in the data frame by segmentation. Here is a simple example:

Dat = data.frame(G = c(rep('F',4),rep('M',6)),X1 = 10:1, X2 = 1:10)
Dat.mean = aggregate(.~G, Dat, mean)

> Dat.mean
  G  X1  X2
1 F 8.5 2.5
2 M 3.5 7.5 

It gives the mean of X1 and X2 by G variable, and the output is a data frame. The applied function (mean) in this case return a scale value for each group of X1 and X2 respectively. This is much like the aggregation function in SQL:

select G, avg(X1) as X1_mean, avg(X2) as X2_mean
from Dat group by G

This would work well as long as the function passed to aggregate returns a scale value. What if the applied function return a list? For example, we have a very simple moving average function:

ma = function(x, n) list(as.numeric(filter(x, rep(1/n,n),sides=1)))

Dat.ma = aggregate(.~G, Dat, ma, 3)

Let's see the output data.

> class(Dat.ma)
[1] "data.frame"
> dim(Dat.ma)
[1] 2 3

The output is still a 2*3 data frame. Our moving average time series are actually, as lists, sit in entries of the output data frame:

> Dat.ma[1,]
  G           X1           X2
1 F NA, NA, 9, 8 NA, NA, 2, 3
> Dat.ma[[1,2]]
[1] NA NA  9  8
> class(Dat.ma[1,2])
[1] "list"
> class(Dat.ma[[1,2]])
[1] "numeric" 
> length(Dat.ma[[1,2]])

[1] 4

However, the nested data frame sometimes is not convenient for further data manipulation. Can we organize to the format close to the original data frame (not nested)? Our do.call and by function can help us!

Let's take a look the following function that organizes the output data frame (agg.out) from aggregation function:

agg.output.df = function(agg.out){
  df = function(x){
    r = do.call(data.frame,x)
    colnames(r) = colnames(x)
    r
  }
  
  r = by(agg.out, 1:nrow(agg.out), df)
  r = do.call(rbind, r)
}

do.call function takes one row in agg.out, and organize it into a data frame. by function applies this to each row and generates a list of data frames, and do.call (rbind, r)  then combines all the data frames into one data frame. Here is the output:

> Dat.ma.df = agg.output.df(Dat.ma)
> Dat.ma.df
    G X1 X2
1.1 F NA NA
1.2 F NA NA
1.3 F  9  2
1.4 F  8  3
2.1 M NA NA
2.2 M NA NA
2.3 M  5  6
2.4 M  4  7
2.5 M  3  8
2.6 M  2  9
> dim(Dat.ma.df)
[1] 10  3

Combining those 'apply-like' functions together with do.call function can makes our data manipulation job easier and more efficient.