arterhuo / blog

1 stars 1 forks source link

Elasticsearch 创建索引,删除索引,设置warm脚本 #13

Open huoarter opened 5 years ago

huoarter commented 5 years ago
#!/usr/bin/env python
# coding: utf-8
# author: ertao.xu
import sys
# Python 2 only: site.py deletes sys.setdefaultencoding at startup, and
# reload(sys) brings it back so implicit str <-> unicode conversions use UTF-8
# instead of ASCII. Neither line works on Python 3 (reload is not a builtin
# there and sys.setdefaultencoding does not exist).
reload(sys)
sys.setdefaultencoding("utf-8")
from optparse import OptionParser
from requests.auth import HTTPBasicAuth
import yaml
import requests
import datetime
import os
import logging
import logging.handlers
import json
import arrow

import socket
from tenacity import retry, stop_after_attempt, wait_fixed

# Module-wide logger named "logger": every record goes both to stderr
# (StreamHandler) and to /var/log/es_scripts.log (FileHandler), formatted as
# "<time> <logger name> <level> <message>". The FileHandler opens the log file
# at import time, so the process needs write access to /var/log.
logger = logging.getLogger("logger")
handler1 = logging.StreamHandler()
handler2 = logging.FileHandler(filename="/var/log/es_scripts.log")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
handler1.setFormatter(formatter)
handler2.setFormatter(formatter)

logger.addHandler(handler1)
logger.addHandler(handler2)

# One shared HTTP session reused for every Elasticsearch request below.
s = requests.session()
# config.yaml is resolved next to this script, not in the working directory.
config = os.path.dirname(os.path.abspath(__file__))+"/config.yaml"

# Local-time dates in the same format as the daily index-name suffix
# (e.g. 2019.10.13), computed once at import time.
# NOTE(review): `yersterday` (sic) is not referenced in the code shown here.
yersterday = arrow.now().shift(days=-1).format('YYYY.MM.DD')
today = arrow.now().format('YYYY.MM.DD')
tomorrow = arrow.now().shift(days=1).format('YYYY.MM.DD')

