Showing posts with label Cloudera Search. Show all posts
Showing posts with label Cloudera Search. Show all posts

Tuesday, February 17, 2015

Configuring Cloudera Search to use local directory indexes

If you are using Cloudera Search, the SOLR collections that are created using Solrctl commands create collections that are backed in HDFS (org.apache.solr.core.HdfsDirectoryFactory) by default. If you want certain collections to use local indexes instead, then you can do the following:

1. Log in to one of the SOLR nodes and generate a new instance directory locally using the solrctl command:


solrctl instancedir —generate collectionConfigDir

The above command will generate the folder 'collectionConfigDir' with default configuration
2. Open the solrconfig.xml in the editor and make the following changes:

  • Comment out the section related to <directoryFactory> - The default generated config uses the HDFSDirectoryFactory. We can add a new section for the directoryFactory to use solr.NRTCachingDirectoryFactory


<directoryFactory name="DirectoryFactory" class="solr.NRTCachingDirectoryFactory">
</directoryFactory>

<!--  <directoryFactory name="DirectoryFactory" class="org.apache.solr.core.HdfsDirectoryFactory">
    <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
    <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
    <str name="solr.hdfs.security.kerberos.enabled">${solr.hdfs.security.kerberos.enabled:false}</str>
    <str name="solr.hdfs.security.kerberos.keytabfile">${solr.hdfs.security.kerberos.keytabfile:}</str>
    <str name="solr.hdfs.security.kerberos.principal">${solr.hdfs.security.kerberos.principal:}</str>
    <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
    <int name="solr.hdfs.blockcache.slab.count">${solr.hdfs.blockcache.slab.count:1}</int>
    <bool name="solr.hdfs.blockcache.direct.memory.allocation">${solr.hdfs.blockcache.direct.memory.allocation:true}</bool>
    <bool name="solr.hdfs.blockcache.direct.memory.allocation">false</bool>
    <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:16384}</int> 
    <bool name="solr.hdfs.blockcache.read.enabled">${solr.hdfs.blockcache.read.enabled:true}</bool>
    <bool name="solr.hdfs.blockcache.write.enabled">${solr.hdfs.blockcache.write.enabled:true}</bool>
    <bool name="solr.hdfs.nrtcachingdirectory.enable">${solr.hdfs.nrtcachingdirectory.enable:true}</bool>
    <int name="solr.hdfs.nrtcachingdirectory.maxmergesizemb">${solr.hdfs.nrtcachingdirectory.maxmergesizemb:16}</int>
    <int name="solr.hdfs.nrtcachingdirectory.maxcachedmb">${solr.hdfs.nrtcachingdirectory.maxcachedmb:192}</int>
  </directoryFactory> 
-->
  • Specify the solr data directory to use a local directory on the server. Also, ensure that the folder exists with required write permissions for the user running solr.

<!-- <dataDir>${solr.data.dir:}</dataDir>-->
<dataDir>/solr</dataDir>

  •  Change the lock type to use 'simple' instead of 'hdfs', which is set using this property 'solr.lock.type'

      <!--<lockType>${solr.lock.type:hdfs}</lockType>-->
 <lockType>simple</lockType>
3. Once these changes are done, then the instance directory can be uploaded as a new configuration to zookeeper using the following command

solrctrl instancedir --create collectionConfig collectionConfigDir

4. Now, the new collection can be created which will start using the local directory for storing the indexes.

solrctl collection --create newCollection -c collectionConfig -s 1 -r 2

The above command will create a collection (with 1 shard and 2 replicas) that will use local directory (/solr) for storing the indexes

Sunday, February 15, 2015

Near Line Search Indexing using Hbase, Lily Indexer and Cloudera Search (SOLR)

In this blog post, I will be explaining how to implement near "Large Scale Near Line Search Indexing" using HBase and SOLR. This is a common scenario that is applicable in different use cases where you have large volumes (and on-going) of content updates that needs to be pushed in to live search indexes and the content should be instantly (near real-time) searchable. In addition, the content that needs to be stored and indexed might be unstructured or could be large enough that existing RDBMS solutions would not scale. 

In such a situation, the master content could reside in a distributed key value columnar NOSQL store like HBase which provides horizontal scalability. For example, let's say that you have a high throughput website which gets a lot of new user generated content that you want to make it searchable. In these cases, you need a store that can support massive scale and also allows for flexible schema evolution. HBase lends itself well for these kind of usecases as it offers a distributed solution with extreme scalability and high reliability.

