I described a centralized logging system built around Logstash in a previous post:
centralized logging with Logstash.
Logstash is great if you do not want to do any programming work, since it gives you a lot of flexibility. However, quite a few limitations show up once you run Logstash for a while:
- Logstash's own system log is not sufficient to diagnose problems when an error happens
- The overall system is too complicated; an error in any component can result in lost data
- With a large input stream, say 2,000 events per input, Logstash can crash without any notification
While exploring streaming solutions across the internet, I found Flume to be a potentially better option. The idea of Flume is more or less the same as Logstash, except that Flume itself provides a reliable queue system. Backed by Apache, Flume seems more reliable, and it does not have to be a very complicated system. The following tutorial is an introduction to using Flume as a centralized logging system, based on Ubuntu.
Flume Installation
cd ~
sudo apt-get update
sudo apt-get install openjdk-7-jre-headless -y
sudo wget http://mirror.nus.edu.sg/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
tar -zvxf apache-flume-1.5.0-bin.tar.gz
sudo mv apache-flume-1.5.0-bin /opt/flume

If you want to use the Elasticsearch sink, you will also have to install the Lucene libraries. Suppose you are using Elasticsearch 1.2.1 with Lucene 4.8:
sudo wget https://dl.dropboxusercontent.com/s/h491nkeajc67pk7/lucene%204.8.zip
unzip lucene\ 4.8.zip
cd lucene\ 4.8/
sudo mv * /opt/flume/lib

If you just want to try out Flume first, follow the official tutorial: http://flume.apache.org/FlumeUserGuide.html#a-simple-example. To run Flume in a real environment, you will usually have to configure flume-env.sh:
cd /opt/flume/conf
cp flume-env.sh.template flume-env.sh
# edit flume-env.sh to set your JAVA_HOME and a reasonable JAVA_OPTS
# (this is important if you have a large volume of streaming data per second)
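As a rough guide, a minimal flume-env.sh might look like the sketch below; the JAVA_HOME path and heap sizes are only assumptions and should be adapted to your machine:

# /opt/flume/conf/flume-env.sh -- minimal sketch, values are assumptions
# Path to the JRE installed above (openjdk-7 on 64-bit Ubuntu)
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
# Give the agent a fixed, reasonably large heap so it does not fall over under a heavy stream
JAVA_OPTS="-Xms512m -Xmx1024m"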
Elasticsearch Installation
Please follow my blog: http://jakege.blogspot.sg/2014/03/how-to-install-elasticsearch.html

Kibana Installation
Kibana just needs to be downloaded from the official site (http://www.elasticsearch.org/overview/kibana/), because it is simply a static local website. All you have to do is edit config.js to point it at the right Elasticsearch servers and open index.html.
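For example, with Kibana 3 the relevant line in config.js looks roughly like this (the Elasticsearch address below is only a placeholder; point it at your own cluster's HTTP port):

// config.js (Kibana 3) -- point Kibana at your Elasticsearch HTTP endpoint
elasticsearch: "http://192.168.1.1:9200",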
Centralized Logging System with Flume

For centralized logging we usually use the standard Flume consolidation setup. Here we need two kinds of Flume agents: a shipping agent and a collecting agent.
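The rough shape of the setup (a simplified sketch; the hosts and ports are just the ones used in the configs below):

web server 1: shipping agent (tail -F nginx log) --avro-->
web server 2: shipping agent (tail -F nginx log) --avro--> collecting agent (avro source :5000) --> Elasticsearch --> Kibana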
Shipping agent
For the shipping agent, the agent tails the log file and ships events to the Avro port of the collecting agent. The setup looks like this:

################################################
# Name the components on this agent
################################################
agent1.sources = source1
agent1.sinks = sink1 sink2
agent1.channels = channel1

################################################
# Describe Source
################################################
# Source Tail
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/nginx_access.log

################################################
# Describe Interceptors
################################################
agent1.sources.source1.interceptors = interceptor1 interceptor2
# add the host header
agent1.sources.source1.interceptors.interceptor1.type = host
agent1.sources.source1.interceptors.interceptor1.hostHeader = host
# add a timestamp
agent1.sources.source1.interceptors.interceptor2.type = timestamp

################################################
# Describe Sinks
################################################
# Avro sinks; usually there are two collection points for load balancing and HA
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = 192.168.0.1
agent1.sinks.sink1.port = 5000
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.hostname = 192.168.0.1
agent1.sinks.sink2.port = 5000

################################################
# Describe Sink Group
################################################
# Sink Group
agent1.sinkgroups = load_group1
agent1.sinkgroups.load_group1.sinks = sink1 sink2
agent1.sinkgroups.load_group1.processor.type = load_balance
agent1.sinkgroups.load_group1.processor.backoff = true
agent1.sinkgroups.load_group1.processor.selector = round_robin

################################################
# Describe Channel
################################################
# Channel Memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 300

################################################
# Bind the source and sinks to the channel
################################################
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink2.channel = channel1
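To try the shipping agent in the foreground before setting up the service script further down (assuming the configuration above is saved as /opt/flume/conf/flume.conf), you can start it with something like:

cd /opt/flume
bin/flume-ng agent -n agent1 -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console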
Collecting agent
The collecting agent takes its input from the Avro port and puts it into Elasticsearch. The setup looks like this:

################################################
# Name the components on this agent
################################################
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

################################################
# Describe Source
################################################
# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0
agent2.sources.source1.port = 5000

################################################
# Describe Interceptors
################################################
# an example of an nginx access log regex match
agent2.sources.source1.interceptors = interceptor1
agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)
agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = ident
agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = auth
agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = logtime
agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = method
agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = request
agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = param
agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = httpversion
agent2.sources.source1.interceptors.interceptor1.serializers.s9.name = referrer
agent2.sources.source1.interceptors.interceptor1.serializers.s10.name = agent
agent2.sources.source1.interceptors.interceptor1.serializers.s11.name = response
agent2.sources.source1.interceptors.interceptor1.serializers.s12.name = bytes
agent2.sources.source1.interceptors.interceptor1.serializers.s13.name = requesttime

################################################
# Describe Sink
################################################
# Sink ElasticSearch
# The Elasticsearch client jars must be copied into flume/lib
# clusterName must match cluster.name in elasticsearch/config/elasticsearch.yml
# (Elasticsearch keeps its data under data/<clusterName>)
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 192.168.1.1:9300,192.168.1.2:9300
agent2.sinks.sink1.indexName = nginx
agent2.sinks.sink1.indexType = nginx_access
agent2.sinks.sink1.clusterName = elasticsearch
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
# this serializer is crucial in order to use Kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

################################################
# Describe Channel
################################################
# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000

################################################
# Bind the source and sink to the channel
################################################
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
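The collecting agent can be started the same way, and you can check that events actually reach Elasticsearch with a simple search. The host and index pattern below follow the configuration above; adjust them to your setup:

cd /opt/flume
bin/flume-ng agent -n agent2 -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console
# once some nginx lines have been shipped, the daily index (e.g. nginx-2014-06-13) should appear
curl 'http://192.168.1.1:9200/nginx-*/_search?pretty&size=1'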
Start-up script
If you need a start-up service script, here is one. It assumes your agent is named agent1, your config file is flume.conf under /opt/flume/conf, and the agent runs as the user set in FLUME_USER; change these values accordingly.

#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Starts a Flume agent
#
# chkconfig: 345 90 10
# description: Flume agent
#
### BEGIN INIT INFO
# Provides:          flume-ng-agent
# Required-Start:    $remote_fs
# Should-Start:
# Required-Stop:     $remote_fs
# Should-Stop:
# Default-Start:     3 4 5
# Default-Stop:      0 1 2 6
# Short-Description: Flume agent
### END INIT INFO

. /lib/lsb/init-functions

# Name of the agent
FLUME_AGENT_NAME=agent1

# Set up a few defaults that can later be overridden in /etc/default/flume-ng-agent
FLUME_LOG_DIR=/opt/flume/logs
FLUME_CONF_DIR=/opt/flume/conf
FLUME_RUN_DIR=/var/run/flume
FLUME_HOME=./bin/flume-ng
FLUME_USER=mozat

# Autodetect JAVA_HOME if not defined
if [ -e /usr/libexec/bigtop-detect-javahome ]; then
  . /usr/libexec/bigtop-detect-javahome
elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
  . /usr/lib/bigtop-utils/bigtop-detect-javahome
fi

STATUS_RUNNING=0
STATUS_DEAD=1
STATUS_DEAD_AND_LOCK=2
STATUS_NOT_RUNNING=3

ERROR_PROGRAM_NOT_INSTALLED=5

FLUME_LOCK_DIR="/var/lock/subsys/"
LOCKFILE="${FLUME_LOCK_DIR}/flume-ng-agent"
desc="Flume agent daemon"

FLUME_CONF_FILE=${FLUME_CONF_FILE:-${FLUME_CONF_DIR}/flume.conf}
EXEC_PATH=/opt/flume/bin/flume-ng
FLUME_PID_FILE=${FLUME_RUN_DIR}/flume.pid

# These directories may be tmpfs and may or may not exist
# depending on the OS (ex: /var/lock/subsys does not exist on debian/ubuntu)
for dir in "$FLUME_RUN_DIR" "$FLUME_LOCK_DIR"; do
  [ -d "${dir}" ] || install -d -m 0755 -o $FLUME_USER -g $FLUME_USER ${dir}
done

FLUME_SHUTDOWN_TIMEOUT=${FLUME_SHUTDOWN_TIMEOUT:-60}

start() {
  [ -x $EXEC_PATH ] || exit $ERROR_PROGRAM_NOT_INSTALLED

  checkstatus
  status=$?
  if [ "$status" -eq "$STATUS_RUNNING" ]; then
    exit 0
  fi

  log_success_msg "Starting $desc (flume-ng-agent): "
  /bin/su -s /bin/bash -c "cd /opt/flume;/bin/bash -c 'echo \$\$ > ${FLUME_PID_FILE} && exec ${EXEC_PATH} agent -n $FLUME_AGENT_NAME -c conf -f $FLUME_CONF_FILE -Dflume.monitoring.type=http -Dflume.monitoring.port=30001 >>${FLUME_LOG_DIR}/flume.${FLUME_AGENT_NAME}.init.log 2>&1' &" $FLUME_USER
  RETVAL=$?
  [ $RETVAL -eq 0 ] && touch $LOCKFILE
  return $RETVAL
}

stop() {
  if [ ! -e $FLUME_PID_FILE ]; then
    log_failure_msg "Flume agent is not running"
    exit 0
  fi

  log_success_msg "Stopping $desc (flume-ng-agent): "

  FLUME_PID=`cat $FLUME_PID_FILE`
  if [ -n "$FLUME_PID" ]; then
    log_success_msg "kill process ${FLUME_PID}"
    kill -TERM ${FLUME_PID} &>/dev/null
    # for i in `seq 1 ${FLUME_SHUTDOWN_TIMEOUT}` ; do
    #   kill -0 ${FLUME_PID} &>/dev/null || break
    #   sleep 1
    # done
    kill -KILL ${FLUME_PID} &>/dev/null
  fi
  rm -f $LOCKFILE $FLUME_PID_FILE
  return 0
}

restart() {
  stop
  start
}

checkstatus(){
  pidofproc -p $FLUME_PID_FILE java > /dev/null
  status=$?

  case "$status" in
    $STATUS_RUNNING)
      log_success_msg "Flume agent is running"
      ;;
    $STATUS_DEAD)
      log_failure_msg "Flume agent is dead and pid file exists"
      ;;
    $STATUS_DEAD_AND_LOCK)
      log_failure_msg "Flume agent is dead and lock file exists"
      ;;
    $STATUS_NOT_RUNNING)
      log_failure_msg "Flume agent is not running"
      ;;
    *)
      log_failure_msg "Flume agent status is unknown"
      ;;
  esac
  return $status
}

condrestart(){
  [ -e ${LOCKFILE} ] && restart || :
}

case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  status)
    checkstatus
    ;;
  restart)
    restart
    ;;
  condrestart|try-restart)
    condrestart
    ;;
  *)
    echo $"Usage: $0 {start|stop|status|restart|try-restart|condrestart}"
    exit 1
esac

exit $RETVAL
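To install it as a service on Ubuntu, something along these lines should work (the file name flume-ng-agent is just the name assumed here):

sudo cp flume-ng-agent /etc/init.d/flume-ng-agent
sudo chmod +x /etc/init.d/flume-ng-agent
sudo update-rc.d flume-ng-agent defaults
sudo service flume-ng-agent start
sudo service flume-ng-agent status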
Index template
To manage the log indices better and make them fit better into Kibana, it is a good idea to define an index template. For example, for our nginx logs:

{
  "template": "nginx-*",
  "settings" : {
    "number_of_shards" : 5,
    "number_of_replicas" : 1,
    "index.cache.field.type" : "soft",
    "index.refresh_interval" : "5s",
    "index" : {
      "query" : { "default_field" : "@message" },
      "store" : { "compress" : { "stored" : true, "tv": true } }
    }
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "_source": { "compress": true },
      "_ttl": { "enabled": true, "default": "2d" },
      "properties" : {
        "@timestamp": { "type": "date", "index": "not_analyzed" },
        "@message": { "type" : "string", "index" : "analyzed" },
        "@source_host": { "type": "string", "index": "not_analyzed" },
        "@fields" : {
          "type": "object",
          "properties": {
            "agent" : { "type" : "string", "index" : "analyzed" },
            "request" : { "type" : "string", "index" : "not_analyzed" },
            "host" : { "type" : "string", "index" : "not_analyzed" },
            "clientip" : { "type" : "string", "index" : "not_analyzed" },
            "file" : { "type" : "string", "index" : "not_analyzed" },
            "bytes": { "type": "integer" },
            "offset": { "type": "integer" },
            "requesttime": { "type": "float" },
            "logtime": { "type" : "string", "index" : "not_analyzed" }
          }
        }
      }
    }
  }
}
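The template only applies to indices created after it is registered, so register it with Elasticsearch before the first nginx-* index is created. For example (assuming the JSON above is saved as nginx_template.json and Elasticsearch runs on 192.168.1.1):

curl -XPUT 'http://192.168.1.1:9200/_template/nginx_template' -d @nginx_template.json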
Elasticsearch Curator
To manage the indices, i.e. close and delete outdated indices and optimize them, we use Elasticsearch Curator. Installation for version 1.0:

pip install elasticsearch-curator

To close indices older than 1 day and delete indices older than 4 days, with "-" as the date separator (Flume names daily indices like nginx-2014-06-13), we put this in a cron job:
20 12 * * * /usr/local/bin/curator --host 192.168.1.1 --prefix nginx- -d 4 -c 1 -s -
Comments

Hi,
Thanks for this informative article. I am having issues setting up Flume with Elasticsearch and Kibana.
I am able to save the data into HDFS using Flume's HDFS sink. Now I want to send the same data to Elasticsearch so I can visualize it in Kibana.
But unfortunately, it does not seem to work.
The data I want to send looks like this:
{'id': '26', 'value': '8'}
{'id': '27', 'value': '16'}
{'id': '28', 'value': '21'}
{'id': '29', 'value': '10'}
I have created an elasticsearch index with this mapping:
curl -XPUT 'localhost:9200/riz_index?pretty' -H 'Content-Type: application/json' -d'
{
  "mappings" : {
    "_default_" : {
      "properties" : {
        "id" : { "type": "integer" },
        "value" : { "type": "integer" }
      }
    }
  }
}
'
The flume conf file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 127.0.0.5
a1.sources.r1.port = 5005
# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = localhost:9200,localhost:9300
a1.sinks.k1.indexName = riz_index
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.v12.ElasticSearchLogStashEventSerializer
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
I can see the elasticsearch index created in kibana and the specified fields (id and value) but no data in those fields.
Is there anything that I am missing?
Thanks