In this blog post, I will explain one approach for implementing a scalable real-time stream processing engine that provides trend aggregations over clickstream data. This addresses use cases where you need real-time trends on user activity on a website or mobile app. An engine like this allows us to compute trend aggregations for scenarios such as the top products being added to cart, or the top searches or category pages being visited.
The pipeline and engine should be able to compute aggregations over a configurable sliding window of data, so that trends can be recomputed at set intervals with low latency. I'm going to explain an approach for implementing such a trend engine that consumes clickstream events using technologies like Apache Flume, Kite Morphlines, Spark Streaming, RabbitMQ, Node.js and Socket.IO. Below is a diagram that shows the high-level architecture of the different components of the stack.
In this approach, we use Apache Flume to ingest the clickstream data from the web clients. The client transmits the events to an Avro source. The receiving source passes each event through the source interceptors, where the event is transformed into a clickstream Avro record using the Kite Morphlines ETL framework. You can read my other blog post that covers in detail how to ingest clickstream data using Flume and process it with Kite Morphlines.
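As a rough illustration of how events can reach the Avro source, a collection service sitting behind the website could use the Flume RPC client API along the lines of the sketch below. This is only a sketch: the host, port and payload are placeholders, and in practice the event body would carry the raw clickstream log line that the Morphlines interceptor expects.
import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

// Connect to the Flume agent's Avro source (host and port are examples)
val client = RpcClientFactory.getDefaultInstance("flume-host", 41414)

// Send one clickstream event; the body here is a placeholder payload
val event = EventBuilder.withBody("userId=123&page=/cart&action=add".getBytes("UTF-8"))
client.append(event)

client.close()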
STREAM PROCESSING PIPELINE
Once the Flume agent has been configured to ingest the clickstream events and process them with Morphlines into Avro records that correspond to a Kite dataset, each event is fanned out to two different channels. Each channel is connected to a sink that drains the events from its respective channel. In this case, one sink is the HDFS sink, which writes the Avro records into HDFS as a Kite dataset used for clickstream analytics, while the other channel is connected to a Spark Sink, a custom Flume sink where events are buffered for consumption by the Spark engine.
Spark Streaming is an extension of the Spark core API for large-scale stream processing of live data streams. In this approach, Spark Streaming uses a reliable Flume receiver to pull data from the sink, and the trend aggregates are computed as a series of small batch programs. The computed trend summaries are then written to a RabbitMQ exchange. A Node.js aggregation process subscribes to messages from the queue and creates a trend object in JSON. This trend object is then broadcast to all clients via Socket.IO, a library that enables real-time, bidirectional, event-based communication over WebSockets. This way, the UI modules on the website are updated automatically with new trend information as it happens, without requiring a page refresh or periodic polling of the server.
Now, let's look at the key components in this stack in more detail. In this real-time trending platform, I'm using Apache Spark to build the stream processing engine.
Apache Spark
Spark is a fast, general-purpose cluster computing engine that leverages distributed memory. It has well-defined, clean APIs available in multiple languages, which can be used to write programs in terms of transformations on distributed datasets. It uses a data abstraction called RDDs (resilient distributed datasets): collections of objects spread across a cluster and stored in memory. RDDs are built through parallel transformations and can be automatically rebuilt on failure, which provides linear scalability and fault tolerance. Spark also integrates seamlessly with the rest of the Hadoop ecosystem, both in terms of data compatibility and deployment. Spark supports different deployment models: it can run on YARN, Mesos or in standalone mode. Plus, it comes with great libraries for machine learning, stream processing, SQL and graph computation.
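To make the RDD model concrete, here is a minimal sketch of a Spark batch job. The HDFS path and the filter condition are hypothetical; the point is simply how a program is expressed as lazy transformations on a distributed dataset followed by an action.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("RddExample"))

// Build an RDD from clickstream logs already landed in HDFS (path is an example)
val clicks = sc.textFile("hdfs:///data/clickstream/raw")

// Transformations are lazy: nothing runs until an action is called
val addToCart = clicks.filter(_.contains("add_to_cart"))

// The action triggers the distributed computation
println(addToCart.count())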
Spark Streaming
Spark Streaming is an extension of the core Spark API that enables scalable, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Flume, Kafka or TCP sockets and processed using different kinds of algorithms, and the processed data can be pushed out to HDFS or databases. You can also use algorithms from other libraries included in Spark (like MLlib). Internally, Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called a DStream, which represents a continuous stream of data and is internally represented as a sequence of RDDs.
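A minimal, self-contained sketch of this model is shown below (the socket source, port and intervals are arbitrary examples, not part of the trend engine): each batch interval produces one RDD in the DStream, and a windowed operator simply aggregates across several of those RDDs.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("DStreamExample")
// Every 10 seconds of input becomes one RDD (one micro-batch) in the DStream
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split("\\s+"))
  .map(word => (word, 1L))
  // Aggregate over the last 60 seconds, recomputed every 10 seconds
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))

counts.print()
ssc.start()
ssc.awaitTermination()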
Spark Sink
In order to send the clickstream events to Spark Streaming, we use a custom Spark sink. This sink uses a pull-based approach: events get buffered in the sink, and Spark Streaming uses a reliable Flume receiver to pull data from the sink using transactions. A transaction succeeds only after the data is received and replicated by Spark Streaming, which ensures strong reliability and fault-tolerance guarantees. To configure this custom sink, it needs to be downloaded and made available on the Flume agent's classpath. Then, in the Flume configuration, we specify the sink type to be SparkSink. The Maven artifact details for the sink are provided here:
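For reference, the sink is published as part of the Spark project; the coordinates look roughly like the following (treat the Scala suffix and version as examples that must match your Spark build):
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.10
version = 1.1.0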
Flume Agent Configuration
The Flume agent has to be configured to listen on the Avro source where the clickstream events are emitted, and to deliver the events to the two sinks via different channels. In addition, the source interceptors (Morphlines in this case) have to be attached to the Avro source. A sample Flume agent configuration file is provided below:
clickstreamAgent.channels = mem-channel-hdfs mem-channel-avro
clickstreamAgent.sources = avroSource
clickstreamAgent.sinks = clickstream-dataset clickstream-avro-tier
clickstreamAgent.channels.mem-channel-avro.type = memory
clickstreamAgent.channels.mem-channel-avro.capacity = 10000
clickstreamAgent.channels.mem-channel-avro.transactionCapacity = 10000
clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 10000
clickstreamAgent.sources.avroSource.type = avro
clickstreamAgent.sources.avroSource.channels = mem-channel-hdfs mem-channel-avro
clickstreamAgent.sources.avroSource.bind = $HOST_NAME
clickstreamAgent.sources.avroSource.port = $PORT_NUMBER
clickstreamAgent.sources.avroSource.interceptors = attach-schema morphline
# add the schema for our record sink
clickstreamAgent.sources.avroSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.avroSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.avroSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc
# morphline interceptor config
clickstreamAgent.sources.avroSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.avroSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.avroSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro
# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
# the partitioned directories must match the dataset's partition strategy
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream/year=%Y/month=%m/day=%d/hour=%H
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 10000
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.hdfs.useLocalTimeStamp = true
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
# store the clickstream in the avro sink for the tiered collection
clickstreamAgent.sinks.clickstream-avro-tier.type = org.apache.spark.streaming.flume.sink.SparkSink
clickstreamAgent.sinks.clickstream-avro-tier.channel = mem-channel-avro
clickstreamAgent.sinks.clickstream-avro-tier.hostname = $SINK_HOST
clickstreamAgent.sinks.clickstream-avro-tier.port = $SINK_PORT
clickstreamAgent.sinks.clickstream-avro-tier.batch-size = 1
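Assuming the configuration above is saved as clickstream.conf (the file name and conf directory here are examples), the agent can be started with the standard flume-ng launcher, something like:
flume-ng agent --conf /etc/flume-ng/conf \
  --conf-file /etc/flume-ng/conf/clickstream.conf \
  --name clickstreamAgent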
Polling Flume Receiver
As I mentioned earlier, Spark Streaming provides a polling receiver to extract events from the Spark sink. The code below shows how to create a DStream object that contains a continuous stream of events from Flume:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val batchInterval = Seconds(10)    // example batch interval
val windowInterval = Seconds(300)  // example sliding window (a multiple of the batch interval)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("TrendAnalyzer")
// Create the Spark StreamingContext for the configured batch interval
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a data stream using the polling receiver (host and port of the Spark sink)
val stream = FlumeUtils.createPollingStream(ssc, host, port)
val windowStream = stream.window(windowInterval)
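From here, the windowed events can be reduced into trend counts and the summary pushed to RabbitMQ, as described earlier. The sketch below is illustrative rather than the exact implementation: extractProductId stands in for whatever Avro/Morphlines decoding you apply to the Flume event body, and the exchange name, host and JSON layout are assumptions.
import java.nio.ByteBuffer
import com.rabbitmq.client.ConnectionFactory
import org.apache.spark.streaming.StreamingContext._

// Placeholder: replace with real Avro decoding of the clickstream record
def extractProductId(body: ByteBuffer): String = {
  val bytes = new Array[Byte](body.remaining())
  body.duplicate().get(bytes)
  new String(bytes, "UTF-8")
}

// Count add-to-cart events per product over the sliding window
val productCounts = windowStream
  .map(sparkEvent => (extractProductId(sparkEvent.event.getBody), 1L))
  .reduceByKey(_ + _)

productCounts.foreachRDD { rdd =>
  // Pull the top 10 products for this window back to the driver
  val top = rdd.map(_.swap).sortByKey(ascending = false).take(10)
  val json = top.map { case (count, product) =>
    s"""{"product":"$product","count":$count}"""
  }.mkString("[", ",", "]")

  // Publish the trend summary to a RabbitMQ exchange (names are examples);
  // in production you would reuse the connection instead of reopening it per batch
  val factory = new ConnectionFactory()
  factory.setHost("rabbitmq-host")
  val connection = factory.newConnection()
  val channel = connection.createChannel()
  channel.exchangeDeclare("trends", "fanout", true)
  channel.basicPublish("trends", "", null, json.getBytes("UTF-8"))
  channel.close()
  connection.close()
}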
Running Spark on YARN
Spark supports pluggable cluster management and can run standalone, on YARN or on Mesos. Running Spark on YARN has several advantages if you already have a Hadoop cluster. One of the main reasons is to leverage the same Hadoop cluster hardware, which leads to better utilization of the cluster and eliminates the cost of maintaining a separate cluster. We can also take advantage of the YARN scheduler's features for categorizing, isolating and prioritizing workloads. Spark can run on YARN in two modes: cluster mode and client mode. Cluster mode is suitable for production deployments, where the Spark driver runs inside an application master process managed by YARN on the cluster, so the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN. This is useful when you need spark-shell interactivity or for debugging purposes.
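For example, the trend analyzer could be submitted in cluster mode with something like the following (class name, jar path and resource sizes are placeholders for your own build; this is the Spark 1.x syntax, newer releases use --master yarn --deploy-mode cluster):
spark-submit \
  --class com.example.TrendAnalyzer \
  --master yarn-cluster \
  --num-executors 4 \
  --executor-memory 2g \
  trend-analyzer.jar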
Comments
Hi, I am trying to create a flow with Flume and the spark-sink. I have a 7-node cluster with flume-server on nodes 4-7 and Spark installed on node 2. What confuses me is what hostname I configure for the spark-sink in Flume, and subsequently what hostname I should pass to Spark's createPollingStream. I was thinking localhost for both Flume and Spark, which would create a 1:1 ratio of flume-servers to Spark receivers, but the documentation doesn't seem to explain it this way. --thanks
If you are using the pull-based custom sink, you have to configure the Flume properties to use the host where the custom sink runs locally; then the Spark Streaming receiver needs to be configured to receive the data from that server:port.
So from what you're saying, and from how the docs read, the spark-sink should only run on one of my 4 Flume nodes, and the other 3 Flume nodes should be configured to push data to that spark-sink node via Avro? Meaning there would be 2 separate Flume configurations: one with the spark-sink and an avro-source, and the other 3 with their normal jms-source and an avro-sink for pushing to the one spark-sink node. Seems like this would create a bottleneck on that one node.
The Spark sinks can run on all Flume nodes, and each one should be configured to write locally to its channel. Then, in Spark Streaming, you need to pass all the server addresses when calling createPollingStream (https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/flume/FlumeUtils.html), as this method takes an array of InetSocketAddress. You can also create a load-balancing layer if you don't want to pass multiple addresses in your code.
Ah ok. Man, the Spark docs sure set you up for some headaches with their example for this showing a single hostname as the method param:
val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
Thanks!
Something still isn't right: I set the Flume spark-sink to localhost:10000, which caused all 4 flume-servers (nodes 4-7) to come up fine and listen on their local port 10000. I then start Spark, passing in an array of those same 4 flume-server IPs with port 10000, and Spark repeatedly gives Connection refused: node4/10.10.40.74:10000. I check that the ports are free before each run.
Hey, I got the same issue where FlumeUtils.createPollingStream gives Connection refused. Did you work it out?
Are you running using the spark-submit script with YARN (client or cluster mode)?
Yes, client.
I get the same error in yarn-cluster mode.
How are you accomplishing variable substitution, such as $HOST_NAME, in your Flume conf above? Again, thanks for the help.
If you are using CDH, you can set it via Cloudera Manager (under Flume -> Configuration -> Flume Service Environment Advanced Configuration Snippet (Safety Valve)) and set all the environment variables there.
ReplyDeleteHi
ReplyDeleteCan you let me know how does the clickstream data comes from the website directly to an avro source?It logs info on the webserver as far as i know.