In this implementation, we will use Lily HBase Indexer to transform incoming HBase mutations to SOLR documents and update live SOLR indexes.

About Lily HBase Indexer


The Lily HBase Indexer Service is a fault tolerant system for processing a continuous stream of HBase cell updates into live search indexes. It works by acting as an HBase replication sink. As updates are written to HBase region servers, it is written to the Hlog (WAL) and HBase replication continuously polls the HLog files to get the latest changes and they are "replicated" asynchronously to the HBase Indexer processes. The indexer analyzes then incoming HBase cell updates, and it creates Solr documents and pushes them to SolrCloud servers.

Lily Hbase Indexer

The configuration information about indexers is stored in ZooKeeper. So, new indexer hosts can always be added to a cluster and it enables horizontal scalability

Installing and Configuring Lily Hbase Indexer

Installing the lily Hbase indexer in your cluster is very easy. If you are using a distribution like Cloudera (CDH), the Lily Hbase indexer is already included in the CDH parcels. If not, then the package can be downloaded, built and installed from Lily Hbase Indexer website. Please check here for installation steps if you want to do it manually.

Configuring Hbase Indexer

The Lily hbase indexer services provides a command line utility that can be used to add, list,  update and delete indexer configurations. The command shown below registers and adds a indexer configuration to the Hbase Indexer. This is done by passing an index configuration XML file alsong with the zookeeper ensemble information used for Hbase and SOLR and the solr collection. 

Hbase-indexer add-indexer
--name search_indexer
--indexer-conf /.search-indexer.xml
--connection-param solr.zk=ZK_HOST/solr
--connection-param solr.collection=search_meta
--zookeeper ZK_HOST:2181

The XML configuration file provides the option to specify the Hbase table which needs to be replicated and a mapper. Here we use morphline framework again to transform the columns in the hbase table to SOLR fields and we pass the morphline file which has the pipeline of commands to do the transformation

<indexer table=“search_meta” mapper="com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper" mapping-type="row" unique-key-field="id" row-field="keyword">
<param name="morphlineFile" value="morphlines.conf"/>
</indexer>

The Param name=”morphlineFile” specifies the location of the Morphlines configuration file. The location could be an absolute path of your Morphlines file, 

Enabling HBase Replication

Since the HBase Indexer works by acting as a Replication Sink, we need to make sure that Replication is enabled in HBase. You can activate replication using Cloudera Manager by clicking HBase Service->Configuration->Backup and ensuring “Enable HBase Replication” and “Enable Indexing” are both checked.

In addition, we have to make sure that the column family in the HBase table that needs to be replicated must have replication enabled. This can be done by ensuring that the REPLICATION_SCOPE flag is set while the column family is created, as shown below:

create ‘searchTbl’, {NAME => ‘meta’, REPLICATION_SCOPE => 1}


About Morphlines

Morphline provides a simple ETL framework for Hadoop applications. A morphline is a configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. Morphlines can be seen as an evolution of Unix pipelines where the data model works with streams of generic records defined by transformation commands based on a config file. Since Morphlines is a java library, it can also be embedded in any Java codebase. Here, we use it to transform the HBase cell updates and map them to fields in SOLR.

Using Morphines in HBase Indexer

I’m just showing here a very basic morphline file where the first command is extractHbaseCells which is a morphline command that extracts cells from an HBase Result and transforms the values into a SolrInputDocument. The command consists of an array of zero or more mapping specifications.. We can list an array of such mappings here. The parameters for the mapping are:
  • inputColumn – which specifies columns to which to subscribe
  • outputFied – the name of the field where the data is sent
  • type – the type of the field
  • source –  value specifies that the cell value should be indexed
The second command is to sanitizeunknown fields from being written to SOLR . The mapper that we used MorphlineResultToSolrMapper has the implementation to write the morphline fields into SOLR documents


Morphline in Hbase Indexer

Morphline support for Avro

Morphline comes with some handy utilities for reading and writing Avro formatted objects. This can be combined with the extractHbaseCells command to transform a kite avro formatted dataset persisted in Hbase as byte arrays. The readAvroContainer command Parses an Apache Avro binary container and emits a morphline record for each contained Avro datum.

The Avro schema that was used to write the Avro data is retrieved from the Avro container.  The byte array is read from the first attachment of the input record. We can then use the extractAvroPaths command to extract specific values from an Avro object as shown in the example below:


