fluent / fluent-logger-python

A structured logger for Fluentd (Python)
http://fluentd.org/

How to send logs to FluentD remotely from GCP? #165

Closed: rileyhun closed 4 years ago

rileyhun commented 4 years ago

I've already deployed Fluentd to Kubernetes successfully, but I'm a little confused about what hostname I should pass into my Flask application as the fluentd host. Is it "elasticsearch-client.infra.svc.cluster.local"? Does anyone have any guidance or instruction on this? Pardon my ignorance.

Here are my example files:

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: infra
data:
  fluent.conf: |
    <match fluent.**>
      # this tells fluentd not to output its own logs on stdout
      @type null
    </match>
    # here we read the logs from Docker's containers and parse them
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/app.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    # we use the kubernetes metadata plugin to add metadata to the logs
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    # accepts events sent over TCP (the fluentd forward protocol)
    <source>
      @type forward
      port 24224
    </source>

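    # print app.* tagged events to fluentd's own stdout; they are matched
    # here first, so they never reach the elasticsearch <match **> below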
    <match app.**>
      @type stdout
    </match>

    # we send the logs to Elasticsearch
    <match **>
      @type elasticsearch_dynamic
      @log_level info
      include_tag_key true
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
      password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
      reload_connections true
      logstash_format true
      logstash_prefix logstash
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever true
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 32
        overflow_action block
      </buffer>
    </match>
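
For reference, here's my rough understanding of how a client would talk to that forward source; the hostname below is only a placeholder, since that is exactly the part I'm unsure about:

# minimal sketch of a client of the forward <source> above; the hostname is
# an assumption and depends on how the fluentd pods are exposed in-cluster
from fluent import sender

logger = sender.FluentSender('app', host='fluentd.infra.svc.cluster.local', port=24224)
if not logger.emit('test', {'message': 'hello'}):  # emits tag 'app.test'
    print(logger.last_error)
    logger.clear_last_error()
logger.close()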

DaemonSet file

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: infra
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: fluentd
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - namespaces
  verbs:
  - get
  - list
  - watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: infra
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: infra
  labels:
    k8s-app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      serviceAccount: fluentd # if RBAC is enabled
      serviceAccountName: fluentd # if RBAC is enabled
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.1-debian-elasticsearch
        env:
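        # this is the host fluentd ships logs to (the elasticsearch output
        # in fluent.conf above), not the address applications send logs to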
        - name:  FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch-client.infra.svc.cluster.local"
        - name:  FLUENT_ELASTICSEARCH_PORT
          value: "80"
        - name: FLUENT_ELASTICSEARCH_SCHEME
          value: "http"
        - name: FLUENT_ELASTICSEARCH_USER # required even when auth is not used
          value: "elastic"
        - name: FLUENT_ELASTICSEARCH_PASSWORD # required even when auth is not used
          valueFrom:
            secretKeyRef:
              name: elasticsearch-pw-elastic
              key: password
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluentd-config
          mountPath: /fluentd/etc # path of fluentd config file
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluentd-config
        configMap:
          name: fluentd-config # name of the ConfigMap defined above
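
To sanity-check whether a candidate fluentd hostname resolves and accepts connections on the forward port, I can run a quick probe from a pod inside the cluster (the hostname below is an assumption, since that is the part I'm unsure about):

# connectivity probe for the forward input; the host below is hypothetical
import socket

host, port = 'fluentd.infra.svc.cluster.local', 24224
try:
    with socket.create_connection((host, port), timeout=5):
        print('fluentd reachable at {}:{}'.format(host, port))
except OSError as exc:
    print('cannot reach {}:{}: {}'.format(host, port, exc))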

Flask application controller

from flask import request, abort
from flask_restplus import Resource
from ..util.dto import ModelServiceDto
import numpy as np
import pandas as pd
import mlflow.sklearn
import os
from fluent import sender

TENANT = os.getenv('TENANT', 'local')
FLUENTD_HOST = os.getenv('FLUENTD_HOST')
FLUENTD_PORT = os.getenv('FLUENTD_PORT', '24224')  # default to fluentd's forward port

# load model
mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
mlflow.set_experiment('clean_customer_data')
runs = mlflow.search_runs()
runs = runs[runs['tags.mlflow.runName'] == 'military_gov_entities']
artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']

model = mlflow.sklearn.load_model(artifact_uri + '/model')

api = ModelServiceDto.api
_entities = ModelServiceDto.entities

@api.errorhandler(Exception)
def handle_custom_exception(error):
    """Return a custom message and status code"""
    # not every exception carries an HTTP code, so fall back to 500
    return {'error': {'entity_name': "'entity_name' cannot be an empty list"},
            'message': str(error)}, getattr(error, 'code', 500)

@api.route('/predict')
class PredictiveModel(Resource):

    @api.expect(_entities, validate=True)
    @api.doc('classify entity as military/government/other')
    def post(self):
        data = request.json['entity_name']
        if not data:
            abort(400)

        input_data = np.array(data)
        predictions = model.predict(input_data)
        prediction_probs = pd.DataFrame(model.predict_proba(input_data), columns=model.classes_)
        prediction_probs['confidence'] = prediction_probs.max(axis=1)
        res = pd.DataFrame({'entity_name': input_data,
                            'predicted_classification': predictions,
                            'probability score': prediction_probs['confidence'].tolist()}).to_dict(orient='records')
        if FLUENTD_HOST:
            logger = sender.FluentSender(TENANT, host=FLUENTD_HOST, port=int(FLUENTD_PORT))
            for r in res:
                print('logging {}'.format(r))
                if not logger.emit('prediction', r):
                    print(logger.last_error)
                    logger.clear_last_error()

            logger.close()
        return res
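
For reference, since TENANT defaults to 'local', logger.emit('prediction', r) above produces records tagged 'local.prediction' (sender tag plus emit label), so in the config they are routed by <match **> to Elasticsearch rather than by <match app.**>. A minimal illustration (the host is a placeholder):

# how fluent-logger-python composes tags; the host below is hypothetical
from fluent import sender

logger = sender.FluentSender('local', host='fluentd.example.internal', port=24224)
logger.emit('prediction', {'entity_name': 'acme corp'})  # tag: 'local.prediction'
logger.close()
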
arcivanov commented 4 years ago

@rileyhun The project issue tracker is not an appropriate venue for getting help with your particular application deployment in K8s. You should try Stack Overflow or Reddit.