Thursday, June 12, 2014

Real-time Clickstream Analytics using Flume, Avro, Kite Morphlines and Impala


In this post, I will explain how to capture clickstream logs from web servers and ingest them into Hive/Impala in real time for clickstream analytics. We will ingest the clickstream logs in real time using Flume, transform them using Morphlines, and store them as Avro records in HDFS, where they can be queried with Hive and Impala.

Apache Flume
Apache Flume is a high-performance data collection system that is well suited for real-time ingestion of log data. It is horizontally scalable and very extensible, as we will see below. Flume supports a wide variety of sources to pull data from - I have used the spooling directory source, which expects immutable files to be placed in a particular directory from which they are processed. The spooling source guarantees reliable delivery of the events as long as the files are "immutable and uniquely named".

In my case, the clickstream logs are written on the webserver as csv files. The csv files are rotated every minute and copied to the spooling directory source location by a cron job. The current version of Flume (1.4) does not support recursive polling of a directory for the spool source, so I wrote a shell script that recursively finds .csv files under the source location and flattens them into the destination folder (using the file path as the file name).

Below is a sample shell script that does the job:

#!/bin/sh

TS=`date +%Y%m%d%H%M.%S`
SOURCEDIR=/xx/source
DESTDIR=/xx/destination

# create a marker file with the current timestamp
touch -t $TS $SOURCEDIR/$TS

# find logs older than the marker, flatten the path into the file name
# and move them to the destination (spooling) directory
for FILE in `find $SOURCEDIR -name "*.csv" ! -newer $SOURCEDIR/$TS`
do
  NEWFILE=`echo $FILE | tr '/' '-'`
  mv $FILE $DESTDIR/$NEWFILE
done

# clean up the marker file
rm $SOURCEDIR/$TS
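
The script itself can be scheduled with cron to match the one-minute rotation. A minimal crontab entry, assuming the script above is saved at a hypothetical path such as /xx/flatten_logs.sh, might look like:

# run the flattening script every minute (paths are placeholders)
* * * * * /xx/flatten_logs.sh >> /var/log/flatten_logs.log 2>&1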

Avro Format and Schema
Avro defines a data format designed to support data-intensive applications - it provides both an RPC and a serialization framework. Avro allows schemas to evolve in a scalable manner, and both Hive and Impala support Avro-formatted records. We first need to create an Avro schema file that describes the records we are going to persist. Below is a sample schema for reference:

{
 "namespace": "company.schema",
 "type": "record",
 "name": "Clickstream",
 "fields": [
  {"name": "timestamp", "type": "long"},
  {"name": "client_ip", "type": "string"},
  {"name": "request_type", "type": ["null", "string"]},
  {"name": "request_url", "type": ["null", "string"]}
 ]
}
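
Before distributing the schema, a quick sanity check is to confirm that the file is well-formed JSON (this does not validate the Avro semantics, only the JSON syntax):

python -m json.tool clickstream.avsc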

Once the Avro schema file is created, copy it to the Flume agent:
sudo mkdir /etc/flume-ng/schemas
sudo cp clickstream.avsc /etc/flume-ng/schemas/clickstream.avsc

Kite SDK
Kite is a set of libraries and tools (contributed by Cloudera) that aims to make building systems on top of Hadoop easier. One of the modules it provides is the "Dataset" module, which provides logical abstractions on top of the persistence layer. The Kite Maven plugin also provides Maven goals for packaging, deploying, and running distributed applications. Please refer here for the Kite installation steps. Once installed, you can use the Kite Maven goal to create the dataset on HDFS:

mvn kite:create-dataset -Dkite.rootDirectory=/data/clickstream \
  -Dkite.datasetName=clickstream \
  -Dkite.avroSchemaFile=/etc/flume-ng/schemas/clickstream.avsc
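
If the goal succeeds, the dataset directory should now exist on HDFS; listing it is an easy way to confirm (for an HDFS-backed dataset, Kite typically creates a .metadata directory holding the descriptor and schema):

hadoop fs -ls -R /data/clickstream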

CSV Logs to Avro Records
In order to transform the events captured from the csv files into a format that can be used for further processing, we can use Flume Interceptors.

Flume Interceptors provide on-the-fly inspection and modification of events to create a customized data flow for our applications. Once an interceptor is attached to a source, it is invoked on every event as it travels between a source and a channel. In addition, multiple interceptors can be chained together in a sequence (as shown in the below diagram).



In this case, we can use an interceptor to transform the csv records into Avro-formatted records. The interceptor is implemented using Morphlines, a powerful open source framework for doing ETL transformations in Hadoop applications. A morphline is an in-memory container for transformation commands that are chained together. To transform the records into Avro, we attach a Flume morphline interceptor to the source.

A morphline is defined through a configuration file that contains the commands. Below is a morphline that converts the csv records to Avro records:

morphlines: [
  {
    id: convertClickStreamLogsToAvro
    importCommands: ["com.cloudera.**", "org.kitesdk.**"]

    commands: [
      {
        tryRules {
          catchExceptions: false
          throwExceptionIfAllRulesFailed: true
          rules: [
            # first rule: parse the csv line and convert it to an Avro record
            {
              commands: [
                {
                  readCSV {
                    separator: ","
                    columns: [timestamp, client_ip, request_type, request_url]
                    trim: true
                    charset: UTF-8
                    quoteChar: "\""
                  }
                }
                {
                  toAvro {
                    schemaFile: /etc/flume-ng/schemas/clickstream.avsc
                  }
                }
                {
                  writeAvroToByteArray {
                    format: containerlessBinary
                  }
                }
              ]
            }
            # fallback rule: drop records that fail the rule above
            {
              commands: [
                { dropRecord {} }
              ]
            }
          ]
        }
      }

      { logTrace { format: "output record: {}", args: ["@{}"] } }
    ]
  }
]
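
To make the flow concrete, a csv line such as the made-up example below would be split by readCSV into the timestamp, client_ip, request_type and request_url fields, converted into an Avro record by toAvro using the schema above, and finally serialized by writeAvroToByteArray:

1402556400000,127.0.0.1,GET,/products/item?id=42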

Now, copy the morphline file to the flume agent:
sudo cp morphline.conf /etc/flume-ng/conf/morphline.conf

Flume Configuration
The events are forwarded from the source to an HDFS sink through a memory channel. The HDFS sink writes the events to a file whose path is specified in the configuration. The flume properties file has to be set up to specify the source, channel and sink.

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

Channels Setup
The below section shows the memory channel setup
clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000
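
Note that a memory channel trades durability for throughput - events held in memory are lost if the agent dies. If that matters for your clickstream data, a file channel is a common alternative; below is a minimal sketch, assuming a hypothetical channel name file-channel-hdfs and placeholder directories (the source and sink would then need to reference this channel instead):

clickstreamAgent.channels.file-channel-hdfs.type = file
clickstreamAgent.channels.file-channel-hdfs.checkpointDir = /var/lib/flume-ng/checkpoint
clickstreamAgent.channels.file-channel-hdfs.dataDirs = /var/lib/flume-ng/data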

Source and Interceptor Setup
The below section shows the source and the two interceptors that are attached to the source:

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro


Sink Configuration
The below section shows the HDFS sink setup
# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
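
The HDFS sink rolls files fairly aggressively by default, which can lead to many small files in /data/clickstream. If that becomes a problem, the standard roll properties can be tuned; the values below are only illustrative and are not part of the base configuration that follows:

clickstreamAgent.sinks.clickstream-dataset.hdfs.rollInterval = 300
clickstreamAgent.sinks.clickstream-dataset.hdfs.rollSize = 134217728
clickstreamAgent.sinks.clickstream-dataset.hdfs.rollCount = 0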

Below is the complete configuration file (flume.properties)

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro

# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

The configuration can be applied by copying flume.properties to /etc/flume-ng/conf or by updating it through Cloudera Manager (Configuration -> Default Agent).

Testing
Once all the above setup is done, you can start the flume agent and monitor the logs to see if the events are being processed. After sending some sample logs, you can see the files being written to HDFS (/data/clickstream).
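
If the agent is not managed by Cloudera Manager, it can be started from the command line; a typical invocation (the agent name must match the one used in flume.properties) looks like:

flume-ng agent -n clickstreamAgent \
  -c /etc/flume-ng/conf \
  -f /etc/flume-ng/conf/flume.properties

On CDH the agent log is typically written under /var/log/flume-ng/, which is a convenient place to watch events being processed.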

In addition, the data can be queried from Hive and Impala. Below is a sample Impala session:

[xxx:xxx:21000] > invalidate metadata;
Query: invalidate metadata
Query finished, fetching results ...


[xxx:xxx:21000] > show tables;
Query: show tables
Query finished, fetching results ...
+-------------+
| name        |
+-------------+
| clickstream |
+-------------+

[xxx:xxx:21000] > select client_ip from clickstream;
Query: select client_ip from clickstream
Query finished, fetching results ...
+----------------------+
| client_ip            |
+----------------------+
| 127.0.0.1            |
| 0.0.0.0              |
+----------------------+
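
The same table is visible from Hive as well; for example, a quick aggregate run from the shell might look like:

hive -e "SELECT client_ip, COUNT(*) FROM clickstream GROUP BY client_ip;"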

2 comments:

  1. Hi Thanigai,

    thanks for the post!! It helped me a lot :)
    I have two notes:
    1) Kite SDK: it is now possible to use the Kite SDK CLI, so your example could now be:
    kite-dataset create dataset:hdfs:/data/clickstream --schema /etc/flume-ng/schemas/clickstream.avsc
    2) CSV Logs to Avro Records: a little mistake/error in morphline.conf ("i" instead of "l") - you wrote request_uri but your clickstream.avsc specifies request_url. The right line in morphline.conf would be:
    columns : [timestamp,client_ip,request_type,request_url]

    Thanks again,
    Best regards