Morphline Avro Hbase Indexer

The above snippet shows how you can transform avro formatted data written in HBase and map them to SOLR fields.

Setting up SOLR schema

In order to send the HBase mutations to fields in SOLR documents, we have to make sure the following setup is done:
  • Setup the collection in SolrCloud or Cloudera Search
  • Setup the field names in schema.xml


You can check this link to read on understanding how solr documents are modeled.

Once the solr collections are created, the Hbase indexer process can be started and you can see the Hbase mutations replicated and live solr indexes get updated (in near real time).

Sunday, June 15, 2014

Performance Tuning and Optimization for Cloudera Search


In this post, I will explain some of the key factors that should be considered while optimizing performance (for querying and indexing) on implementations using Cloudera Search. Since Cloudera Search is built on top of SolrCloud, most of the performance considerations that are applicable for SOLR applications are still applicable for Cloudera Search. Please refer to this link for an overview of the main factors that affect SOLR performance (in general).

Below are some of the main factors that I found useful while optimizing performance on Cloudera Search

Block Caching
As we know, Cloudera Search uses HDFS filesystem to store the indexes. In order to optimize the performance, HDFS Block cache option is available. The block cahce works by caching the HDFS index blocks in JVM direct memory. Block cache can be enabled in solrconfig.xml. Below are the key parameters involved in tuning the block cache:

1. Enable Block Cache - by setting solr.hdfs.blockcache.enabled  to be true. Once block cache is enabled, the read and write caches can be enabled/disabled separately through the following settings:
  • solr.hdfs.blockcache.read.enabled
  • solr.hdfs.blockcache.write.enabled
 There is a known issue with enabling block cache writing which may lead to irrevocable corrupt indexes. So, it is very important that this is disabled by setting solr.hdfs.blockcache.read.enabled to be false.

2. Configure Memory Slab Count - The slab count determines the number of memory slabs to allocate, where each slab is 128 MB. Allocate the slab count to a sufficiently higher number that is required for the specific application (based on schema and query access patterns)
This is done by setting the solr.hdfs.blockcache.slab.count parameter

3.  Enable Global Block Cache - Enabling the global block cache would allow multiple solrcores on the same node to share a global HDFS block cache. It is done by setting solr.hdfs.blockcache.global to be true

4. NRTCachingDirectory - If using the Near Real-Time (NRT) setup, then enabling the NRTCachingDirectory (solr.hdfs.nrtcachingdirectory.enable) and tuning the maxCachedmb (solr.hdfs.nrtcachingdirectory.maxcachedmb) and merge size (solr.hdfs.nrtcachingdirectory.maxmergesizemb) thresholds also help.

Below is the xml section (in solrconfig.xml) with the above parameters:


<directoryFactory name="DirectoryFactory" class="solr.HdfsDirectoryFactory">
  <bool name="solr.hdfs.blockcache.enabled">true</bool>
  <bool name="solr.hdfs.blockcache.read.enabled">true</bool> 
  <bool name="solr.hdfs.blockcache.write.enabled">false</bool>
  <int name="solr.hdfs.blockcache.slab.count">100</int>
  <bool name="solr.hdfs.blockcache.direct.memory.allocation">true</bool>
  <int name="solr.hdfs.blockcache.blocksperbank">16384</int>
  <bool name="solr.hdfs.nrtcachingdirectory.enable">true</bool>
  <int name="solr.hdfs.nrtcachingdirectory.maxmergesizemb">64</int>
  <int name="solr.hdfs.nrtcachingdirectory.maxcachedmb">1024</int>
</directoryFactory>


SolrCloud Caching
The caching related settings play a major role in influencing the query performance/response time. Since SOLR caching are based on the Index Searches, the caches are available as long as the searchers are valid. Solr has support for FastLRUCache which has faster reads which can be set via (Solr.FastLRUCache) in  solrconfig.xml file. There are 3 types of caching available in SOLR

FilterCache - This cache stores unordered sets of document IDs matching the queries. It stores the results of any filter queries (fq) by executing and caching each filter separately. Setting this cache to sufficiently higher value helps in caching commonly used filter queries. Below is a sample configuration for the filter cache:

QueryResultCache - This cache stores the top N results of a query. Since it stores only the document IDs returned by the query, the memory usage of this cache is less compared to that of the filterCache.

DocumentCache - This cache stores the Lucene Document objects that have been fetched from disk. The size of the documentCache memory is dependent on the number and type of fields stored in the document.

