Tech Chorus

Logging From Flask Application To ElasticSearch Via Logstash

written by Sudheer Satyanarayana on 2020-06-01

In this blog post, I will explain how to send logs from a Flask application to Elasticsearch via Logstash. As a bonus, I will show how to view the logs in Kibana. No prior working knowledge of the Elastic stack is expected.

Prerequisites

Our log pipeline:

Flask application -> python-logstash-async -> Logstash -> Elasticsearch


You can view the logs in Kibana, or you can query Elasticsearch directly, either manually or from another application.

Flask application environment setup

For this example, I used the CentOS 8 Stream Linux distribution. You are free to use the distribution of your choice; if you are using another distribution, adjust the commands accordingly.

sudo dnf install python36 vim-enhanced screen
python3 -m venv myenv
source myenv/bin/activate
pip install flask

Minimal Flask application

Let's start with the minimal Flask application from the official quick start guide.

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

Start the application:

export FLASK_APP=hello.py
export FLASK_ENV=development
flask run

Send a request to the Flask application

curl localhost:5000

Add a logging line and view the log entry in stdout

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():
    app.logger.info("Hello there")
    return 'Hello, World!'

Here's my terminal output:

(myenv) [vagrant@dev-c8-01 flask-app]$ flask run
 * Serving Flask app "hello.py" (lazy loading)
 * Environment: development
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 223-107-058
[2020-05-31 13:19:49,981] INFO in hello: Hello there
127.0.0.1 - - [31/May/2020 13:19:49] "GET / HTTP/1.1" 200 -

So far, so good.

Yes, I use Vagrant.

Let's install the Python package python-logstash-async:

pip install python-logstash-async

python-logstash-async helps you ship logs from your Flask application to Logstash. As you can expect from the name, the connection to Logstash is handled asynchronously.

Import the log handler and formatter.

from logstash_async.handler import AsynchronousLogstashHandler
from logstash_async.formatter import FlaskLogstashFormatter

Configure the connection to Logstash. In this example, we use Python variables in the hello.py file.

LOGSTASH_HOST = "192.168.200.19"
LOGSTASH_DB_PATH = "/home/vagrant/app-data/flask_logstash.db"
LOGSTASH_TRANSPORT = "logstash_async.transport.BeatsTransport"
LOGSTASH_PORT = 5044

We are stating that Logstash runs on the IP address 192.168.200.19 and listens on TCP port 5044. Remember, the port has to be an integer. The python-logstash-async package offers a few options for the transport protocol. We chose the Beats transport because it is one of the most popular input sources for Logstash; you can expect Logstash to accept data on the Beats port in many common Logstash configurations. On the server where the Flask application runs, you also have to select a path at which python-logstash-async stores temporary data. In my example, it is /home/vagrant/app-data/flask_logstash.db.

Ensure the directory of LOGSTASH_DB_PATH exists. In my case, I did mkdir /home/vagrant/app-data/.

In a sophisticated application, you might want to load the configuration from a file or environment variables.
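For example, loading the same settings from environment variables could look like this (a stdlib-only sketch; the environment variable names are my own choice, not part of python-logstash-async):

```python
import os

# Read the Logstash settings from environment variables, falling back
# to the values used in this post when a variable is unset.
LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", "192.168.200.19")
LOGSTASH_PORT = int(os.environ.get("LOGSTASH_PORT", "5044"))  # must be an int
LOGSTASH_DB_PATH = os.environ.get(
    "LOGSTASH_DB_PATH", "/home/vagrant/app-data/flask_logstash.db"
)
LOGSTASH_TRANSPORT = os.environ.get(
    "LOGSTASH_TRANSPORT", "logstash_async.transport.BeatsTransport"
)
```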

Configure the Logstash handler

logstash_handler = AsynchronousLogstashHandler(
    LOGSTASH_HOST,
    LOGSTASH_PORT,
    database_path=LOGSTASH_DB_PATH,
    transport=LOGSTASH_TRANSPORT,
)

Configure the formatter

logstash_handler.formatter = FlaskLogstashFormatter(metadata={"beat": "myapp"})

The python-logstash-async package provides a formatter specifically for Flask. Some common Logstash configurations listen for Beats input; hence, we set up the metadata for the Beats protocol like this.

Attach the handler:

app.logger.addHandler(logstash_handler)

Our application looks like this:

from flask import Flask
app = Flask(__name__)

from logstash_async.handler import AsynchronousLogstashHandler
from logstash_async.formatter import FlaskLogstashFormatter

LOGSTASH_HOST = "192.168.200.19"
LOGSTASH_DB_PATH = "/home/vagrant/app-data/flask_logstash.db"
LOGSTASH_TRANSPORT = "logstash_async.transport.BeatsTransport"
LOGSTASH_PORT = 5044

logstash_handler = AsynchronousLogstashHandler(
    LOGSTASH_HOST,
    LOGSTASH_PORT,
    database_path=LOGSTASH_DB_PATH,
    transport=LOGSTASH_TRANSPORT,
)
logstash_handler.formatter = FlaskLogstashFormatter(metadata={"beat": "myapp"})
app.logger.addHandler(logstash_handler)

@app.route('/')
def hello_world():
    app.logger.info("Hello there")
    return 'Hello, World!'
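As a side note, standard Python logging lets you attach custom fields to a record via the `extra` parameter, and python-logstash-async's formatters generally include such attributes in the shipped document. A stdlib-only sketch (the user_id field is made up for illustration):

```python
import logging

# Build a record carrying a custom attribute via `extra`-style assignment.
# python-logstash-async's formatters serialize such extra attributes into
# the JSON document shipped to Logstash.
formatter = logging.Formatter("%(levelname)s %(message)s user=%(user_id)s")
record = logging.LogRecord(
    name="hello", level=logging.INFO, pathname="hello.py", lineno=1,
    msg="Hello there", args=(), exc_info=None,
)
record.user_id = 42  # what `extra={"user_id": 42}` would set
print(formatter.format(record))  # INFO Hello there user=42
```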

Configure The Logstash Server

I run another CentOS 8 Vagrant instance with the IP address 192.168.200.19.

CentOS 8 quick setup:

sudo dnf install epel-release
sudo dnf install vim-enhanced screen

Logstash requires Java 8 or 11.

Install Java 11.

sudo dnf install java-11-openjdk

Import Elasticsearch GPG key.

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Create a DNF repository for Elasticsearch OSS packages

Create the file /etc/yum.repos.d/elasticsearch-oss-7.x.repo and put the following contents in it:

[elasticsearch-oss-7.x]
name=Elasticsearch repository for oss-7.x packages
baseurl=https://artifacts.elastic.co/packages/oss-7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Install Logstash OSS package:

sudo dnf install logstash-oss

Start and enable the logstash systemd unit:

sudo systemctl enable logstash
sudo systemctl start logstash

Configure the logstash pipeline to accept input using the Beats protocol.

Create the file /etc/logstash/conf.d/beats-input.conf and put the following contents in it:

input {
  beats {
    port => 5044
  }
}

To test the Logstash pipeline so far, configure the stdout output. Create the file /etc/logstash/conf.d/stdout.conf and put the following contents in it:

output {
    stdout { codec => rubydebug }
}

Restart logstash:

sudo systemctl restart logstash

Depending on your environment, it may take a few seconds for logstash to start the services. Keep this in mind when testing your pipelines.

In a terminal window, watch the logstash logs via journalctl

sudo journalctl -fu logstash

Send a request to the Flask application again.

In the logstash journal you should see something like:


Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]: {
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                "logger_name" => "hello",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:         "req_remote_address" => "127.0.0.1",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                    "req_uri" => "http://localhost:5000/",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                    "program" => "/home/vagrant/flask-app/myenv/bin/flask",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                       "line" => 27,
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                   "req_host" => "localhost",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:              "flask_version" => "1.1.2",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                "req_referer" => nil,
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                  "logsource" => "dev-c8-01.lab.gavika.com",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                       "type" => "python-logstash",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                   "@version" => "1",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                       "tags" => [
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:         [0] "beats_input_codec_plain_applied"
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:     ],
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                      "level" => "INFO",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:               "process_name" => "MainProcess",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                "thread_name" => "Thread-6",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:              "req_useragent" => "",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                    "message" => "Hello there",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                       "path" => "/home/vagrant/flask-app/hello.py",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                "interpreter" => "/home/vagrant/flask-app/myenv/bin/python3",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                 "req_method" => "GET",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                  "func_name" => "hello_world",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                       "host" => "dev-c8-01.lab.gavika.com",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                        "pid" => 3482,
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:                 "@timestamp" => 2020-06-01T11:36:51.705Z,
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:     "logstash_async_version" => "1.6.4",
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]:        "interpreter_version" => "3.6.8"
Jun 01 11:36:53 dev-c8-02.labp.gavika.dev logstash[3124]: }

If you see output like this, the connection between the Flask application and Logstash is good.

The next step is to configure the Elasticsearch server and modify the Logstash pipeline.

To keep it simple, I will install and configure Elasticsearch server on the same machine as Logstash. In a production environment, you might want to install Elasticsearch on a few separate servers.

Install Elasticsearch OSS package.

We already have the prerequisites in place: a) Java 11, b) the Elasticsearch GPG key, and c) the Elasticsearch OSS repository configuration.

sudo dnf install elasticsearch-oss
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch

Note that the elasticsearch-oss package installs the systemd unit under the name elasticsearch.

By default, Elasticsearch listens on 127.0.0.1. Change it to listen on the IP address 192.168.200.19. Edit /etc/elasticsearch/elasticsearch.yml:

network.host: 192.168.200.19
discovery.seed_hosts: ["192.168.200.19"]

cluster.initial_master_nodes:
  - node1
cluster.name: log-analytics-cluster
node.data: true
node.master: true
node.name: node1

Having Elasticsearch listen on the IP address 192.168.200.19 allows us to query it from another machine on the same network. We have also set up a minimal single-node cluster: the node is named node1, acts as both a master-eligible and a data node, and is listed in cluster.initial_master_nodes.

Restart the elasticsearch service:

sudo systemctl restart elasticsearch

Now, remove the stdout output from the Logstash pipeline: sudo rm /etc/logstash/conf.d/stdout.conf

Add a new output destination in Logstash.

Create the file /etc/logstash/conf.d/elasticsearch-output.conf and put the following contents in it:

output {
  elasticsearch {
    hosts => ["http://192.168.200.19:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}

We are stating that Logstash should send output to the Elasticsearch server hosted on 192.168.200.19 on TCP port 9200. The index name should be %{[@metadata][beat]}-%{+YYYY.MM.dd}. Recall that we set the beat metadata field to myapp in our Flask application. If the log is sent on 2020-06-01, Logstash will send the output to the Elasticsearch index named myapp-2020.06.01. The beat name should be in lowercase, since Elasticsearch index names must be lowercase. This is a generic configuration in Logstash that accepts input on port 5044 via the Beats protocol and sends the output to an Elasticsearch index conveniently named using the beat name and the date.
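To illustrate the naming scheme, here is a small sketch of how such an index name is derived from the beat name and the date (index_name is a hypothetical helper, not part of Logstash):

```python
from datetime import date

def index_name(beat, day):
    # Elasticsearch index names must be lowercase, so lowercase the beat name.
    return "{}-{:%Y.%m.%d}".format(beat.lower(), day)

print(index_name("myapp", date(2020, 6, 1)))  # myapp-2020.06.01
```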

Restart the logstash service.

Send the request again to the Flask application.

List the indices in elasticsearch.

curl '192.168.200.19:9200/_cat/indices'

On my server, I see something like:

yellow open myapp-2020.06.01 PU8K4humTnWrvAg6OUiEOw 1 1 1 0 18kb 18kb

This means, there is an index called myapp-2020.06.01 on my Elasticsearch server.

Search for the log messages

curl '192.168.200.19:9200/_search?pretty&q=message:hello'

I see response like this:

{
  "took" : 17,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.18232156,
    "hits" : [
      {
        "_index" : "myapp-2020.06.01",
        "_type" : "_doc",
        "_id" : "WLHHb3IBj0Bi87BR6NBQ",
        "_score" : 0.18232156,
        "_source" : {
          "logger_name" : "hello",
          "req_uri" : "http://localhost:5000/",
          "interpreter_version" : "3.6.8",
          "path" : "/home/vagrant/flask-app/hello.py",
          "process_name" : "MainProcess",
          "thread_name" : "Thread-7",
          "req_host" : "localhost",
          "host" : "dev-c8-01.lab.gavika.com",
          "flask_version" : "1.1.2",
          "interpreter" : "/home/vagrant/flask-app/myenv/bin/python3",
          "message" : "Hello there",
          "tags" : [
            "beats_input_codec_plain_applied"
          ],
          "@timestamp" : "2020-06-01T12:07:50.063Z",
          "pid" : 3482,
          "func_name" : "hello_world",
          "line" : 27,
          "level" : "INFO",
          "logsource" : "dev-c8-01.lab.gavika.com",
          "req_remote_address" : "127.0.0.1",
          "req_referer" : null,
          "req_useragent" : "",
          "logstash_async_version" : "1.6.4",
          "program" : "/home/vagrant/flask-app/myenv/bin/flask",
          "req_method" : "GET",
          "@version" : "1",
          "type" : "python-logstash"
        }
      },
      {
        "_index" : "myapp-2020.06.01",
        "_type" : "_doc",
        "_id" : "e6jTb3IB_6K0Q_DqeqUM",
        "_score" : 0.18232156,
        "_source" : {
          "logger_name" : "hello",
          "req_uri" : "http://localhost:5000/",
          "interpreter_version" : "3.6.8",
          "path" : "/home/vagrant/flask-app/hello.py",
          "process_name" : "MainProcess",
          "thread_name" : "Thread-8",
          "req_host" : "localhost",
          "host" : "dev-c8-01.lab.gavika.com",
          "flask_version" : "1.1.2",
          "interpreter" : "/home/vagrant/flask-app/myenv/bin/python3",
          "message" : "Hello there",
          "tags" : [
            "beats_input_codec_plain_applied"
          ],
          "@timestamp" : "2020-06-01T12:20:28.830Z",
          "pid" : 3482,
          "func_name" : "hello_world",
          "line" : 27,
          "level" : "INFO",
          "logsource" : "dev-c8-01.lab.gavika.com",
          "req_remote_address" : "127.0.0.1",
          "req_referer" : null,
          "req_useragent" : "",
          "logstash_async_version" : "1.6.4",
          "program" : "/home/vagrant/flask-app/myenv/bin/flask",
          "req_method" : "GET",
          "@version" : "1",
          "type" : "python-logstash"
        }
      }
    ]
  }
}
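If you query Elasticsearch from another application, you would typically parse this JSON and walk the hits array. A small sketch (extract_messages is a hypothetical helper, shown against a trimmed-down version of the response above):

```python
def extract_messages(response):
    # Each hit carries the original log document under "_source".
    return [hit["_source"]["message"] for hit in response["hits"]["hits"]]

sample = {
    "hits": {
        "hits": [
            {"_source": {"message": "Hello there", "level": "INFO"}},
            {"_source": {"message": "Hello there", "level": "INFO"}},
        ]
    }
}
print(extract_messages(sample))  # ['Hello there', 'Hello there']
```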

Bonus: view the logs in Kibana.

Install Kibana.

sudo dnf install kibana-oss
sudo systemctl start kibana
sudo systemctl enable kibana

Edit /etc/kibana/kibana.yml and add the following contents:

server.host: "192.168.200.19"
elasticsearch.hosts: ["http://192.168.200.19:9200"]

Restart Kibana.

sudo systemctl restart kibana

In your browser, visit http://192.168.200.19:5601/. Click Visualize. Kibana will prompt you to create an index pattern. In the search box type:

myapp-*

Then click Next Step. In the Time Filter field name select @timestamp. Click Create Index Pattern.

Click Discover. You should see your logs on the screen.

Using TLS with Logstash

If you want to use TLS with Logstash, modify your Flask application like this:

LOGSTASH_HOST = "192.168.200.19"
LOGSTASH_DB_PATH = "/home/vagrant/app-data/flask_logstash.db"
LOGSTASH_TRANSPORT = "logstash_async.transport.BeatsTransport"
LOGSTASH_PORT = 5044

LOGSTASH_SSL_ENABLE = True
LOGSTASH_SSL_VERIFY = True
LOGSTASH_CA_CERT_FILE_PATH = "/home/vagrant/logstash.crt"

logstash_handler = AsynchronousLogstashHandler(
    LOGSTASH_HOST,
    LOGSTASH_PORT,
    database_path=LOGSTASH_DB_PATH,
    transport=LOGSTASH_TRANSPORT,
    ssl_enable=LOGSTASH_SSL_ENABLE,
    ssl_verify=LOGSTASH_SSL_VERIFY,
    ca_certs=LOGSTASH_CA_CERT_FILE_PATH,
)

Notice that the certificate file logstash.crt is made available on the server where the Flask application runs.
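If you do not have a certificate yet, one way to create a self-signed certificate for testing is with openssl (a sketch; for real hostname or IP verification you would also add a subjectAltName entry, and in production you would use a certificate signed by your CA):

```shell
# Generate a self-signed certificate and private key for testing TLS
# with Logstash. The CN here is the Logstash server's IP address.
openssl req -x509 -newkey rsa:2048 -nodes \
  -keyout logstash.key -out logstash.crt \
  -days 365 -subj "/CN=192.168.200.19"
```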

Production Environment

This blog post explains how to set up a log pipeline from a Flask application to Elasticsearch via Logstash. In a production environment, the setup should be improved in many ways: for example, secure Elasticsearch and Kibana with authentication and TLS, run Elasticsearch as a multi-node cluster, and manage index retention so that old logs are deleted.


Tags: flask logging logs elastic stack elasticsearch elasticsearch-oss kibana kibana-oss logstash logstash-oss log analytics log pipeline python