Sessionization in SQL, Hive, Pig and Python

Why sessionization?

Sessionization is the act of turning event-based data into sessions, the ordered list of a user’s actions in completing a task. It is widely used in several domains, such as:

  • Web analytics. This is the most common use, where a session is made up of a user’s actions during one particular visit to the website, for example a buying session on an e-business website. Sessions let you answer questions such as: what are the most frequent paths to purchase on my website? How do my users get to a specific page? When and why do they leave? Are some acquisition funnels more efficient than others?

  • Trip analytics. Given a vehicle’s GPS coordinate history, you can compute sessions to extract its different trips. Each trip can then be labelled distinctly (going to work, going on holiday, …).

  • Predictive maintenance. Here a session can be all the information about a machine’s behavior (working, idle, etc.) until it is assigned to something else.

So given a “user id” or a “machine id”, the question is: how can we recreate sessions? Or, more precisely: how do we choose the boundaries of a session? Among the examples above, the third one is easy to deal with: a new session starts whenever the machine’s assignment changes. In the first two examples, however, we can start a new session when the user has been inactive for a certain amount of time T. In this case, a session is defined by two things: a user id and a threshold time T, such that if the next action occurs T or more after the previous one, it starts a new session.

This threshold can depend on the website’s own constraints: for instance, banks close their users’ navigation sessions after around 15 minutes of inactivity, and many e-business companies do the same.
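Before moving to SQL, the boundary rule can be sketched in plain Python for a single user whose timestamps are already sorted. This is only an illustration: the function name split_sessions and the sample timestamps are hypothetical, and the comparison uses “T or more”, like the queries in the rest of this article.

```python
from datetime import datetime, timedelta


def split_sessions(timestamps, threshold):
    """Split one user's chronologically sorted timestamps into sessions.

    A new session starts whenever the gap to the previous event is
    greater than or equal to `threshold`.
    """
    sessions = []
    previous = None
    for ts in timestamps:
        if previous is None or ts - previous >= threshold:
            sessions.append([])  # first event, or inactivity gap reached
        sessions[-1].append(ts)
        previous = ts
    return sessions


# Hypothetical events for a single user, with a 30-minute threshold.
events = [
    datetime(2015, 1, 1, 10, 0),
    datetime(2015, 1, 1, 10, 10),
    datetime(2015, 1, 1, 11, 0),  # 50 minutes after the previous event
    datetime(2015, 1, 1, 11, 5),
]
for session in split_sessions(events, timedelta(minutes=30)):
    print([ts.strftime("%H:%M") for ts in session])
```

This prints two sessions, ['10:00', '10:10'] and ['11:00', '11:05'], because only the 50-minute gap reaches the threshold.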

Note that the first two examples can be seen as a generalization of the third one, where we do not have clear boundaries, so we will focus on this type of session creation. For now, we will use this toy data to illustrate the approach. Once imported into DSS, the dataset should look like this:

"Toy dataset after import: user_id (Text), mytimestamp (Date)"

Note that this dataset is already sorted by time and that there are two different users.

Sessionization in SQL

We are going to assume that your SQL database system supports window functions (otherwise, you may have a hard time doing advanced analytics :-)). In this example, we will be using PostgreSQL.

The objective is to detect which rows start a new session in the ordered dataset. To do that, we need to calculate the interval between two consecutive rows of the same user. The window function lag() does it for us:

SELECT *
      , extract(epoch from mytimestamp)
        - lag(extract(epoch from mytimestamp))
          over (PARTITION BY user_id order by mytimestamp) as time_interval
FROM toy_data_psql;

This SQL query outputs a new column containing the difference, in seconds, between the timestamp of the current row and that of the previous row of the same user_id. Notice that the first row of each user contains a NULL time interval, since there is no previous row to compare it with.

"Toy dataset including time_interval (Date, needs parsing)"
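If you want to experiment with lag() without a PostgreSQL server, the sketch below runs the same idea against an in-memory SQLite database from Python. It is a stand-in under a few assumptions: it requires SQLite 3.25 or later (the first version with window functions), the table and rows are hypothetical, and timestamps are stored directly as epoch seconds, so no EXTRACT(EPOCH FROM ...) is needed.

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE toy_data (user_id TEXT, ts_epoch INTEGER)")
conn.executemany(
    "INSERT INTO toy_data VALUES (?, ?)",
    # hypothetical rows: (user_id, Unix epoch seconds)
    [("a", 0), ("a", 600), ("a", 4200), ("b", 100), ("b", 200)],
)

rows = conn.execute(
    """
    SELECT user_id,
           ts_epoch,
           ts_epoch - LAG(ts_epoch) OVER (PARTITION BY user_id ORDER BY ts_epoch)
               AS time_interval
    FROM toy_data
    ORDER BY user_id, ts_epoch
    """
).fetchall()

for row in rows:
    print(row)  # the first row of each user has time_interval None (SQL NULL)
```

As in PostgreSQL, subtracting a NULL produced by LAG() yields NULL, which Python receives as None.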

Based on this interval, we are now going to flag the start of each session of a given user. Assuming a new session starts after 30 minutes of inactivity, we can slightly transform the previous expression into this one:

SELECT *
  , CASE
      WHEN EXTRACT(EPOCH FROM mytimestamp)
           - LAG(EXTRACT(EPOCH FROM mytimestamp))
           OVER (PARTITION BY user_id ORDER BY mytimestamp) >= 30 * 60
      THEN 1
      ELSE 0
    END as new_session
FROM toy_data_psql;

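This query creates a binary integer column: a value of 1 indicates the start of a new session, 0 otherwise. The first row of each user gets 0, because a comparison involving the NULL interval is never true, so it falls through to the ELSE branch.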

Finally, we can compute a cumulative sum over this flag column to number the sessions of each user. To make it easier to read, we can concatenate this number with the user_id to build our final session_id column:

SELECT *
  , user_id || '_' || SUM(new_session)
  OVER (PARTITION BY user_id ORDER BY mytimestamp) AS session_id
FROM (
  SELECT *
    , CASE
       WHEN EXTRACT(EPOCH FROM mytimestamp)
          - LAG(EXTRACT(EPOCH FROM mytimestamp))
            OVER (PARTITION BY user_id ORDER BY mytimestamp) >= 30 * 60
       THEN 1
       ELSE 0
      END as new_session
  FROM toy_data_psql
) s1;

We finally have our sessionized data! We can now move on to more advanced analytics and derive new KPIs.

"Final sessionized dataset, including new_session(Integer) and session_id(Text)"
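To see why a flag followed by a running sum numbers the sessions, here is the same logic written in plain Python over hypothetical (user_id, epoch seconds) rows, already sorted by user and time. It is a sketch that mirrors the PostgreSQL query rather than a replacement for it: the first event of each user gets a 0 flag, and session numbers therefore start at 0.

```python
from itertools import accumulate, groupby

THRESHOLD = 30 * 60  # inactivity threshold in seconds, as in the query

# Hypothetical (user_id, epoch_seconds) rows, sorted by user_id then time.
rows = [("a", 0), ("a", 600), ("a", 4200), ("a", 4300), ("b", 100), ("b", 9000)]

sessionized = []
for user_id, group in groupby(rows, key=lambda r: r[0]):  # like PARTITION BY user_id
    times = [t for _, t in group]
    # new_session flag: the first event has no previous row (NULL in SQL), so 0;
    # afterwards, 1 when the gap to the previous event is >= THRESHOLD.
    flags = [0] + [int(cur - prev >= THRESHOLD) for prev, cur in zip(times, times[1:])]
    # Running total of the flags, like SUM(new_session) OVER (... ORDER BY ...).
    for t, flag, n in zip(times, flags, accumulate(flags)):
        sessionized.append((user_id, t, flag, f"{user_id}_{n}"))

for row in sessionized:
    print(row)
```

Each 1 in the flag column bumps the running total, so every row until the next 1 shares the same session number.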

Note that this code is enough for this simple example. However, with a big partitioned Hive table, a simple increment may not be enough, because the same session ids would be generated independently in each partition and collide. In this case, it is possible to concatenate the user id with the epoch of the first row of the session instead. This can be done by calculating the first timestamp of each session and joining it on the session_id previously calculated.
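A minimal sketch of that join, again using an in-memory SQLite database from Python as a stand-in for Hive. The part column (for example a day partition) and the rows are hypothetical. Because the same increment-based session_id can appear in several partitions, the first timestamp is computed per (part, session_id) and joined on both columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessionized (part TEXT, user_id TEXT, ts_epoch INTEGER, session_id TEXT)"
)
conn.executemany(
    "INSERT INTO sessionized VALUES (?, ?, ?, ?)",
    # hypothetical output of the previous query: "a_0" appears in both partitions
    [
        ("2015-01-01", "a", 0, "a_0"),
        ("2015-01-01", "a", 600, "a_0"),
        ("2015-01-02", "a", 90000, "a_0"),
        ("2015-01-02", "a", 90100, "a_0"),
    ],
)

rows = conn.execute(
    """
    SELECT s.part, s.user_id, s.ts_epoch,
           s.user_id || '_' || f.first_ts AS session_id
    FROM sessionized AS s
    JOIN (
        -- first timestamp of each session within its partition
        SELECT part, session_id, MIN(ts_epoch) AS first_ts
        FROM sessionized
        GROUP BY part, session_id
    ) AS f
      ON s.part = f.part AND s.session_id = f.session_id
    ORDER BY s.user_id, s.ts_epoch
    """
).fetchall()

for row in rows:
    print(row)
```

The two colliding "a_0" sessions become "a_0" and "a_90000". A session that crosses a partition boundary would still be split in two; this only removes the id collisions.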

Sessionization in Hive

If your data is stored in Hadoop (HDFS) and you can use Hive (version 0.11 or later, where windowing functions were introduced), creating sessions is very similar to the previous PostgreSQL example. To make it work properly, just make sure that your timestamp column has been serialized as “Hive” in your DSS dataset (in Dataset => Settings => Preview).

SELECT *
    , CONCAT(user_id, '_',
        SUM(new_session) OVER (PARTITION BY user_id ORDER BY mytimestamp)
      ) AS session_id
FROM (
    SELECT *
        , CASE
            WHEN UNIX_TIMESTAMP(mytimestamp)
                 - LAG (UNIX_TIMESTAMP(mytimestamp))
                 OVER (PARTITION BY user_id ORDER BY mytimestamp) >= 30 * 60
            THEN 1
            ELSE 0
          END AS new_session
    FROM toydata_hdfs
) s1

Sessionization in Pig

Still in the Hadoop ecosystem, you can also create sessions using Pig. Although core Pig has plenty of functions, we recommend the very handy DataFu collection of user-defined functions (UDFs).

In DSS, do not forget to set your dataset’s quoting style to “No escaping nor quoting”. To avoid date parsing issues in Pig, make sure to set the Date Serialization Format (in Dataset => Settings => Preview) to “ISO”.

The Pig script below reproduces the previous examples, including setting the limit for new sessions to 30 minutes:

-- register datafu jar and create functions.
REGISTER '/path/to/datafu-1.2.0.jar';

DEFINE Sessionize datafu.pig.sessions.Sessionize('30m');

DEFINE Enumerate datafu.pig.bags.Enumerate('1');

-- Read input DSS dataset
a = DKULOAD 'toy_data_hdfs';

-- format for datafu
b = FOREACH a GENERATE mytimestamp, user_id ;

-- sessionize
c = FOREACH (GROUP b BY user_id) {
     ordered = ORDER b BY mytimestamp ;
     GENERATE FLATTEN(Sessionize(ordered)) AS (mytimestamp, user_id, session_id);
};

-- order dataset by user and time to be comparable with other methods
d = ORDER c BY user_id, mytimestamp;

-- Store output as DSS dataset
DKUSTORE d INTO 'toy_data_sessionized';

The final result is different here: instead of an increment, each session_id is an opaque generated identifier. As before, you can use this id to build a new one by concatenating the user_id with the first Unix timestamp of the session.
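A minimal Python sketch of that rebuild, applied to rows shaped like the Pig output (mytimestamp, user_id, session_id). The function name and sample rows are hypothetical, and the timestamps are assumed to be ISO 8601 strings with an explicit UTC offset such as +00:00, because datetime.fromisoformat() before Python 3.11 does not accept a trailing Z.

```python
from datetime import datetime


def rebuild_session_ids(rows):
    """Replace each opaque session_id with user_id + '_' + the Unix epoch
    (seconds) of the session's first event.

    rows: iterable of (mytimestamp, user_id, session_id) tuples, where
    mytimestamp is an ISO 8601 string with an explicit UTC offset.
    """
    rows = list(rows)
    first_epoch = {}  # (user_id, session_id) -> earliest epoch in that session
    for ts, user_id, session_id in rows:
        epoch = int(datetime.fromisoformat(ts).timestamp())
        key = (user_id, session_id)
        if key not in first_epoch or epoch < first_epoch[key]:
            first_epoch[key] = epoch
    return [
        (ts, user_id, f"{user_id}_{first_epoch[(user_id, session_id)]}")
        for ts, user_id, session_id in rows
    ]


# Hypothetical Pig output: two sessions of user "a" with opaque ids.
pig_rows = [
    ("2015-01-01T10:00:00+00:00", "a", "3f2c-opaque-1"),
    ("2015-01-01T10:10:00+00:00", "a", "3f2c-opaque-1"),
    ("2015-01-01T11:00:00+00:00", "a", "9b1e-opaque-2"),
]
for row in rebuild_session_ids(pig_rows):
    print(row)
```

Here the first session becomes "a_1420106400" (2015-01-01 10:00 UTC) and the second "a_1420110000", so the ids are readable and comparable with the SQL variant.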

Sessionization in Python

Finally, let’s build sessions using Python. There are several ways to do it, from a pure Python implementation to reproducing the logic above with the pandas library, which is what we are going to show here.

import dataiku
import pandas as pd
from datetime import timedelta

# define the inactivity threshold value
T = timedelta(minutes=30)

# load the dataset and make sure mytimestamp is a datetime column
toy_data = dataiku.Dataset("toy_data").get_dataframe()
toy_data['mytimestamp'] = pd.to_datetime(toy_data['mytimestamp'])

# sort by user and time (like PARTITION BY user_id ORDER BY mytimestamp);
# this also gives the same row order as the hive/postgresql output
toy_data = toy_data.sort_values(['user_id', 'mytimestamp'])

# add a column containing the previous timestamp of the same user (like lag)
toy_data['prev_mytimestamp'] = (toy_data.groupby('user_id')['mytimestamp']
                                .transform(lambda x: x.shift(1)))

# create the new session column (comparisons with NaT are False, so 0)
toy_data['new_session'] = ((toy_data['mytimestamp']
                            - toy_data['prev_mytimestamp']) >= T).astype(int)

# create the session_id from a per-user cumulative sum of the flag
toy_data['increment'] = toy_data.groupby('user_id')['new_session'].cumsum()
toy_data['session_id'] = (toy_data['user_id'].astype(str) + '_'
                          + toy_data['increment'].astype(str))

This code should produce a sessionized dataset similar to the Hive/PostgreSQL examples. Note that shift() plays the role of lag() in PostgreSQL, and groupby('user_id') combined with transform() plays the role of a window function partitioned by user_id.

By now, you should be able to sessionize your data in your favorite language and derive the KPIs you need, whether you have weblogs, telematics or other kinds of data.