Below is a sample configuration setting for the above three cache types:

<filterCache class="solr.FastLRUCache"
                 size="20000"
                 initialSize="5000"
                 autowarmCount="1000"/>

<queryResultCache class="solr.FastLRUCache"
                     size="10000"
                     initialSize="5000"
                     autowarmCount="1000"/>

<documentCache class="solr.FastLRUCache"
                   size="5000"
                   initialSize="2000"
                   autowarmCount="1000"/>

Disable Swapping
As with other hadoop systems, it is recommended to disable Linux swapping on all solr nodes.

To get more details on tuning Cloudera Search, please refer to this link

Friday, June 13, 2014

Handling OP_READ_BLOCK and File Not Found error in Cloudera Search

While using Cloudera Search in a (Near Real-Time) NRT scenario, occasionally during periods of high writes (index updates), I see the following error thrown on the solr logs

Failed to connect to /0.0.0.0:50010 for file /solr/collectionName/core_node1/data/index.20140528010609412/_zsae_Lucene41_0.doc for block BP-658256793-10.130.36.84-1390532185717:blk_-197867398637806450_45226458:java.io.IOException: Got error for OP_READ_BLOCK,​ self=/0.0.0.0:42978,​ remote=/0.0.0.0:50010,​ for file /solr/collectionName/core_node1/data/index.20140528010609412/_zsae_Lucene41_0.doc,​ for pool BP-658256793-10.130.36.84-1390532185717 block -197867398637806450_45226458

DFS chooseDataNode: got # 1 IOException,​ will wait for 1064.4631017325812 msec.

java.io.FileNotFoundException: File does not exist: /solr/collectionName/core_node1/data/index.20140528010609412/_zsae_Lucene41_0.doc

On troubleshooting further, the logs on datanode side show the following READ_BLOCK errors
DatanodeRegistration(<serverip>, storageID=DS-1520167466-0.0.0.0-50010-1389140294030, infoPort=50075, ipcPort=50020, storageInfo=lv=-40;cid=cluster25;nsid=1124238169;c=0):Got exception while serving BP-1661432518-0.0.0.0-1389140286721:blk_5701049520037157281_7691487 to /ip:45293
org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException: Replica not found for BP-1661432518-10.130.71.219-1389140286721:blk_5701049520037157281_7691487
 at org.apache.hadoop.hdfs.server.datanode.BlockSender.getReplica(BlockSender.java:382)
 at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:193)
 at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:326)
 at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:92)
 at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:64)
 at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
 at java.lang.Thread.run(Thread.java:662)
node1:50010:DataXceiver error processing READ_BLOCK operation  src: /0.0.0.0:45293 dest: /0.0.0.0:50010
org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException: Replica not found for BP-1661432518-0.0.0.0-1389140286721:blk_5701049520037157281_7691487
 at org.apache.hadoop.hdfs.server.datanode.BlockSender.getReplica(BlockSender.java:382)
 at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:193)
 at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:326)
 at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:92)
 at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:64)
 at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
 at java.lang.Thread.run(Thread.java:662)