class ES(object):
    """Index-maintenance helper for one cluster entry of config.yaml.

    ``settings`` is one mapping from the YAML list. Keys read here:
    ``host``, ``name``, ``user``, ``password``, ``env``, ``indices``
    (per-index-prefix metadata; ``days`` is the retention used by
    deleteIndices and ``default`` is the fallback entry), ``box_type``,
    ``hot_days``, ``delete`` and ``create``.

    Constructing an instance has a side effect: it immediately writes the
    ``indices`` metadata to the cluster's ``sre-index`` (writeSreIndex).
    """

    def __init__(self, settings):
        self.settings = settings
        self.host = settings.get("host")
        self.name = settings.get("name")
        self.user = settings.get("user")
        self.password = settings.get("password")
        self.env = settings.get("env")
        # An empty ``indices:`` key loads as None; normalise it to a dict.
        self.info = settings.get("indices") or {}
        self.box_type = settings.get("box_type", False)
        self.hot_days = settings.get("hot_days")
        self.delete = settings.get("delete", False)
        self.create = settings.get("create", False)
        self.writeSreIndex()

    def _auth(self):
        """Return the basic-auth object used for every cluster request."""
        return HTTPBasicAuth(self.user, self.password)

    def writeSreIndex(self):
        """Store each configured index prefix's metadata in ``sre-index``.

        One document per key of ``indices`` is POSTed to
        ``<host>/sre-index/doc/<prefix>``. Its body is that prefix's config
        mapping plus ``name``. Each response body is logged.
        """
        for index, info in self.info.items():
            # Fresh body per prefix: reusing one dict leaked keys (e.g.
            # ``days``) from earlier prefixes into later documents.
            data = {"name": index}
            data.update(info or {})
            res = s.post(self.host + "/sre-index/doc/{0}".format(index),
                         auth=self._auth(),
                         json=data)
            logger.info(res.text)

    def getIndices(self):
        """Return the plain-text ``_cat/indices?bytes=gb`` listing.

        Raises ``requests.HTTPError`` on a non-2xx response. Otherwise an
        auth or cluster error body would be parsed as an empty listing and
        the run reported as successful.
        """
        response = s.get(self.host + "/_cat/indices?bytes=gb",
                         auth=self._auth())
        response.raise_for_status()
        return response.text

    def _datedIndices(self):
        """Yield ``(index_prefix, index_time, fields)`` for dated indices.

        Each line of getIndices() is whitespace-split into ``fields``.
        Column 2 is treated as the index name. A line is kept only if that
        name contains ``-20`` and the part before its last ``-`` is
        non-empty and does not start with ``.``. ``index_time`` is the part
        after the last ``-``.
        """
        for line in self.getIndices().split("\n"):
            fields = line.split()
            # Blank or short rows have no column 2; skip them instead of
            # raising IndexError and aborting the whole run.
            if len(fields) < 3 or '-20' not in fields[2]:
                continue
            parts = fields[2].split("-")
            index_prefix = "-".join(parts[:-1])
            if not index_prefix or index_prefix.startswith("."):
                continue
            yield index_prefix, parts[-1], fields

    def backupKibana(self):
        """Save up to 10000 ``.kibana`` search hits to a dated local file.

        The raw ``_search`` response body is written to
        ``/data/kibana_backup/<name>-<today>``.
        """
        query = {"from": 0, "size": 10000}
        response = s.get(self.host + "/.kibana/_search",
                         params=query, auth=self._auth())
        # Do not overwrite today's backup file with an error payload.
        response.raise_for_status()
        with open('/data/kibana_backup/' + self.name + "-" + today, 'wb') as f:
            # ``content`` is bytes on Python 2 and 3. Writing unicode
            # ``text`` to a binary file only worked through Python 2's
            # implicit default encoding.
            f.write(response.content)

    def calIsNotDespire(self, index_prefix, index_time):
        """Return True when a dated index is past its retention period.

        Retention comes from ``indices[<prefix>]['days']``, falling back to
        ``indices['default']['days']``. The index is expired when
        ``now - days`` is later than midnight of ``index_time``
        (``YYYY.MM.DD``). Returns False, after logging, when the date cannot
        be parsed or no usable ``days`` value is configured.
        """
        days = (self.info.get(index_prefix) or {}).get("days")
        if not days:
            days = (self.info.get("default") or {}).get("days")
        try:
            # Compare datetimes directly; strftime("%s") is a glibc-only
            # extension and fails on other platforms.
            index_date = datetime.datetime.strptime(index_time, "%Y.%m.%d")
            cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
        except Exception as e:
            logger.exception(index_prefix + "-" + str(index_time) +
                             " " + str(e))
            return False
        return cutoff > index_date

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def deleteIndices(self):
        """Delete every dated index older than its configured retention.

        Does nothing and returns False unless ``delete`` is enabled. Each
        DELETE response is logged. The whole pass is retried up to 3 times,
        5 s apart.
        """
        if not self.delete:
            logger.info("{0} {1} does not allow to be deleted by "
                        "scripts.".format(self.name, self.host))
            return False
        for index_prefix, index_time, _ in self._datedIndices():
            if self.calIsNotDespire(index_prefix, index_time):
                index = index_prefix + "-" + index_time
                response = s.delete(self.host + "/{0}".format(index),
                                    auth=self._auth())
                logger.info(index + response.text)

    def parseIndices(self):
        """Aggregate dated indices per prefix.

        Returns ``{prefix: {"num": <index count>, "size": <summed size>}}``.
        The size is the last column of each ``_cat/indices`` row, as an
        integer GB value because of ``bytes=gb``.
        """
        result = {}
        for index_prefix, _, fields in self._datedIndices():
            stats = result.setdefault(index_prefix, {"num": 0, "size": 0})
            stats["num"] += 1
            stats["size"] += int(fields[-1])
        return result

    def calIndicesShards(self):
        """Add a suggested ``shards`` count to each parseIndices() entry.

        The average size per index (GB, floored) maps to shards as follows:
        under 10 gives 1, under 60 gives 2, otherwise ``avg // 60 + 2``.
        """
        result = self.parseIndices()
        for stats in result.values():
            # Floor division keeps the original Python 2 integer semantics
            # on Python 3 (plain ``/`` would yield float shard counts).
            avg_size = stats["size"] // stats["num"]
            if avg_size < 10:
                shards = 1
            elif avg_size < 60:
                shards = 2
            else:
                shards = avg_size // 60 + 2
            stats["shards"] = shards
        return result

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def createHotIndices(self):
        """Pre-create tomorrow's index for every known prefix on hot nodes.

        Does nothing and returns False unless ``create`` is enabled. Each
        index is created with 0 replicas and box_type ``hot``. The pass is
        retried up to 3 times, 5 s apart.
        """
        if not self.create:
            logger.info(
                "{0} {1} do not allow to create index by scripts.".format(
                    self.name, self.host))
            return False
        settings = {
            "settings": {
                "number_of_replicas": 0,
                "index.routing.allocation.include.box_type": "hot"
            }
        }
        # NOTE(review): calIndicesShards() computes a per-prefix "shards"
        # value that is never sent here. Confirm whether number_of_shards
        # should be included or the computation dropped.
        for index in self.calIndicesShards():
            res = s.put(self.host + "/{0}-{1}".format(index, tomorrow),
                        auth=self._auth(),
                        json=settings)
            logger.info(res.text)

    def calIsNotHotIndex(self, index_prefix, index_time):
        """Return True when the index is still *hot* (despite the name).

        An index is hot when its date (``YYYY.MM.DD``) is later than
        ``today - hot_days`` in local time. If the date or ``hot_days``
        cannot be parsed, the error is logged and True is returned. The
        caller then leaves the index alone instead of demoting it to warm.
        """
        try:
            # Explicit format: relying on arrow's loose string parsing and
            # its version-dependent ``timestamp`` attribute is fragile.
            index_date = datetime.datetime.strptime(
                index_time, "%Y.%m.%d").date()
            cutoff = datetime.date.today() - datetime.timedelta(
                days=int(self.hot_days))
        except Exception as e:
            logger.exception(index_prefix + "-" + str(index_time) +
                             " " + str(e))
            return True
        return cutoff < index_date

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def setIndicesToWarm(self):
        """Move dated indices older than ``hot_days`` to warm nodes.

        Does nothing and returns False unless ``box_type`` is enabled.
        Indices whose name contains today's or tomorrow's date are never
        touched. The pass is retried up to 3 times, 5 s apart.
        """
        if not self.box_type:
            logger.info("{0} {1} don't allow to be settings by scripts".format(
                self.name, self.host))
            return False
        settings = {
            "settings": {
                "index.routing.allocation.include.box_type": "warm",
                # None resets the per-node shard limit to its default.
                "index.routing.allocation.total_shards_per_node": None
            }
        }
        for index_prefix, index_time, fields in self._datedIndices():
            if today in fields[2] or tomorrow in fields[2]:
                continue
            if self.calIsNotHotIndex(index_prefix, index_time):
                logger.info("{0}-{1} is hot index".format(index_prefix,
                                                          index_time))
                continue
            logger.info("{0}-{1} is not hot index".format(index_prefix,
                                                          index_time))
            res = s.put(self.host + "/{0}-{1}/_settings".format(
                index_prefix, index_time), json=settings, auth=self._auth())
            logger.info(
                "put index {0}-{1} box_type to warm, response is {2}".format(
                    index_prefix, index_time, res.text))

