A quick walkthrough of Logstash, the ETL engine offered by the Elastic Stack.
Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite stash.
Logstash gained its initial popularity with log and metric collection, such as log4j logs, Apache web logs and syslog. Its applications have since broadened to all kinds of data sources: large scale event streams, webhooks, database and message queue integrations. Once data is transformed and cleaned up, it is routed to a final destination (i.e. the stash). Elasticsearch is one option, but there are plenty of others (MongoDB, S3, Nagios, IRC, email).
After unpacking the official tarball, all you need to run Logstash is a pipeline configuration, such as:
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
And to run:
bin/logstash -f simple.conf
Typing foo and bar into stdin, you'll see the following:
foo
{
"@timestamp" => 2018-12-08T08:16:50.572Z,
"host" => "bigdatabox",
"message" => "foo",
"@version" => "1"
}
bar
{
"@timestamp" => 2018-12-08T11:55:10.303Z,
"host" => "bigdatabox",
"message" => "bar",
"@version" => "1"
}
Given Elasticsearch was the stash, you should see a newly created index by visiting http://localhost:9200/_cat/indices:
yellow open logstash-2018.12.08 FW365UQGTrqUZggbKzV95A 5 1 2 0 9.2kb 9.2kb
The index contains two documents. http://localhost:9200/logstash-2018.12.08/_search shows:
{
"_index" : "logstash-2018.12.08",
"_type" : "doc",
"_id" : "bm-rjWcB1pKAKB8739Ua",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2018-12-08T11:55:10.303Z",
"host" : "bigdatabox",
"message" : "bar",
"@version" : "1"
}
},
{
"_index" : "logstash-2018.12.08",
"_type" : "doc",
"_id" : "bW_jjGcB1pKAKB87_dWH",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2018-12-08T08:16:50.572Z",
"host" : "bigdatabox",
"message" : "foo",
"@version" : "1"
}
}
The unit of work in Logstash is the event; events are modelled as documents.
Execution Model
A pipeline (a term similar to other middleware I've dealt with) represents one logical flow of event data. Pipelines ingest data through inputs (protocol specific), pass events through an internal queue, and then hand them off to workers to process (filter and output). The queue sitting between the inputs and the workers enables some serious scaling options when needed.
A Logstash instance is the Logstash process itself, and may contain many pipelines.
Queuing and Guaranteed Delivery
Logstash provides a few different queue types, depending on the level of guarantees needed.
The In-Memory Queue is blazing fast, however it is not durable. The Persistent Queue, on the other hand, ensures data is written to disk until delivered, but comes with a performance penalty.
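Switching queue types is a logstash.yml (or pipelines.yml) setting. A minimal sketch — the sizes and paths below are illustrative, not recommendations:
queue.type: persisted                  # default is "memory"
queue.max_bytes: 1gb                   # cap on disk usage before back pressure is applied
path.queue: /var/lib/logstash/queue    # where queue pages are written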
Logstash provides an at-least-once delivery guarantee, which, while more conservative, puts the emphasis on message delivery. Practically speaking this equates to exactly-once in general use, but in the case of an unclean shutdown with the persistent queue, for example, a batch may be re-dispatched, resulting in a duplicate message. This highlights the importance of idempotent operations (i.e. operations that can deal with being re-executed with the same message, without causing undesirable side effects).
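One common way to keep re-delivery harmless is to make the Elasticsearch output idempotent by deriving the document id from the event itself, so a replayed event overwrites the same document rather than creating a duplicate. A sketch, assuming the event carries an id field:
output {
  elasticsearch {
    hosts       => ["localhost:9200"]
    document_id => "%{id}"   # a re-dispatched event re-indexes the same document
  }
}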
Dead Letter Queue (DLQ)
Used for storing messages that are undeliverable (i.e. reprocessing them is futile). Logstash can either drop them and log the failure, or send them on to the DLQ (for possible future replay).
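Enabling the DLQ is a logstash.yml switch, and replay is done with the dead_letter_queue input plugin in a separate pipeline. A rough sketch — the paths here are illustrative, and at the time of writing only the elasticsearch output writes to the DLQ:
# logstash.yml
dead_letter_queue.enable: true
path.dead_letter_queue: /var/lib/logstash/dlq

# a separate replay pipeline
input {
  dead_letter_queue {
    path        => "/var/lib/logstash/dlq"   # same directory as above
    pipeline_id => "main"                     # which pipeline's DLQ to read
  }
}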
Configuration
A specific pipeline is defined by creating a conf file, which uses Logstash's own configuration DSL. A section is available for each category of plugin: input plugins, filter plugins and output plugins.
input {
...
}
# implicit queue (back pressure etc)
filter {
...
}
output {
...
}
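As a contrived but complete sketch of the three sections together, the pipeline below parses key=value pairs out of stdin lines before printing them; the kv filter and the added field are purely illustrative:
input {
  stdin { }
}

filter {
  kv { }                                          # split "key=value" pairs out of the message
  mutate { add_field => { "source" => "demo" } }  # tag each event
}

output {
  stdout { codec => rubydebug }
}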
Global configuration (logstash.yml)
Defines instance level config, which can be expressed either in hierarchical form:
pipeline:
  batch:
    size: 125
    delay: 5
Or as flat keys:
pipeline.batch.size: 125
pipeline.batch.delay: 5
For example, setting node.name uniquely identifies a Logstash node, which is particularly useful when collecting metrics. It can be set to a static value, e.g. node.name: darthvadar, or to the hostname, e.g. node.name: ${HOSTNAME}.
Pipeline configuration (pipelines.yml)
Complementary to the specific pipeline configuration files, Logstash provides some core configuration in the config dir. Of note is pipelines.yml. Formatted in YAML, it defines a list of dictionaries, where each dictionary describes a pipeline. Multiple pipelines can be defined, but don't have to be. In fact, it's still useful to define a single pipeline in this file, as it allows you to invoke the logstash executable without any additional arguments. A single pipeline:
- pipeline.id: ben-test
  queue.type: persisted
  path.config: /home/ben/Downloads/logstash-6.5.2/config/ben-simple.conf
Or many pipelines:
- pipeline.id: test
  pipeline.workers: 1
  pipeline.batch.size: 1
  config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
- pipeline.id: another_test
  queue.type: persisted
  path.config: "/tmp/logstash/*.config"
pipelines.yml documents the available settings in comments inline within the file. These settings cover not only the functional aspects of pipeline configuration, but also non-functional concerns around the execution model, such as the queue type or the number of workers.
Pipeline Examples
JDBC to Elasticsearch
input {
jdbc {
jdbc_driver_library => "/opt/sqljdbc_6.4/enu/mssql-jdbc-6.4.0.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://odin:1433;databaseName=FooDb"
jdbc_user => "logstash"
jdbc_password => "mypassword"
jdbc_validate_connection => true
statement => "SELECT Id, FamilyName, GivenName, EmailAddress, IsActive, CreatedBy, CreatedDate, ModifiedBy, ModifiedDate FROM Customers WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate ASC"
schedule => "* * * * *"
use_column_value => true
tracking_column => "modifieddate" # column names are lowercased (lowercase_column_names => true below)
tracking_column_type => "timestamp"
clean_run => false
record_last_run => true
lowercase_column_names => true
}
}
filter {
mutate {
remove_field => ["@version", "path", "host", "message", "tags", "@timestamp"]
}
}
output {
stdout { codec => "dots" }
elasticsearch {
hosts => [ "elasticcluster:9200" ]
index => "customers-%{+yyyy.MM.dd}"
user => "elastic"
password => "mypassword"
document_id => "%{id}"
}
}
This showcases some useful JDBC input plugin features:
- tracking_column: the field in the result set used to keep track of the last ingested record. Only tuples more recent than the last are ingested.
- statement: two things to note, (1) the ORDER BY clause, which is essential because the tracking_column keeps a journal of the latest ModifiedDate it comes across, and only records with a later ModifiedDate are ingested, and (2) the :sql_last_value bind variable, which restricts the query to records that have changed in the source database since Logstash last ingested. A sketch of relocating the state file follows this list.
- record_last_run: persists the latest ModifiedDate (i.e. the last tracking_column value) discovered by Logstash so far. By default it is stored in the $HOME of the account running Logstash, in a small file called .logstash_jdbc_last_run.
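If you'd prefer that state file to live somewhere other than $HOME, the plugin lets you relocate it; the path below is only an example:
input {
  jdbc {
    # ... connection and statement settings as above ...
    last_run_metadata_path => "/var/lib/logstash/.customers_jdbc_last_run"  # where :sql_last_value is persisted between runs
  }
}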
Mapping flat source to hierarchical JSON structure
Look to the mutate filter.
mutate {
rename => {
"deviceip" => "[IP][device]"
"srcip" => "[IP][source]"
"dstip" => "[IP][destination]"
}
}
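With that filter in place, an event carrying flat deviceip, srcip and dstip fields (hypothetical values below) ends up with them nested under a single IP object:
{
  "IP" => {
    "device"      => "10.0.0.1",
    "source"      => "192.168.1.5",
    "destination" => "172.16.0.9"
  }
}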