This issue is related to this bug (https://issues.apache.org/jira/browse/SOLR-5693), where HDFS file merging does not work correctly with NRT search. This would lead to search failure with file not found exceptions. Again occurrence of the error depends on the velocity of the data changes. The issue has been backported to Cloudera Search and is supposed to be fixed with the next release of Search.

In versions 1.2 and below, a workaround to minimize the occurence of this failure is to do more frequent commits (reduce the autocommit frequency)

In the event of a catastrophic failure, when the replica goes down because of this issue and does NOT recover, you might have to manually copy over the index files on HDFS from another replica which is up and running. The error log specifies the actual index files that are missing because of the merge.

The index data files are usually stored in HDFS under /solr/{collection-name}/{core_node#}/data

There are two files in this directory which are important in identifying the state of the index:

  • index.properties - this file shows the current active index folder something like this:

#index.properties
#Wed Apr 30 23:57:38 PDT 2014
index=index.20140430235104131

  • replication.properties - this file shows replication state of the core

#Replication details
#Fri May 02 15:57:16 PDT 2014
previousCycleTimeInSeconds=486
indexReplicatedAtList=1399071436498,1399052241128,1398927458820,1398833578758,1398833453263,1398799104838
indexReplicatedAt=1399071436498
timesIndexReplicated=22
lastCycleBytesDownloaded=6012236791

I have used the "File Browser" application inside "Hue" to browse the files to get a  sense of where the files are located and which ones are missing. Below are the steps to fix the issue:

  1. Identify the core (eg: core_node1) and the file(s) missing from the logs
  2. Stop the solr process on the node with the failure. You can use Cloudera Manager to do this.
  3. Copy the index folder (index.xxxxx)from the replica that is working to the failed node. The copy can be done by either:

  • Using "hadoop fs -cp" command
  • Using Hue application.
     4. Copy the index.properties and replication.properties file.
     5. Restart the service.

In both cases, the copy operation has to be done in the context of a user (eg: hdfs) which has write permissions on the HDFS location.

Thursday, June 12, 2014

Real-time Clickstream Analytics using Flume, Avro, Kite Morphlines and Impala


In this post, I will explain how to capture clickstream logs from webservers and ingest into Hive/Impala in real-time for clickstream analytics. We will ingest clickstream logs in realtime using Flume, transform them using Morphlines and store them as avro records in HDFS, which can be used for analytics in Hive and Impala.

Apache Flume
Apache Flume is a high performance system for data collection which is ideal for real-time log data ingestion. It is horizontally scalable and very extensible as we will soon see below.Flume supports a wide variety of sources to pull data from - I have used the spoolingdirectory source which expects immutable files to placed in a particular directory from where the files are processed. The spooling source guarantees reliability of delivery of the events as long as the files are "immutable and uniquely named".

In my case, the clickstream logs are written on the webserver as csv files. The csv files are rotated every minute and copied to the spooling directory source location by a cron job. The current version of Flume (1.4) does not support recursive polling of directory for the spool source. So, I wrote a shell script that recursively finds .csv files from the source location and flattens them at the destination folder (using the file path as the file name).

Below is a sample shell script that does the job:

#!/bin/sh

TS=`date +%Y%m%d%H%M.%S`

SOURCEDIR=/xx/source

DESTDIR=/xx/destination

touch -t $TS $SOURCEDIR/$TS

# find older logs, flatten and move to destination
for FILE in `find $SOURCEDIR -name "*.csv" ! -newer $SOURCEDIR/$TS`
do
  NEWFILE=`echo $FILE | tr '/' '-'`
  mv $FILE $DESTDIR/$NEWFILE
done

rm $SOURCEDIR/$TS

Avro Format and Schema
Avro defines a data format designed to support data-intensive applications - it provides both RPC and serialization framework. Avro allows for schema evolution in a scalable manner and Hive and Impala both support avro formated records. We need to first create a avro schema file that represents the avro records that we are going to persit. Below is a sample format for reference:

{
 "namespace": "company.schema",
 "type": "record",
 "name": "Clickstream",
 "fields": [
  {"name": "timestamp", "type": "long"},
  {"name": "client_ip", "type": "string"},
  {"name": "request_type", "type": ["null", "string"]},
  {"name": "request_url", "type": ["null", "string"]}
 ]
}

Once the avro schema file is created, copy to the schema to the flume agent
sudo mkdir /etc/flume-ng/schemas
sudo cp clickstream.avsc /etc/flume-ng/schemas/clickstream.avsc

Kite SDK
Kite is a set of libraries and tools (contributed by Cloudera) that aims to making building systems on top of Hadoop easier. One of the module that it provides is a "Dataset" module that provides logical abstractions on top of persitence layer. The kite maven plugin also provides Maven goals for packaging, deploying, and running distributed applications. Please refer here to installation steps for Kite. Once installed, you can use the kite maven command to create the dataset on HDFS

mvn kite:create-dataset -Dkite.rootDirectory=/data/clickstream
  -Dkite.datasetName=clickstream \
  -Dkite.avroSchemaFile=/etc/flume-ng/schemas/clickstream.avsc

CSV Logs to Avro Records
In order to transform the events captured from the csv files into a format that can be used for further processing, we can use Flume Interceptors.

Flume Interceptors provide on-the-fly inspection and modification of events to create a customized data flow for our applications. Once an interceptor is attached to a source, it is invoked on every event as it travels between a source and a channel. In addition, multiple interceptors can be chained together in a sequence (as shown in the below diagram).



In this case, we can use an interceptor to transform the csv file to avro formatted records. The interceptor will be implemented using Morphlines, which is a powerful open source framework for doing ETL transformations in Hadoop applications. A morphline is an in-memory container for transformation commands that are chained together.To transform the records into Avro, we can attache a flume morphline interceptor to the source.

A morphline is defined through a configuration file which contains the commands. Below is a morphline to convert csv to avro records

morphlines: [
  {
    id: convertClickStreamLogsToAvro
    importCommands: ["com.cloudera.**", "org.kitesdk.**" ]
    commands: [

      { tryRules {
    catchExceptions : false
    throwExceptionIfAllRulesFailed : true
    rules : [
   # next rule of tryRules cmd:
   {
     commands : [
     { readCSV: {
     separator : ","
     columns : [timestamp,client_ip,request_type,request_uri]
     trim: true
     charset : UTF-8
     quoteChar : "\""
     } 
    }

    {      
     toAvro {
        
      schemaFile: /etc/flume-ng/schemas/clickstream.avsc
  
     } 
   
    }
    { 
     writeAvroToByteArray: {
        
      format: containerlessBinary
  
     } 
   
    }
   ]
   }
   # next rule of tryRules cmd:
   {
     commands : [
    { dropRecord {} }
     ]
   }
       
  ]
    }
  }

  { logTrace { format : "output record: {}", args : ["@{}"] } }  
    ]
  }


]

Now, copy the morphline file to the flume agent:
sudo cp morphline.conf /etc/flume-ng/conf/morphline.conf
Flume Configuration
The events are forwarded from the source to an HDFS sink through a memory channel.  The HDFS sink writes the events to a file where the path of the file is specified by the configuration. The flume properties file has to be setup to specify the source, channel and sink.

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

Channels Setup
The below section shows the memory channel setup
clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000

Source and Interceptor Setup
The below section shows the source and the two interceptors that are attached to the source:

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro


Sink Configuration
The below section shows the HDFS sink setup
# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

Below is the complete configuration file (flume.properties)

clickstreamAgent.channels = mem-channel-hdfs
clickstreamAgent.sources = spoolSource
clickstreamAgent.sinks = clickstream-dataset

clickstreamAgent.channels.mem-channel-hdfs.type = memory
clickstreamAgent.channels.mem-channel-hdfs.capacity = 10000000
clickstreamAgent.channels.mem-channel-hdfs.transactionCapacity = 1000

clickstreamAgent.sources.spoolSource.type = spooldir
clickstreamAgent.sources.spoolSource.channels = mem-channel-hdfs
clickstreamAgent.sources.spoolSource.spoolDir = /xxx/folder
clickstreamAgent.sources.spoolSource.deletePolicy=immediate

clickstreamAgent.sources.spoolSource.interceptors = attach-schema morphline

# add the schema for our record sink
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.type = static
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.key = flume.avro.schema.url
clickstreamAgent.sources.spoolSource.interceptors.attach-schema.value = file:/etc/flume-ng/schemas/clickstream.avsc

# morphline interceptor config
clickstreamAgent.sources.spoolSource.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineFile = /etc/flume-ng/conf/morphline.conf
clickstreamAgent.sources.spoolSource.interceptors.morphline.morphlineId = convertClickStreamLogsToAvro

# store the clickstream in the avro Dataset
clickstreamAgent.sinks.clickstream-dataset.type = hdfs
clickstreamAgent.sinks.clickstream-dataset.channel = mem-channel-hdfs
clickstreamAgent.sinks.clickstream-dataset.hdfs.path = /data/clickstream
clickstreamAgent.sinks.clickstream-dataset.hdfs.batchSize = 100
clickstreamAgent.sinks.clickstream-dataset.hdfs.fileType = DataStream
clickstreamAgent.sinks.clickstream-dataset.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

The configuration can be updated by copying the flume.properties to (/etc/flume-ng/conf) or using Cloudera Manager (Configuration->Default Agent)

Testing
Once all the above setup is done, you can start the flume agent and monitor the logs to see if the events are being processed. After sending some sample logs, you can see the files being written to HDFS (/data/clickstream).

In addition, the data can be queried on Hive and Impala.

[xxx:xxx:21000] > invalidate metadata;
Query: invalidate metadata
Query finished, fetching results ...


[xxx:xxx:21000]> show tables;
Query: show tables
Query finished, fetching results ...
+-------+
| name       |
+-------+
| clickstream|
+-------+
[xxx:xxx:21000]> select client_ip from clickstream;
Query: select client_ip from clickstream

Query finished, fetching results ...+----------------------+
| client_ip            |
+----------------------+
| 127.0.0.1            |
| 0.0.0.0              |
+----------------------+