def tographite(value, host='192.168.17.163', port=2003, timeout=10):
    """Send one plaintext metric line to Graphite (carbon) over TCP.

    ``value`` is ``"<metric.path> <number>"``. It is lower-cased, and the
    current Unix timestamp plus a newline are appended. Any socket error is
    logged and swallowed so that metric reporting never aborts the caller
    (which may itself be inside an ``except`` block).

    :param host: carbon host (the default keeps the original address).
    :param port: carbon plaintext port.
    :param timeout: socket timeout in seconds, so an unreachable carbon host
        cannot hang a cron run indefinitely.
    """
    import time  # local import: the file header does not import time

    # Build the line once so the printed and sent timestamps agree.
    line = value.lower() + " {0}\n".format(int(time.time()))
    sock = None
    try:
        sock = socket.socket()
        sock.settimeout(timeout)
        sock.connect((host, port))
        print(line)
        # Encode explicitly: sockets need bytes on Python 3.
        sock.sendall(line.encode("utf-8"))
    except Exception as e:
        logger.exception(e)
    finally:
        # Close on every path; the original leaked the socket when
        # connect() failed.
        if sock is not None:
            sock.close()

def delete():
    """Run ES.deleteIndices() for every cluster listed in config.yaml.

    Each cluster's outcome is reported to Graphite as
    ``manage_es.<name>.delete`` 1 (success) or 0 (failure). A failing
    cluster is logged and does not stop the others.
    """
    # safe_load: the config is plain data. Bare yaml.load() can build
    # arbitrary Python objects and requires a Loader on PyYAML >= 6.
    # ``with`` closes the file even if parsing fails.
    with open(config) as f:
        settings = yaml.safe_load(f) or []
    for setting in settings:
        try:
            es = ES(settings=setting)
            es.deleteIndices()
        except Exception as e:
            logger.exception(e)
            tographite("manage_es.{0}.delete 0".format(setting.get('name')))
        else:
            tographite("manage_es.{0}.delete 1".format(es.name))

