Showing posts with label Flume. Show all posts
Showing posts with label Flume. Show all posts

Friday, February 13, 2015

Implementing Real Time Trending Engine on Clickstream data using Flume and Spark Streaming


In this blog post, I will be explaining one of the approaches for implementing a scalable real time stream processing engine that can provides trend aggregations using clickstream data. This could address use cases where you need the ability to provide real time trends on user activity on a website or mobile app. An engine like this would allow us to compute trend aggregations for scenarios like top products being added to cart or top searches or category pages that were visited. 

A stream processing pipeline and engine should be able to compute aggregations based on a configurable sliding window of data so that trends can be recomputed at set intervals based on sliding window of time (with low latency). I'm going to explain an approach for implementing a trend engine that works on consuming clickstream events using technologies like Apache Flume, Kite Morphlines, Spark Streaming, Rabbitmq, Node.js and Socket.IO. Below is a diagram that shows a high level architecture of the different components of the stack.


STREAM PROCESSING PIPELINE


In this approach, we will use Apache Flume for ingesting the clickstream data from the web clients. The client transmits the events into an Avro Source. The source that is receiving the event passes the event through the source interceptors where the event is transformed into a clickstream avro record using the kite Morphlines ETL framework. You can read my other blog post that talks in detail about how to ingest clickstream data using Flume and process it using Kite Morphines

Once the flume agent has been configured to ingest the clickstream events and processed using Morphines to create avro records that corresponds to a Kite Dataset, the event is fanned out to two different channels. Each channel is connected to a sink which drains the events from their respective channel. In this case, one sink is the HDFS sink which writes the avro records into HDFS as kite dataset that is used for clickstream analytics, while the other channel is configured to connect to a Spark Sink which is a custom flume sink where events get buffered for consumption by the Spark engine. 

Spark Streaming which is an extension of the Spark Core API for large scale stream processing from live datastreams. In this approach, Spark streaming uses a reliable Flume receiver to pull data from the sink and the trend aggregates are computed as a series of small batch programs. The computed trend summaries are then written to a Rabbitmq exchange. Then, a node aggregation process that subscribes to messages from the queue and creates a trend object in JSON. This trend object is then broadcasted to all clients via socket.io. Socket.io is a library that enables real-time bidirectional event based communication via websockets. This way the UI module on the websites get automatically updated with new trend information as they happen without requiring a page refresh or a periodic poll to the server. 

Now, let’s look at the key components in this stack in more detail.In this real-time trending platform, I'm using Apache Spark for building the stream processing engine. 


Apache Spark

Spark is a fast and general purpose cluster computing engine that leverages distributed memory. It has well defined and clean APIs available in multiple languages which can be used to write programs in terms of transformations on distributed datasets. It uses a data abstraction called RDDs which stand for resilient distributed datasets which is a collection of objects spread across a cluster stored in memory. RDDs are built through parallel transformations and can be automatically rebuilt on failure and thus provides linear scalability and fault tolerance. It also seamlessly integrates with the rest of the hadoop ecosystem both in terms of data compatibility and deployments Spark supports different deployment models – it can run on YARN, MESOS or a standalone mode. Plus, it comes with great libraries for machine learning, streaming processing, SQL engine and graph computations.
Apache Spark Architecture

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Flume, Kafka or TCP sockets and can be processed using different kinds of algorithms . Finally, processed data can be pushed out to HDFS or databases. You can also use algorithms from other libraries included in  Spark (like MLlib).
Spark Streaming Dstream
Internally, Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called  DStream, which represents a continuous stream of data. and internally they are represented as a sequence of RDDs.


Spark Sink

In order send the clickstream events to Spark Streaming, we use a custom spark sink. This spark sink uses a pull based approach – where event in the sink gets buffered. Spark streaming uses a reliable flume receiver to pull data from the sink using transactions. The transactions succeed only after the data is received and replicated by spark streaming. This ensures strong reliability and guarantees fault tolerance. In order to configure this custom sink, the custom sink needs to be downloaded and be available on the Flume agent’s classpath. Then in the flume configuration, we specfiy the type to be Sparksink.

The maven artifact details on the Sink is provided here:

Flume Agent Configuration

The Flume Agent has to be configured to listen to the avro source where the clickstream events are emitted and it has be configured to deliver the events to the two sinks via different channels. In addition, the source interceptors (Morphlines in this case) have to be attached to the avroSource.A sample flume agent configuration file is provided below:


clickstreamAgent.channels = mem-channel-hdfs mem-channel-avro
clickstreamAgent.sources = avroSource
clickstreamAgent.sinks = clickstream-dataset clickstream-avro-tier

clickstreamAgent.channels.mem-channel-avro.type = memory
clickstreamAgent.channels.mem-channel-avro.capacity = 10000
clickstreamAgent.channels.mem-channel-avro.transactionCapacity = 10000

clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 10000

clickstreamAgent.sources.avroSource.type = avro
clickstreamAgent.sources.avroSource.channels = mem-channel-hdfs mem-channel-avro
clickstreamAgent.sources.avroSource.bind = $HOST_NAME
clickstreamAgent.sources.avroSource.port = $PORT_NUMBER
clickstreamAgent.sources.avroSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.avroSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.avroSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.avroSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.avroSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.avroSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.avroSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro

# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
# the partitioned directories must match the dataset's partition strategy
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream/year=%Y/month=%m/day=%d/hour=%H
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 10000
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.hdfs.useLocalTimeStamp=true
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

# store the clickstream in the avro sink for the tiered collection 
clickstreamAgent.sinks.clickstream-avro-tier.type = org.apache.spark.streaming.flume.sink.SparkSink
clickstreamAgent.sinks.clickstream-avro-tier.channel = mem-channel-avro
clickstreamAgent.sinks.clickstream-avro-tier.hostname = $SINK_HOST
clickstreamAgent.sinks.clickstream-avro-tier.port = $SINK_PORT
clickstreamAgent.sinks.clickstream-avro-tier.batch-size = 1


Polling Flume Receiver

As I had mentioned earlier, Spark Streaming provides a polling receiver to extract events from Spark Sink. The below code shows how to create a DStream object which contains continuous stream of events from Flume


 // Create the context and set the batch size
 val sparkConf = new SparkConf().setAppName("TrendAnalyzer")
 // Create the Spark StreamingContext for the configured batch interval
 val ssc = new StreamingContext(sparkConf, batchInterval)
  
 // Create a data stream using the polling receiver
 val stream = FlumeUtils.createPollingStream(ssc, host, port)
 val windowStream = stream.window(windowInterval)

Running Spark on YARN

Spark supports pluggable cluster management and can run as either standalone or on YARN or mesos. Running Spark on YARN has several advantages if you already have a Hadoop cluster. One of the main reason is to leverage the same hadoop cluster hardware This leads to better utilization of the cluster and also eliminate the cost of maintaining a separate cluster. Also, we can take advantage of the features of the YARN scheduler for categorizing, isolating and prioritizing workloads. Spark on YARN cluster

Spark can be run in YARN under two modes – a cluster mode and a client mode. The cluster mode is suitable for production deployments where the Spark driver runs inside an application master process which is managed by YARN on the cluster, so that the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN. This is useful when you need the spark shell interactivity or for debugging purposes.

A word on Data Serialization

Because of the in-memory nature of most Spark computations, Data Serialization plays an important role in the performance of the application. Spark by default has “Java Serialization” which is very flexible and works with most classes but it is also very slow. Spark also supports Kryo Serialization which uses Kyro library which is very compact and faster than Java Serialization. You can switch to using Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). This setting configures the serializer used during both shuffling and also when serializing RDDs to disk. Another requirement for Kyro serializer is to register the class in advance for best performance.The scala snippet here shows how to register the avro classes. This will register the use of Avro's specific binary serialization for the Clickstream class




Thursday, June 12, 2014

Real-time Clickstream Analytics using Flume, Avro, Kite Morphlines and Impala


In this post, I will explain how to capture clickstream logs from webservers and ingest into Hive/Impala in real-time for clickstream analytics. We will ingest clickstream logs in realtime using Flume, transform them using Morphlines and store them as avro records in HDFS, which can be used for analytics in Hive and Impala.

Apache Flume
Apache Flume is a high performance system for data collection which is ideal for real-time log data ingestion. It is horizontally scalable and very extensible as we will soon see below.Flume supports a wide variety of sources to pull data from - I have used the spoolingdirectory source which expects immutable files to placed in a particular directory from where the files are processed. The spooling source guarantees reliability of delivery of the events as long as the files are "immutable and uniquely named".

In my case, the clickstream logs are written on the webserver as csv files. The csv files are rotated every minute and copied to the spooling directory source location by a cron job. The current version of Flume (1.4) does not support recursive polling of directory for the spool source. So, I wrote a shell script that recursively finds .csv files from the source location and flattens them at the destination folder (using the file path as the file name).

Below is a sample shell script that does the job:

#!/bin/sh

TS=`date +%Y%m%d%H%M.%S`

SOURCEDIR=/xx/source

DESTDIR=/xx/destination

touch -t $TS $SOURCEDIR/$TS

# find older logs, flatten and move to destination
for FILE in `find $SOURCEDIR -name "*.csv" ! -newer $SOURCEDIR/$TS`
do
  NEWFILE=`echo $FILE | tr '/' '-'`
  mv $FILE $DESTDIR/$NEWFILE
done

rm $SOURCEDIR/$TS

