
Logstash: Introduction, Installation, Configuration, and Pipelines


Official documentation: https://www.elastic.co/guide/en/logstash/current/getting-started-with-logstash.html

  • Logstash is an open-source data collection engine with real-time pipelining capabilities.
  • Logstash can dynamically unify data from disparate sources and normalize it into the destinations of your choice.
  • Logstash is a powerful data processing tool: it handles data transport, format processing, and formatted output, and has a rich plugin ecosystem. It is commonly used for log processing.
  • Logstash is relatively resource-hungry, with high CPU and memory usage while running. It also has no message-queue buffer by default, so there is a risk of data loss.
  • Logstash is written in Ruby and runs on the Java virtual machine; it collects, parses, and forwards data streams.
  • Logstash uses pipelines to collect, process, and output logs.

Event: Logstash converts every record in the data stream into an event at the input stage; at the output stage, each event is converted back into data in the target format.

  • Inputs: obtain events from data sources. Each input runs in its own thread, reads data from its source, and writes it to a queue.
  • Filters: filter and modify events.
  • Outputs: emit events to other systems.

Logstash processes events through a pipeline, roughly as follows:

(Figure: Logstash workflow)

Inputs and outputs support codecs (coder & decoder). Before version 1.3.0, Logstash accepted only plain-text input and left all processing to filters; now data in different formats can be decoded during input. The current data-processing flow is therefore:

(Figures: Logstash data flow — inputs → queue → batcher → filter → output)

The arrows indicate the direction of data flow. There can be multiple inputs. The queue in the middle distributes data across the pipelines; each pipeline consists of a batcher, filters, and outputs. The batcher fetches data from the queue in batches (batch size is configurable).

The Logstash data flow, step by step

  1. Start with some input, for example a web.log file in which every line is one record. The file input reads records from the file, and a json codec converts each record into a Logstash event (see the sketch after this list).
  2. The event flows through the queue into one of the pipeline worker threads, where it first sits in the batcher. When the batcher hits its dispatch condition (a time limit or a batch-size threshold), it hands the batch to the filters; after filtering, events move on to the outputs, which write them to the configured destination.
  3. After output, an ACK covering the processed events is sent back to the queue, and the queue marks those events as completed.
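
As a concrete sketch of this walkthrough (the file path and codec choice are illustrative assumptions, not from the original), such a pipeline could look like:

input {
  file {
    path => "/var/log/web.log"  #each line of the file becomes one event
    codec => "json"             #decode every line as JSON at input time
  }
}
output {
  stdout {
    codec => rubydebug          #pretty-print the resulting events
  }
}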


Queue types

  • In Memory: fixed-size, held in memory; cannot survive a process crash or machine outage, so data may be lost.
  • Persistent Queue: survives process crashes and guarantees at-least-once delivery; acts as a buffer and can stand in for an external message queue such as Kafka (see the logstash.yml sketch below).
  • Dead Letter Queue: holds events that Logstash could not process, e.g. because of data-type errors.

Persistent Queue (PQ) processing flow

  1. A record enters the PQ via an input; the PQ writes a copy to disk, then acknowledges receipt to the input.
  2. The record travels from the PQ through the filters/outputs; once they have processed the event, an ACK is returned to the PQ.
  3. On receiving the ACK, the PQ deletes the on-disk copy of the data.
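
To enable the persistent queue, set queue.type in logstash.yml. A minimal sketch (the path and size are illustrative; the settings themselves are documented in the settings table later on this page):

queue.type: persisted                  #switch from the default in-memory queue
path.queue: /var/lib/logstash/queue    #page-file location; defaults to path.data/queue
queue.max_bytes: 4gb                   #cap on-disk queue size; size it to ride out output outages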

1. Install the Java environment

On some Linux systems the JAVA_HOME environment variable must be set; if Logstash fails to detect JAVA_HOME during installation, it reports an error and the service will not start. If the JDK lives under /opt, create a symlink in /usr/bin/ pointing at the java binary under JAVA_HOME/bin.
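
For example (the JDK path here is hypothetical; adjust it to your installation):

export JAVA_HOME=/opt/jdk1.8.0_211        #hypothetical JDK directory under /opt
ln -s $JAVA_HOME/bin/java /usr/bin/java   #symlink so the Logstash startup scripts can find java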

2. Install Logstash

YUM/RPM

Add the Elastic repository (e.g. save the following as /etc/yum.repos.d/logstash.repo):

[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

yum install -y logstash-7.2.0

Alternatively, download the RPM manually from the official site (https://www.elastic.co/downloads/logstash) and install it:

yum localinstall -y logstash-7*.rpm

File locations after installing from the RPM package:

| Type | Description | Default Location | Setting |
| --- | --- | --- | --- |
| home | Home directory of the Logstash installation | /usr/share/logstash | |
| bin | Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins | /usr/share/logstash/bin | |
| settings | Configuration files, including logstash.yml, jvm.options, and startup.options | /etc/logstash | path.settings |
| conf | Logstash pipeline configuration files | /etc/logstash/conf.d/*.conf | See /etc/logstash/pipelines.yml |
| logs | Log files | /var/log/logstash | path.logs |
| plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | /usr/share/logstash/plugins | path.plugins |
| data | Data files used by logstash and its plugins for any persistence needs. | /var/lib/logstash | path.data |

Binary package

File locations when installed from the binary package:

| Type | Description | Default Location | Setting |
| --- | --- | --- | --- |
| home | Home directory of the Logstash installation | {extract.path} (the directory created by unpacking the archive) | |
| bin | Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins | {extract.path}/bin | |
| settings | Configuration files, including logstash.yml and jvm.options | {extract.path}/config | path.settings |
| logs | Log files | {extract.path}/logs | path.logs |
| plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | {extract.path}/plugins | path.plugins |
| data | Data files used by logstash and its plugins for any persistence needs. | {extract.path}/data | path.data |

3. Start Logstash

Start Logstash as a service, or run it directly:

systemctl start logstash 
#This starts a Java background process named org.jruby.Main; check it with jps -l
jps -l

Or start Logstash from the binary executable:

/usr/share/logstash/bin/logstash -f logstash.conf --config.reload.automatic

#-f: path to the pipeline config file
#--config.reload.automatic: watch the config file and reload it automatically; has no effect when -e is used
#--config.reload.interval <seconds>: how often to check the config file for changes. If Logstash was started without automatic reload, add the flag when restarting the process.

4. Verify

/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
#-e: define the configuration directly on the command line
#This config reads from stdin and writes to stdout

stdin > hello world
stdout> 2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

#Logstash adds a timestamp and an IP address to the message
#Press Ctrl+D to exit Logstash

5. Command-line flags

| Flag | Description | Default |
| --- | --- | --- |
| -r, --config.reload.automatic | Monitor configuration changes and reload whenever the config is changed. NOTE: use SIGHUP to manually reload the config | false |
| -n, --node.name NAME | Specify the name of this logstash instance; if no value is given it will default to the current hostname. | current hostname |
| -f, --path.config CONFIG_PATH | Load the logstash config from a specific file or directory. If a directory is given, all files in that directory will be concatenated in lexicographical order and then parsed as a single config file. You can also specify wildcards (globs), and any matched files will be loaded in the order described above. | |
| -e, --config.string CONFIG_STRING | Use the given string as the configuration data. Same syntax as the config file. If no input is specified, the default input "input { stdin { type => stdin } }" is used; if no output is specified, the default output "output { stdout { codec => rubydebug } }" is used. If you wish to use both defaults, pass the empty string to '-e'. | nil |
| --log.level LEVEL | Set the log level for logstash. Possible values: fatal, error, warn, info, debug, trace | info |
| -l, --path.logs PATH | Write logstash internal logs to the given file. Without this flag, logstash will emit logs to standard output. | /usr/share/logstash/logs |
| -t, --config.test_and_exit | Check configuration for valid syntax and then exit. | false |
| --config.reload.interval RELOAD_INTERVAL | How frequently to poll the configuration location for changes, in seconds | 3000000000 |
| --http.host HTTP_HOST | Web API binding host | 127.0.0.1 |
| --http.port HTTP_PORT | Web API http port | 9600-9700 |
| --log.format FORMAT | Specify if Logstash should write its own logs in JSON form (one event per line) or in plain text (using Ruby's Object#inspect) | plain |
| --path.settings SETTINGS_DIR | Directory containing the logstash.yml file. This can also be set through the LS_SETTINGS_DIR environment variable | /usr/share/logstash/config |
| -p, --path.plugins PATH | A path of where to find plugins. This flag can be given multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: 'PATH/logstash/TYPE/NAME.rb' where TYPE is 'inputs', 'filters', 'outputs' or 'codecs' and NAME is the name of the plugin. | [] |
| --path.data PATH | This should point to a writable directory. Logstash will use this directory whenever it needs to store data. Plugins will also have access to this path. | /usr/share/logstash/data |
| -u, --pipeline.batch.delay DELAY_IN_MS | When creating pipeline batches, how long to wait while polling for the next event. | 50 |
| --pipeline.id ID | Sets the ID of the pipeline. | main |
| -b, --pipeline.batch.size SIZE | Size of batches the pipeline is to work in. | 125 |
| -V, --version | Emit the version of logstash and its friends, then exit. | |
| -M, --modules.variable MODULES_VARIABLE | Load variables for module template. Multiple instances of '-M' or '--modules.variable' are supported. Ignored if the '--modules' flag is not used. Should be in the format '-M "MODULE_NAME.var.PLUGIN_TYPE.PLUGIN_NAME.VARIABLE_NAME=VALUE"', as in '-M "example.var.filter.mutate.fieldname=fieldvalue"' | |
| --modules MODULES | Load Logstash modules. Modules can be defined using multiple instances ('--modules module1 --modules module2') or comma-separated syntax ('--modules=module1,module2'). Cannot be used in conjunction with '-e' or '-f'. Use of '--modules' overrides modules declared in the 'logstash.yml' file. | |
| --setup | Load index template into Elasticsearch, and saved searches, index-pattern, visualizations, and dashboards into Kibana when running modules. | false |
| -w, --pipeline.workers COUNT | Sets the number of pipeline workers to run. | 20 |
| --config.debug | Print the compiled config ruby code out as a debug log (you must also have --log.level=debug enabled). WARNING: this will include any 'password' options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs! | false |
| --pipeline.unsafe_shutdown | Force logstash to exit during shutdown even if there are still inflight events in memory. By default, logstash will refuse to quit until all received events have been pushed to the outputs. | false |
| --java-execution | Use the Java execution engine. | true |
| -i, --interactive SHELL | Drop to a shell instead of running as normal. Valid shells are "irb" and "pry" | |
| --verbose | Set the log level to info. | |

Docker

docker pull docker.elastic.co/logstash/logstash:7.4.0

docker pull logstash:7.4.0

File locations inside the Docker image:

| Type | Description | Default Location | Setting |
| --- | --- | --- | --- |
| home | Home directory of the Logstash installation | /usr/share/logstash | |
| bin | Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins | /usr/share/logstash/bin | |
| settings | Configuration files, including logstash.yml and jvm.options | /usr/share/logstash/config | path.settings |
| conf | Logstash pipeline configuration files | /usr/share/logstash/pipeline | path.config |
| plugins | Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. | /usr/share/logstash/plugins | path.plugins |
| data | Data files used by logstash and its plugins for any persistence needs. | /usr/share/logstash/data | path.data |

Note: containers started from this image write their logs straight to the console; they cannot write directly to a log file.

  • The docker image is built from the .tar.gz binary package.

  • Mount pipeline files under /usr/share/logstash/pipeline/ and start the container:

    docker run --rm -it \
    -v $(pwd)/test.conf:/usr/share/logstash/pipeline/test.conf \
    docker.elastic.co/logstash/logstash:7.4.0
    
  • The default pipeline file is /usr/share/logstash/pipeline/logstash.conf:

    input {
      beats {
        port => 5044
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }
    

    In other words, if you start the container without mounting a pipeline file, Logstash runs a minimal pipeline: Beats input ---> stdout output.

  • Logstash settings can also be supplied as environment variables, for example:

    docker run --rm -it -e PIPELINE_WORKERS=2 docker.elastic.co/logstash/logstash:7.4.0

    Each such variable maps to a Logstash setting:

| Environment Variable | Logstash Setting |
| --- | --- |
| PIPELINE_WORKERS | pipeline.workers |
| LOG_LEVEL | log.level |
| XPACK_MONITORING_ENABLED | xpack.monitoring.enabled |

  • Default settings baked into the Logstash docker image:

| Setting | Default |
| --- | --- |
| http.host | 0.0.0.0 |
| xpack.monitoring.elasticsearch.hosts | http://elasticsearch:9200 |

Settings in the Logstash settings file (logstash.yml) use YAML syntax, for example:

pipeline:
  batch:
    size: 125
    delay: 50

The flat key format is also supported:

pipeline.batch.size: 125
pipeline.batch.delay: 50

Setting values can reference system environment variables:

pipeline.batch.size: ${BATCH_SIZE}
pipeline.batch.delay: ${BATCH_DELAY:50}
node.name: "node_${LS_NODE_NAME}"
path.queue: "/tmp/${QUEUE_DIR:queue}"

When defining several custom module settings, the following nested format is recommended:

modules:
  - name: MODULE_NAME1
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE
    var.PLUGIN_TYPE2.PLUGIN_NAME2.KEY1: VALUE
    var.PLUGIN_TYPE3.PLUGIN_NAME3.KEY1: VALUE
  - name: MODULE_NAME2
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE

Common logstash.yml settings:

| Setting | Description | Default value |
| --- | --- | --- |
| node.name | A descriptive name for the node. | Machine's hostname |
| path.data | The directory that Logstash and its plugins use for any persistent needs. | LOGSTASH_HOME/data |
| pipeline.id | The ID of the pipeline. | main |
| pipeline.java_execution | Use the Java execution engine. | true |
| pipeline.workers | The number of workers that will, in parallel, execute the filter and output stages of the pipeline. If you find that events are backing up, or that the CPU is not saturated, consider increasing this number to better utilize machine processing power. | Number of the host's CPU cores |
| pipeline.batch.size | The maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs. Larger batch sizes are generally more efficient, but come at the cost of increased memory overhead. You may need to increase JVM heap space in the jvm.options config file. See Logstash Configuration Files for more info. | 125 |
| pipeline.batch.delay | When creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers. | 50 |
| pipeline.unsafe_shutdown | When set to true, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown. | false |
| pipeline.plugin_classloaders | (Beta) Load Java plugins in independent classloaders to isolate their dependencies. | false |
| path.config | The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. | Platform-specific. See Logstash Directory Layout. |
| config.string | A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. | None |
| config.test_and_exit | When set to true, checks that the configuration is valid and then exits. Note that grok patterns are not checked for correctness with this setting. Logstash can read multiple config files from a directory. If you combine this setting with log.level: debug, Logstash will log the combined config file, annotating each config block with the source file it came from. | false |
| config.reload.automatic | When set to true, periodically checks if the configuration has changed and reloads the configuration whenever it is changed. This can also be triggered manually through the SIGHUP signal. | false |
| config.reload.interval | How often in seconds Logstash checks the config files for changes. | 3s |
| config.debug | When set to true, shows the fully compiled configuration as a debug log message. You must also set log.level: debug. WARNING: the log message will include any password options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs! | false |
| config.support_escapes | When set to true, quoted strings will process the following escape sequences: \n becomes a literal newline (ASCII 10), \r becomes a literal carriage return (ASCII 13), \t becomes a literal tab (ASCII 9), \\ becomes a literal backslash \, \" becomes a literal double quotation mark, \' becomes a literal quotation mark. | false |
| modules | When configured, modules must be in the nested YAML structure described above this table. | None |
| queue.type | The internal queuing model to use for event buffering. Specify memory for legacy in-memory based queuing, or persisted for disk-based ACKed queueing (persistent queues). | memory |
| path.queue | The directory path where the data files will be stored when persistent queues are enabled (queue.type: persisted). | path.data/queue |
| queue.page_capacity | The size of the page data files used when persistent queues are enabled (queue.type: persisted). The queue data consists of append-only data files separated into pages. | 64mb |
| queue.max_events | The maximum number of unread events in the queue when persistent queues are enabled (queue.type: persisted). | 0 (unlimited) |
| queue.max_bytes | The total capacity of the queue in number of bytes. Make sure the capacity of your disk drive is greater than the value you specify here. If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criterion is reached first. | 1024mb (1g) |
| queue.checkpoint.acks | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.acks: 0 to set this value to unlimited. | 1024 |
| queue.checkpoint.writes | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.writes: 0 to set this value to unlimited. | 1024 |
| queue.checkpoint.retry | When enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs, and is not recommended except in those specific circumstances. | false |
| queue.drain | When enabled, Logstash waits until the persistent queue is drained before shutting down. | false |
| dead_letter_queue.enable | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | false |
| dead_letter_queue.max_bytes | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | 1024mb |
| path.dead_letter_queue | The directory path where the data files will be stored for the dead-letter queue. | path.data/dead_letter_queue |
| http.host | The bind address for the metrics REST endpoint. | "127.0.0.1" |
| http.port | The bind port for the metrics REST endpoint. | 9600 |
| log.level | The log level for Logstash's own output. Possible values: fatal, error, warn, info, debug, trace | info |
| log.format | The log format. Set to json to log in JSON format, or plain to use Object#.inspect. | plain |
| path.logs | The directory where Logstash will write its log to. | LOGSTASH_HOME/logs |
| path.plugins | Where to find custom plugins. You can specify this setting multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: PATH/logstash/TYPE/NAME.rb where TYPE is inputs, filters, outputs, or codecs, and NAME is the name of the plugin. | Platform-specific. See Logstash Directory Layout. |

1. Structure of a pipeline config

A Logstash pipeline file has three sections (a complete example follows the notes below):

input {
  input_plugin {
      plugin settings
  }
}
filter {
  filter_plugin {
      plugin settings
  }
}
output {
  output_plugin {
      plugin settings
  }
}

Note:

  1. If a filter section contains several processing rules, they are applied in order; be aware that some plugins are not thread-safe.

  2. If the same plugin appears twice in a filter section, the two instances are not guaranteed to execute strictly in order, so the official docs recommend avoiding repeated use of a plugin inside filter.
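
For illustration, here is a complete minimal pipeline following this three-section structure; the grok pattern and date format are the stock Apache access-log examples from the Logstash documentation:

input {
  stdin { }                                            #paste an Apache access-log line
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }   #parse the line into named fields
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] #use the log's own time as @timestamp
  }
}
output {
  stdout { codec => rubydebug }
}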

2. Conditionals

Official documentation: https://www.elastic.co/guide/en/logstash/6.7/event-dependent-configuration.html

Sometimes you only want to filter or output events under certain conditions. For that you can use a conditional to decide which filters and outputs process which events; for example, in an ELK system you might assign a type field different values depending on conditions, to make statistics easier later. Conditionals support if, else if, and else, and can be nested.

Conditional syntax

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

Operators

  • Comparison

    • Equality: ==, !=, <, >, <=, >=
    • Regexp: =~ (matches pattern), !~ (does not match pattern)
    • Inclusion: in, not in
  • Boolean

    • and, or, nand, xor
  • Unary

    • ! (negation)
    • () (compound expression), !() (negate the result of a compound expression)

Examples


filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
  if [message] =~ /\w+\s+\/\w+(\/learner\/course\/)/ {
    mutate {
      add_field => { "learner_type" => "course" }
    }
  }

  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
    ...
    }
  }
}

Note:

A statement like if [foo] in "String" fails when the field value cannot be converted to a string, so it is best to guard it with a field-existence check first:

if ["foo"] {
  mutate {
    add_field => "bar" => "%{foo}"
  }
}

3. Referencing event fields

  • Direct references use square brackets []; nested fields take multiple levels of [][]:

    {
        "a": "1",
        "b": "2",
        "c": {
            "c1": "3"
        }
    }
    ----------Referencing event fields in the pipeline--------------
    if [b] =~ "2" {
        ..........
    }
    if [c][c1] == "3" {
        ...........
    }
    
  • Inside strings, use sprintf-style references with %{}:

    {
        "a": "1",
        "b": "2",
        "c": {
            "c1": "3"
        }
    }
    ----------Referencing event fields in the pipeline--------------
    add_field => {
      "test" => "test: %{b}"
    }
    add_field => {
      "test" => "test: %{[c][c1]}"
    }
    

Input plugin overview

| Plugin | Description | Github repository |
| --- | --- | --- |
| azure_event_hubs | Receives events from Azure Event Hubs | azure_event_hubs |
| beats | Receives events from the Elastic Beats framework | logstash-input-beats |
| cloudwatch | Pulls events from the Amazon Web Services CloudWatch API | logstash-input-cloudwatch |
| couchdb_changes | Streams events from CouchDB's _changes URI | logstash-input-couchdb_changes |
| dead_letter_queue | Reads events from Logstash's dead letter queue | logstash-input-dead_letter_queue |
| elasticsearch | Reads query results from an Elasticsearch cluster | logstash-input-elasticsearch |
| exec | Captures the output of a shell command as an event | logstash-input-exec |
| file | Streams events from files | logstash-input-file |
| ganglia | Reads Ganglia packets over UDP | logstash-input-ganglia |
| gelf | Reads GELF-format messages from Graylog2 as events | logstash-input-gelf |
| generator | Generates random log events for test purposes | logstash-input-generator |
| github | Reads events from a GitHub webhook | logstash-input-github |
| google_cloud_storage | Extract events from files in a Google Cloud Storage bucket | logstash-input-google_cloud_storage |
| google_pubsub | Consume events from a Google Cloud PubSub service | logstash-input-google_pubsub |
| graphite | Reads metrics from the graphite tool | logstash-input-graphite |
| heartbeat | Generates heartbeat events for testing | logstash-input-heartbeat |
| http | Receives events over HTTP or HTTPS | logstash-input-http |
| http_poller | Decodes the output of an HTTP API into events | logstash-input-http_poller |
| imap | Reads mail from an IMAP server | logstash-input-imap |
| irc | Reads events from an IRC server | logstash-input-irc |
| java_generator | Generates synthetic log events | core plugin |
| java_stdin | Reads events from standard input | core plugin |
| jdbc | Creates events from JDBC data | logstash-input-jdbc |
| jms | Reads events from a JMS broker | logstash-input-jms |
| jmx | Retrieves metrics from remote Java applications over JMX | logstash-input-jmx |
| kafka | Reads events from a Kafka topic | logstash-input-kafka |
| kinesis | Receives events through an AWS Kinesis stream | logstash-input-kinesis |
| log4j | Reads events over a TCP socket from a Log4j SocketAppender object | logstash-input-log4j |
| lumberjack | Receives events using the Lumberjack protocol | logstash-input-lumberjack |
| meetup | Captures the output of command line tools as an event | logstash-input-meetup |
| pipe | Streams events from a long-running command pipe | logstash-input-pipe |
| puppet_facter | Receives facts from a Puppet server | logstash-input-puppet_facter |
| rabbitmq | Pulls events from a RabbitMQ exchange | logstash-input-rabbitmq |
| redis | Reads events from a Redis instance | logstash-input-redis |
| relp | Receives RELP events over a TCP socket | logstash-input-relp |
| rss | Captures the output of command line tools as an event | logstash-input-rss |
| s3 | Streams events from files in a S3 bucket | logstash-input-s3 |
| salesforce | Creates events based on a Salesforce SOQL query | logstash-input-salesforce |
| snmp | Polls network devices using Simple Network Management Protocol (SNMP) | logstash-input-snmp |
| snmptrap | Creates events based on SNMP trap messages | logstash-input-snmptrap |
| sqlite | Creates events based on rows in an SQLite database | logstash-input-sqlite |
| sqs | Pulls events from an Amazon Web Services Simple Queue Service queue | logstash-input-sqs |
| stdin | Reads events from standard input | logstash-input-stdin |
| stomp | Creates events received with the STOMP protocol | logstash-input-stomp |
| syslog | Reads syslog messages as events | logstash-input-syslog |
| tcp | Reads events from a TCP socket | logstash-input-tcp |
| twitter | Reads events from the Twitter Streaming API | logstash-input-twitter |
| udp | Reads events over UDP | logstash-input-udp |
| unix | Reads events over a UNIX socket | logstash-input-unix |
| varnishlog | Reads from the varnish cache shared memory log | logstash-input-varnishlog |
| websocket | Reads events from a websocket | logstash-input-websocket |
| wmi | Creates events based on the results of a WMI query | logstash-input-wmi |
| xmpp | Receives events over the XMPP/Jabber protocol | logstash-input-xmpp |

Common input plugin options

| Setting | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| add_field | hash | No | {} | Adds a field to the event. |
| codec | codec | No | plain | The codec used to decode input data. Decoding at input time is a convenient alternative to using a separate filter in your Logstash pipeline. |
| enable_metric | boolean | No | true | Disable or enable metric collection for this specific plugin instance. By default all available metrics are recorded, but you can disable metrics collection for a specific plugin. |
| id | string | No | (auto-generated) | Adds a unique ID to the plugin configuration. If none is given, Logstash generates one; setting it explicitly is strongly recommended, especially when you run two or more plugins of the same type (e.g. two log4j inputs), because it makes them distinguishable in the monitoring API. Example: input { kafka { id => "my_plugin_id" } } |
| tags | array | No | | Adds any number of arbitrary tags to the event, which helps with later processing. |
| type | string | No | | Adds a type field to all events handled by this input. The type is mainly used for filter activation; it is stored as part of the event, so you can also search for it in Kibana. If an event already has a type (e.g. set by a shipper before it reaches the indexer), a new input will not override it; the type set at the shipper stays with the event for its lifetime, even when forwarded to another Logstash server. |
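
A sketch showing several of these common options on a file input (the path and values are illustrative):

input {
  file {
    path => "/var/log/messages"
    type => "syslog"              #adds type => "syslog" to every event from this input
    tags => ["system"]            #arbitrary tags for later routing in filter/output conditionals
    id   => "syslog_file_input"   #stable ID, useful when reading the monitoring API
  }
}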

Filter plugin overview

| Plugin | Description | Github repository |
| --- | --- | --- |
| aggregate | Aggregates information from several events originating with a single task | logstash-filter-aggregate |
| alter | Performs general alterations to fields that the mutate filter does not handle | logstash-filter-alter |
| bytes | Parses string representations of computer storage sizes, such as "123 MB" or "5.6gb", into their numeric value in bytes | logstash-filter-bytes |
| cidr | Checks IP addresses against a list of network blocks | logstash-filter-cidr |
| cipher | Applies or removes a cipher to an event | logstash-filter-cipher |
| clone | Duplicates events | logstash-filter-clone |
| csv | Parses comma-separated value data into individual fields | logstash-filter-csv |
| date | Parses dates from fields to use as the Logstash timestamp for an event | logstash-filter-date |
| de_dot | Computationally expensive filter that removes dots from a field name | logstash-filter-de_dot |
| dissect | Extracts unstructured event data into fields using delimiters | logstash-filter-dissect |
| dns | Performs a standard or reverse DNS lookup | logstash-filter-dns |
| drop | Drops all events | logstash-filter-drop |
| elapsed | Calculates the elapsed time between a pair of events | logstash-filter-elapsed |
| elasticsearch | Copies fields from previous log events in Elasticsearch to current events | logstash-filter-elasticsearch |
| environment | Stores environment variables as metadata sub-fields | logstash-filter-environment |
| extractnumbers | Extracts numbers from a string | logstash-filter-extractnumbers |
| fingerprint | Fingerprints fields by replacing values with a consistent hash | logstash-filter-fingerprint |
| geoip | Adds geographical information about an IP address | logstash-filter-geoip |
| grok | Parses unstructured event data into fields | logstash-filter-grok |
| http | Provides integration with external web services/REST APIs | logstash-filter-http |
| i18n | Removes special characters from a field | logstash-filter-i18n |
| java_uuid | Generates a UUID and adds it to each processed event | core plugin |
| jdbc_static | Enriches events with data pre-loaded from a remote database | logstash-filter-jdbc_static |
| jdbc_streaming | Enrich events with your database data | logstash-filter-jdbc_streaming |
| json | Parses JSON events | logstash-filter-json |
| json_encode | Serializes a field to JSON | logstash-filter-json_encode |
| kv | Parses key-value pairs | logstash-filter-kv |
| memcached | Provides integration with external data in Memcached | logstash-filter-memcached |
| metricize | Takes complex events containing a number of metrics and splits these up into multiple events, each holding a single metric | logstash-filter-metricize |
| metrics | Aggregates metrics | logstash-filter-metrics |
| mutate | Performs mutations on fields | logstash-filter-mutate |
| prune | Prunes event data based on a list of fields to blacklist or whitelist | logstash-filter-prune |
| range | Checks that specified fields stay within given size or length limits | logstash-filter-range |
| ruby | Executes arbitrary Ruby code | logstash-filter-ruby |
| sleep | Sleeps for a specified time span | logstash-filter-sleep |
| split | Splits multi-line messages into distinct events | logstash-filter-split |
| syslog_pri | Parses the PRI (priority) field of a syslog message | logstash-filter-syslog_pri |
| threats_classifier | Enriches security logs with information about the attacker's intent | logstash-filter-threats_classifier |
| throttle | Throttles the number of events | logstash-filter-throttle |
| tld | Replaces the contents of the default message field with whatever you specify in the configuration | logstash-filter-tld |
| translate | Replaces field contents based on a hash or YAML file | logstash-filter-translate |
| truncate | Truncates fields longer than a given length | logstash-filter-truncate |
| urldecode | Decodes URL-encoded fields | logstash-filter-urldecode |
| useragent | Parses user agent strings into fields | logstash-filter-useragent |
| uuid | Adds a UUID to events | logstash-filter-uuid |
| xml | Parses XML into fields | logstash-filter-xml |

Common filter plugin options

| Setting | Input type | Required |
| --- | --- | --- |
| add_field | hash | No |
| add_tag | array | No |
| enable_metric | boolean | No |
| id | string | No |
| periodic_flush | boolean | No |
| remove_field | array | No |
| remove_tag | array | No |
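
A sketch of these common options on a mutate filter (the field names are illustrative):

filter {
  mutate {
    add_field    => { "env" => "production" }  #attach a field to the event
    add_tag      => ["normalized"]             #attach a tag
    remove_field => ["password"]               #drop a sensitive field before output
  }
}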

Output plugin overview

| Plugin | Description | Github repository |
| --- | --- | --- |
| boundary | Sends annotations to Boundary based on Logstash events | logstash-output-boundary |
| circonus | Sends annotations to Circonus based on Logstash events | logstash-output-circonus |
| cloudwatch | Aggregates and sends metric data to AWS CloudWatch | logstash-output-cloudwatch |
| csv | Writes events to disk in a delimited format | logstash-output-csv |
| datadog | Sends events to DataDogHQ based on Logstash events | logstash-output-datadog |
| datadog_metrics | Sends metrics to DataDogHQ based on Logstash events | logstash-output-datadog_metrics |
| elastic_app_search | Sends events to the Elastic App Search solution | logstash-output-elastic_app_search |
| elasticsearch | Stores logs in Elasticsearch | logstash-output-elasticsearch |
| email | Sends email to a specified address when output is received | logstash-output-email |
| exec | Runs a command for a matching event | logstash-output-exec |
| file | Writes events to files on disk | logstash-output-file |
| ganglia | Writes metrics to Ganglia's gmond | logstash-output-ganglia |
| gelf | Generates GELF formatted output for Graylog2 | logstash-output-gelf |
| google_bigquery | Writes events to Google BigQuery | logstash-output-google_bigquery |
| google_cloud_storage | Uploads log events to Google Cloud Storage | logstash-output-google_cloud_storage |
| google_pubsub | Uploads log events to Google Cloud Pubsub | logstash-output-google_pubsub |
| graphite | Writes metrics to Graphite | logstash-output-graphite |
| graphtastic | Sends metric data on Windows | logstash-output-graphtastic |
| http | Sends events to a generic HTTP or HTTPS endpoint | logstash-output-http |
| influxdb | Writes metrics to InfluxDB | logstash-output-influxdb |
| irc | Writes events to IRC | logstash-output-irc |
| java_sink | Discards any events received | core plugin |
| java_stdout | Prints events to the STDOUT of the shell | core plugin |
| juggernaut | Pushes messages to the Juggernaut websockets server | logstash-output-juggernaut |
| kafka | Writes events to a Kafka topic | logstash-output-kafka |
| librato | Sends metrics, annotations, and alerts to Librato based on Logstash events | logstash-output-librato |
| loggly | Ships logs to Loggly | logstash-output-loggly |
| lumberjack | Sends events using the lumberjack protocol | logstash-output-lumberjack |
| metriccatcher | Writes metrics to MetricCatcher | logstash-output-metriccatcher |
| mongodb | Writes events to MongoDB | logstash-output-mongodb |
| nagios | Sends passive check results to Nagios | logstash-output-nagios |
| nagios_nsca | Sends passive check results to Nagios using the NSCA protocol | logstash-output-nagios_nsca |
| opentsdb | Writes metrics to OpenTSDB | logstash-output-opentsdb |
| pagerduty | Sends notifications based on preconfigured services and escalation policies | logstash-output-pagerduty |
| pipe | Pipes events to another program's standard input | logstash-output-pipe |
| rabbitmq | Pushes events to a RabbitMQ exchange | logstash-output-rabbitmq |
| redis | Sends events to a Redis queue using the RPUSH command | logstash-output-redis |
| redmine | Creates tickets using the Redmine API | logstash-output-redmine |
| riak | Writes events to the Riak distributed key/value store | logstash-output-riak |
| riemann | Sends metrics to Riemann | logstash-output-riemann |
| s3 | Sends Logstash events to the Amazon Simple Storage Service | logstash-output-s3 |
| sns | Sends events to Amazon's Simple Notification Service | logstash-output-sns |
| solr_http | Stores and indexes logs in Solr | logstash-output-solr_http |
| sqs | Pushes events to an Amazon Web Services Simple Queue Service queue | logstash-output-sqs |
| statsd | Sends metrics using the statsd network daemon | logstash-output-statsd |
| stdout | Prints events to the standard output | logstash-output-stdout |
| stomp | Writes events using the STOMP protocol | logstash-output-stomp |
| syslog | Sends events to a syslog server | logstash-output-syslog |
| tcp | Writes events over a TCP socket | logstash-output-tcp |
| timber | Sends events to the Timber.io logging service | logstash-output-timber |
| udp | Sends events over UDP | logstash-output-udp |
| webhdfs | Sends Logstash events to HDFS using the webhdfs REST API | logstash-output-webhdfs |
| websocket | Publishes messages to a websocket | logstash-output-websocket |
| xmpp | Posts events over XMPP | logstash-output-xmpp |
| zabbix | Sends events to a Zabbix server | logstash-output-zabbix |

Common output plugin options

| Setting | Input type | Required |
| --- | --- | --- |
| codec | codec | No |
| enable_metric | boolean | No |
| id | string | No |

Codec plugin overview

| Plugin | Description | Github repository |
| --- | --- | --- |
| avro | Reads serialized Avro records as Logstash events | logstash-codec-avro |
| cef | Reads the ArcSight Common Event Format (CEF) | logstash-codec-cef |
| cloudfront | Reads AWS CloudFront reports | logstash-codec-cloudfront |
| cloudtrail | Reads AWS CloudTrail log files | logstash-codec-cloudtrail |
| collectd | Reads events from the collectd binary protocol using UDP | logstash-codec-collectd |
| dots | Sends 1 dot per event to stdout for performance tracking | logstash-codec-dots |
| edn | Reads EDN format data | logstash-codec-edn |
| edn_lines | Reads newline-delimited EDN format data | logstash-codec-edn_lines |
| es_bulk | Reads the Elasticsearch bulk format into separate events, along with metadata | logstash-codec-es_bulk |
| fluent | Reads the fluentd msgpack schema | logstash-codec-fluent |
| graphite | Reads graphite formatted lines | logstash-codec-graphite |
| gzip_lines | Reads gzip encoded content | logstash-codec-gzip_lines |
| jdots | Renders each processed event as a dot | core plugin |
| java_line | Encodes and decodes line-oriented text data | core plugin |
| java_plain | Processes text data with no delimiters between events | core plugin |
| json | Reads JSON formatted content, creating one event per element in a JSON array | logstash-codec-json |
| json_lines | Reads newline-delimited JSON | logstash-codec-json_lines |
| line | Reads line-oriented text data | logstash-codec-line |
| msgpack | Reads MessagePack encoded content | logstash-codec-msgpack |
| multiline | Merges multiline messages into a single event | logstash-codec-multiline |
| netflow | Reads Netflow v5 and Netflow v9 data | logstash-codec-netflow |
| nmap | Reads Nmap data in XML format | logstash-codec-nmap |
| plain | Reads plaintext with no delimiting between events | logstash-codec-plain |
| protobuf | Reads protobuf messages and converts to Logstash Events | logstash-codec-protobuf |
| rubydebug | Applies the Ruby Awesome Print library to Logstash events | logstash-codec-rubydebug |

Logstash plugins are written in Ruby. Since as far back as version 1.5.0, the plugin modules and the core module have been maintained separately, and plugins are managed with the RubyGems package manager; a Logstash plugin is essentially a self-contained RubyGem.

RubyGems (gems for short) is a packaging system for Ruby components. It provides a standard format for distributing Ruby programs and libraries, plus a tool for managing package installation.

Plugin names follow the format logstash-{input/output/filter}-<name>. Example: the date filter plugin is logstash-filter-date.

1. Install a plugin

#Example: install the dissect filter plugin
/usr/share/logstash/bin/logstash-plugin install logstash-filter-dissect
#Useful flag:
--path.plugins  specify the installation path

2. List installed plugins

/usr/share/logstash/bin/logstash-plugin list
#Useful flags:
--verbose  also show each plugin's version
--group    list all plugins in a group (input, filter, codec, output), e.g. logstash-plugin list --group filter

3. Update plugins

#Update a specific plugin
/usr/share/logstash/bin/logstash-plugin update <plugin-name>
#Update all plugins
/usr/share/logstash/bin/logstash-plugin update

4. Remove a plugin

/usr/share/logstash/bin/logstash-plugin remove <plugin-name>

5. Set a proxy for the plugin manager

export HTTP_PROXY=http://127.0.0.1:3128

6. Change the plugin repository

The default plugin repository is https://rubygems.org.

There are also alternative open-source plugin repositories. To switch repositories:

Edit /usr/share/logstash/Gemfile and change source "https://rubygems.org" to source "https://my.private.repository".

1. An index template for the elasticsearch output

PUT _template/logstash
{
    "order" : 2,
    "version" : 60001,
    "index_patterns" : [
      "*"
    ],
    "settings" : {
      "index" : {
        "number_of_replicas" : "1",
        "number_of_shards" : "2",
        "refresh_interval" : "60s"
      }
    },
    "mappings" : {
      "dynamic_templates" : [
        {
          "message_field" : {
            "path_match" : "message",
            "mapping" : {
              "norms" : false,
              "type" : "text"
            },
            "match_mapping_type" : "string"
          }
        },
        {
          "string_fields" : {
            "mapping" : {
              "norms" : false,
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "ignore_above" : 256,
                  "type" : "keyword"
                }
              }
            },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        }
      ],
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "geoip" : {
          "dynamic" : true,
          "properties" : {
            "ip" : {
              "type" : "ip"
            },
            "latitude" : {
              "type" : "half_float"
            },
            "location" : {
              "type" : "geo_point"
            },
            "longitude" : {
              "type" : "half_float"
            }
          }
        },
        "@version" : {
          "type" : "keyword"
        }
      }
    },
    "aliases" : { }
}
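
Rather than PUTting the template by hand, the elasticsearch output plugin can upload it for you at startup. A sketch (hosts, index name, and the template path are assumptions; the JSON above would be saved to that path):

output {
  elasticsearch {
    hosts              => ["http://localhost:9200"]
    index              => "weblog-%{+YYYY.MM.dd}"
    template           => "/etc/logstash/templates/logstash.json"  #the template JSON saved on disk
    template_name      => "logstash"                               #name to register it under
    template_overwrite => true                                     #replace an existing template of that name
  }
}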

1. Inspect pipeline runtime metrics

curl -XGET 'http://<logstash-host>:9600/_node/stats/pipelines/<pipeline-id>?pretty'
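
The same node stats API exposes other useful sections, for example:

#JVM stats (heap usage, GC)
curl -XGET 'http://<logstash-host>:9600/_node/stats/jvm?pretty'
#Process stats (CPU, open file descriptors)
curl -XGET 'http://<logstash-host>:9600/_node/stats/process?pretty'
#Hot threads, for diagnosing a busy pipeline
curl -XGET 'http://<logstash-host>:9600/_node/hot_threads?pretty'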