def create():
    """Run ES.createHotIndices() for every cluster listed in config.yaml.

    Each cluster's outcome is reported to Graphite as
    ``manage_es.<name>.create`` 1 (success) or 0 (failure), with start,
    end and exception lines logged. A failing cluster does not stop the
    others.
    """
    # safe_load: the config is plain data. Bare yaml.load() can build
    # arbitrary Python objects and requires a Loader on PyYAML >= 6.
    with open(config) as f:
        settings = yaml.safe_load(f) or []
    for setting in settings:
        try:
            es = ES(settings=setting)
            logger.info('{0} {1} is starting'.format(es.name, es.host))
            es.createHotIndices()
        except Exception as e:
            logger.exception(e)
            logger.info('{0} {1} is exception'.format(setting.get('name'),
                                                      setting.get('host')))
            tographite("manage_es.{0}.create 0".format(setting.get('name')))
        else:
            tographite("manage_es.{0}.create 1".format(es.name))
            logger.info('{0} {1} is end'.format(es.name, es.host))

def setting():
    """Run ES.setIndicesToWarm() for every cluster listed in config.yaml.

    Each cluster's outcome is reported to Graphite as
    ``manage_es.<name>.setting`` 1 (success) or 0 (failure). A failing
    cluster is logged and does not stop the others.
    """
    # safe_load: the config is plain data. Bare yaml.load() can build
    # arbitrary Python objects and requires a Loader on PyYAML >= 6.
    with open(config) as f:
        settings = yaml.safe_load(f) or []
    for cluster in settings:
        try:
            es = ES(settings=cluster)
            es.setIndicesToWarm()
        except Exception as e:
            logger.exception(e)
            tographite("manage_es.{0}.setting 0".format(cluster.get('name')))
        else:
            tographite("manage_es.{0}.setting 1".format(es.name))

def backup():
    """Run ES.backupKibana() for every cluster listed in config.yaml.

    Failures are logged per cluster and do not stop the others. No metric
    is sent to Graphite for backups.
    """
    # safe_load: the config is plain data. Bare yaml.load() can build
    # arbitrary Python objects and requires a Loader on PyYAML >= 6.
    with open(config) as f:
        settings = yaml.safe_load(f) or []
    for setting in settings:
        try:
            es = ES(settings=setting)
            es.backupKibana()
        except Exception as e:
            logger.exception(e)

if __name__ == "__main__":
    # Each -m value maps to the module-level function that implements it.
    modules = {
        "delete": delete,
        "create": create,
        "setting": setting,
        "backup": backup,
    }
    # The usage line previously omitted "backup", although it is supported.
    usage = "usage: %prog -m (delete|create|setting|backup)"
    parser = OptionParser(usage=usage)
    parser.add_option("-m",
                      dest="module",
                      help="choose execute module "
                           "(delete|create|setting|backup)")
    (options, args) = parser.parse_args()
    action = modules.get(options.module)
    if action is None:
        parser.print_help()
    else:
        action()
huoarter commented 5 years ago
- name: LogES
  host: 
  user: elastic
  password: 
  indices:
    default:
      days: 7
      owner: sre
    logstash:
      days: 6
      owner: sre
  delete: true

- name: BizES
  host: 
  user: elastic
  password: 
  indices:
    default:
      days: 30
      owner: kui.sun
    prism:
      days: 30
      owner: kui.sun
    sre-index:
      days: 30
      owner: sre
    client-filet-invokefailed:
      days: 180
      owner: shaolin
    client-filet-status:
      days: 180
      owner: shaolin
    store-audit:
      days: 30
      owner: huangsong
  delete: true

- name: LogES2
  host: 
  user: elastic
  password: 
  indices:
    default:
      days: 7
      owner: sre
    logstash:
      days: 6
      owner: sre
  box_type: true
  hot_days: 2
  create: true
  delete: true
huoarter commented 4 years ago

# Restore Kibana saved objects from a file written by backupKibana()
# (a raw ``.kibana/_search`` response) by PUTting each hit back.
import json

import requests


def restore_kibana(backup_path="LogES2-2019.10.13",
                   base_url="http://loges:9200"):
    """PUT every hit in ``backup_path`` back to ``<base_url>/.kibana``.

    Each hit is written to ``/.kibana/<_type>/<_id>`` with its ``_source``
    as the body. Each response body is printed.
    """
    session = requests.session()
    with open(backup_path) as f:
        data = json.load(f)
    for hit in data.get("hits").get("hits"):
        url = "{0}/.kibana/{1}/{2}".format(base_url, hit.get("_type"),
                                           hit.get("_id"))
        res = session.put(url, json=hit.get('_source'))
        # print() call form works on Python 2 and 3 (was a py2 statement).
        print(res.text)


if __name__ == "__main__":
    restore_kibana()