Avro Format and Schema
Avro defines a data format designed to support data-intensive applications - it provides both RPC and serialization framework. Avro allows for schema evolution in a scalable manner and Hive and Impala both support avro formated records. We need to first create a avro schema file that represents the avro records that we are going to persit. Below is a sample format for reference:

{
 "namespace": "company.schema",
 "type": "record",
 "name": "Clickstream",
 "fields": [
  {"name": "timestamp", "type": "long"},
  {"name": "client_ip", "type": "string"},
  {"name": "request_type", "type": ["null", "string"]},
  {"name": "request_url", "type": ["null", "string"]}
 ]
}

Once the avro schema file is created, copy to the schema to the flume agent
sudo mkdir /etc/flume-ng/schemas
sudo cp clickstream.avsc /etc/flume-ng/schemas/clickstream.avsc

Kite SDK
Kite is a set of libraries and tools (contributed by Cloudera) that aims to making building systems on top of Hadoop easier. One of the module that it provides is a "Dataset" module that provides logical abstractions on top of persitence layer. The kite maven plugin also provides Maven goals for packaging, deploying, and running distributed applications. Please refer here to installation steps for Kite. Once installed, you can use the kite maven command to create the dataset on HDFS

mvn kite:create-dataset -Dkite.rootDirectory=/data/clickstream
  -Dkite.datasetName=clickstream \
  -Dkite.avroSchemaFile=/etc/flume-ng/schemas/clickstream.avsc

CSV Logs to Avro Records
In order to transform the events captured from the csv files into a format that can be used for further processing, we can use Flume Interceptors.

Flume Interceptors provide on-the-fly inspection and modification of events to create a customized data flow for our applications. Once an interceptor is attached to a source, it is invoked on every event as it travels between a source and a channel. In addition, multiple interceptors can be chained together in a sequence (as shown in the below diagram).



In this case, we can use an interceptor to transform the csv file to avro formatted records. The interceptor will be implemented using Morphlines, which is a powerful open source framework for doing ETL transformations in Hadoop applications. A morphline is an in-memory container for transformation commands that are chained together.To transform the records into Avro, we can attache a flume morphline interceptor to the source.

A morphline is defined through a configuration file which contains the commands. Below is a morphline to convert csv to avro records

morphlines: [
  {
    id: convertClickStreamLogsToAvro
    importCommands: ["com.cloudera.**", "org.kitesdk.**" ]
    commands: [

      { tryRules {
    catchExceptions : false
    throwExceptionIfAllRulesFailed : true
    rules : [
   # next rule of tryRules cmd:
   {
     commands : [
     { readCSV: {
     separator : ","
     columns : [timestamp,client_ip,request_type,request_uri]
     trim: true
     charset : UTF-8
     quoteChar : "\""
     } 
    }

    {      
     toAvro {
        
      schemaFile: /etc/flume-ng/schemas/clickstream.avsc
  
     } 
   
    }
    { 
     writeAvroToByteArray: {
        
      format: containerlessBinary
  
     } 
   
    }
   ]
   }
   # next rule of tryRules cmd:
   {
     commands : [
    { dropRecord {} }
     ]
   }
       
  ]
    }
  }

  { logTrace { format : "output record: {}", args : ["@{}"] } }  
    ]
  }


]

Now, copy the morphline file to the flume agent:
sudo cp morphline.conf /etc/flume-ng/conf/morphline.conf
Flume Configuration
The events are forwarded from the source to an HDFS sink through a memory channel.  The HDFS sink writes the events to a file where the path of the file is specified by the configuration. The flume properties file has to be setup to specify the source, channel and sink.

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

Channels Setup
The below section shows the memory channel setup
clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000

Source and Interceptor Setup
The below section shows the source and the two interceptors that are attached to the source:

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro


Sink Configuration
The below section shows the HDFS sink setup
# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

Below is the complete configuration file (flume.properties)

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro

# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

The configuration can be updated by copying the flume.properties to (/etc/flume-ng/conf) or using Cloudera Manager (Configuration->Default Agent)

Testing
Once all the above setup is done, you can start the flume agent and monitor the logs to see if the events are being processed. After sending some sample logs, you can see the files being written to HDFS (/data/clickstream).

In addition, the data can be queried on Hive and Impala.

[xxx:xxx:21000] > invalidate metadata;
Query: invalidate metadata
Query finished, fetching results ...


[xxx:xxx:21000]> show tables;
Query: show tables
Query finished, fetching results ...
+-------+
| name       |
+-------+
| clickstream|
+-------+
[xxx:xxx:21000]> select client_ip from clickstream;
Query: select client_ip from clickstream

Query finished, fetching results ...+----------------------+
| client_ip            |
+----------------------+
| 127.0.0.1            |
| 0.0.0.0              |
+----------------------+