范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

超硬核!11个非常实用的Python和Shell拿来就用脚本实例

  这次再来给大家分享一波我工作中用到的几个脚本,主要分为:Python和Shell两个部分。
  Python 脚本部分实例:企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图;
  Shell 脚本部分实例:SVN 完整备份、Zabbix 监控用户密码过期、构建本地 YUM 以及上篇文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知);
  篇幅有些长,还请大家耐心翻到文末,毕竟有彩蛋。
  Python 脚本部分企业微信告警
  此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。# -*- coding: utf-8 -*-
  import requests
  import json
  class DLF:
  def __init__(self, corpid, corpsecret):
  self.url = "https://qyapi.weixin.qq.com/cgi-bin"
  self.corpid = corpid
  self.corpsecret = corpsecret
  self._token = self._get_token
  def _get_token(self):
  """
  获取企业微信API接口的access_token
  :return:
  """
  token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
  try:
  res = requests.get(token_url).json
  token = res["access_token"]
  return token
  except Exception as e:
  return str(e)
  def _get_media_id(self, file_obj):
  get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
  data = {"media": file_obj}
  try:
  res = requests.post(url=get_media_url, files=data)
  media_id = res.json["media_id"]
  return media_id
  except Exception as e:
  return str(e)
  def send_text(self, agentid, content, touser=None, toparty=None):
  send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
  send_data = {
  "touser": touser,
  "toparty": toparty,
  "msgtype": "text",
  "agentid": agentid,
  "text": {
  "content": content
  }
  }
  try:
  res = requests.post(send_msg_url, data=json.dumps(send_data))
  except Exception as e:
  return str(e)
  def send_image(self, agentid, file_obj, touser=None, toparty=None):
  media_id = self._get_media_id(file_obj)
  send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
  send_data = {
  "touser": touser,
  "toparty": toparty,
  "msgtype": "image",
  "agentid": agentid,
  "image": {
  "media_id": media_id
  }
  }
  try:
  res = requests.post(send_msg_url, data=json.dumps(send_data))
  except Exception as e:
  return str(e)
  FTP 客户端
  通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。# -*- coding: utf-8 -*-
  from ftplib import FTP
  from os import path
  import copy
  class FTPClient:
  def __init__(self, host, user, passwd, port=21):
  self.host = host
  self.user = user
  self.passwd = passwd
  self.port = port
  self.res = {"status": True, "msg": None}
  self._ftp = None
  self._login
  def _login(self):
  """
  登录FTP服务器
  :return: 连接或登录出现异常时返回错误信息
  """
  try:
  self._ftp = FTP
  self._ftp.connect(self.host, self.port, timeout=30)
  self._ftp.login(self.user, self.passwd)
  except Exception as e:
  return e
  def upload(self, localpath, remotepath=None):
  """
  上传ftp文件
  :param localpath: local file path
  :param remotepath: remote file path
  :return:
  """
  if not localpath: return "Please select a local file. "
  # 读取本地文件
  # fp = open(localpath, "rb")
  # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件
  if not remotepath:
  remotepath = path.basename(localpath)
  # 上传文件
  self._ftp.storbinary("STOR " + remotepath, localpath)
  # fp.close
  def download(self, remotepath, localpath=None):
  """
  localpath
  :param localpath: local file path
  :param remotepath: remote file path
  :return:
  """
  if not remotepath: return "Please select a remote file. "
  # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件
  if not localpath:
  localpath = path.basename(remotepath)
  # 如果localpath是目录的话就和remotepath的basename拼接
  if path.isdir(localpath):
  localpath = path.join(localpath, path.basename(remotepath))
  # 写入本地文件
  fp = open(localpath, "wb")
  # 下载文件
  self._ftp.retrbinary("RETR " + remotepath, fp.write)
  fp.close
  def nlst(self, dir="/"):
  """
  查看目录下的内容
  :return: 以列表形式返回目录下的所有内容
  """
  files_list = self._ftp.nlst(dir)
  return files_list
  def rmd(self, dir=None):
  """
  删除目录
  :param dir: 目录名称
  :return: 执行结果
  """
  if not dir: return "Please input dirname"
  res = copy.deepcopy(self.res)
  try:
  del_d = self._ftp.rmd(dir)
  res["msg"] = del_d
  except Exception as e:
  res["status"] = False
  res["msg"] = str(e)
  return res
  def mkd(self, dir=None):
  """
  创建目录
  :param dir: 目录名称
  :return: 执行结果
  """
  if not dir: return "Please input dirname"
  res = copy.deepcopy(self.res)
  try:
  mkd_d = self._ftp.mkd(dir)
  res["msg"] = mkd_d
  except Exception as e:
  res["status"] = False
  res["msg"] = str(e)
  return res
  def del_file(self, filename=None):
  """
  删除文件
  :param filename: 文件名称
  :return: 执行结果
  """
  if not filename: return "Please input filename"
  res = copy.deepcopy(self.res)
  try:
  del_f = self._ftp.delete(filename)
  res["msg"] = del_f
  except Exception as e:
  res["status"] = False
  res["msg"] = str(e)
  return res
  def get_file_size(self, filenames=[]):
  """
  获取文件大小,单位是字节
  判断文件类型
  :param filename: 文件名称
  :return: 执行结果
  """
  if not filenames: return {"msg": "This is an empty directory"}
  res_l =
  for file in filenames:
  res_d = {}
  # 如果是目录或者文件不存在就会报错
  try:
  size = self._ftp.size(file)
  type = "f"
  except:
  # 如果是路径的话size显示 - , file末尾加/ (/dir/)
  size = "-"
  type = "d"
  file = file + "/"
  res_d["filename"] = file
  res_d["size"] = size
  res_d["type"] = type
  res_l.append(res_d)
  return res_l
  def rename(self, old_name=None, new_name=None):
  """
  重命名
  :param old_name: 旧的文件或者目录名称
  :param new_name: 新的文件或者目录名称
  :return: 执行结果
  """
  if not old_name or not new_name: return "Please input old_name and new_name"
  res = copy.deepcopy(self.res)
  try:
  rename_f = self._ftp.rename(old_name, new_name)
  res["msg"] = rename_f
  except Exception as e:
  res["status"] = False
  res["msg"] = str(e)
  return res
  def close(self):
  """
  退出ftp连接
  :return:
  """
  try:
  # 向服务器发送quit命令
  self._ftp.quit
  except Exception:
  return "No response from server"
  finally:
  # 客户端单方面关闭连接
  self._ftp.close
  SSH 客户端
  此脚本仅用于通过 key连接,如需要密码连接,简单修改下即可。# -*- coding: utf-8 -*-
  import paramiko
  class SSHClient:
  def __init__(self, host, port, user, pkey):
  self.ssh_host = host
  self.ssh_port = port
  self.ssh_user = user
  self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
  self.ssh = None
  self._connect
  def _connect(self):
  self.ssh = paramiko.SSHClient
  self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
  try:
  self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
  except:
  return "ssh connect fail"
  def execute_command(self, command):
  stdin, stdout, stderr = self.ssh.exec_command(command)
  out = stdout.read
  err = stderr.read
  return out, err
  def close(self):
  self.ssh.close
  Saltstack 客户端
  通过 api 对 Saltstack 服务端进行操作,执行命令。#!/usr/bin/env python
  # -*- coding:utf-8 -*-
  import requests
  import json
  import copy
  class SaltApi:
  """
  定义salt api接口的类
  初始化获得token
  """
  def __init__(self):
  self.url = "http://172.85.10.21:8000/"
  self.username = "saltapi"
  self.password = "saltapi"
  self.headers = {"Content-type": "application/json"}
  self.params = {"client": "local", "fun": None, "tgt": None, "arg": None}
  self.login_url = self.url + "login"
  self.login_params = {"username": self.username, "password": self.password, "eauth": "pam"}
  self.token = self.get_data(self.login_url, self.login_params)["token"]
  self.headers["X-Auth-Token"] = self.token
  def get_data(self, url, params):
  """
  请求url获取数据
  :param url: 请求的url地址
  :param params: 传递给url的参数
  :return: 请求的结果
  """
  send_data = json.dumps(params)
  request = requests.post(url, data=send_data, headers=self.headers)
  response = request.json
  result = dict(response)
  return result["return"][0]
  def get_auth_keys(self):
  """
  获取所有已经认证的key
  :return:
  """
  data = copy.deepcopy(self.params)
  data["client"] = "wheel"
  data["fun"] = "key.list_all"
  result = self.get_data(self.url, data)
  try:
  return result["data"]["return"]["minions"]
  except Exception as e:
  return str(e)
  def get_grains(self, tgt, arg="id"):
  """
  获取系统基础信息
  :tgt: 目标主机
  :return:
  """
  data = copy.deepcopy(self.params)
  if tgt:
  data["tgt"] = tgt
  else:
  data["tgt"] = "*"
  data["fun"] = "grains.item"
  data["arg"] = arg
  result = self.get_data(self.url, data)
  return result
  def execute_command(self, tgt, fun="cmd.run", arg=None, tgt_type="list", salt_async=False):
  """
  执行saltstack 模块命令,类似于salt "*" cmd.run "command"
  :param tgt: 目标主机
  :param fun: 模块方法 可为空
  :param arg: 传递参数 可为空
  :return: 执行结果
  """
  data = copy.deepcopy(self.params)
  if not tgt: return {"status": False, "msg": "target host not exist"}
  if not arg:
  data.pop("arg")
  else:
  data["arg"] = arg
  if tgt != "*":
  data["tgt_type"] = tgt_type
  if salt_async: data["client"] = "local_async"
  data["fun"] = fun
  data["tgt"] = tgt
  result = self.get_data(self.url, data)
  return result
  def jobs(self, fun="detail", jid=None):
  """
  任务
  :param fun: active, detail
  :param jod: Job ID
  :return: 任务执行结果
  """
  data = {"client": "runner"}
  data["fun"] = fun
  if fun == "detail":
  if not jid: return {"success": False, "msg": "job id is none"}
  data["fun"] = "jobs.lookup_jid"
  data["jid"] = jid
  else:
  return {"success": False, "msg": "fun is active or detail"}
  result = self.get_data(self.url, data)
  return result
  vCenter 客户端
  通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
  from pyVmomi import vim
  from asset import models
  import atexit
  class Vmware:
  def __init__(self, ip, user, password, port, idc, vcenter_id):
  self.ip = ip
  self.user = user
  self.password = password
  self.port = port
  self.idc_id = idc
  self.vcenter_id = vcenter_id
  def get_obj(self, content, vimtype, name=None):
  """
  列表返回,name 可以指定匹配的对象
  """
  container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
  obj = [ view for view in container.view ]
  return obj
  def get_esxi_info(self):
  # 宿主机信息
  esxi_host = {}
  res = {"connect_status": True, "msg": None}
  try:
  # connect this thing
  si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
  except Exception as e:
  res["connect_status"] = False
  try:
  res["msg"] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
  except Exception as e:
  res["msg"] = "%s: connection error" % (self.ip)
  return res
  # disconnect this thing
  atexit.register(Disconnect, si)
  content = si.RetrieveContent
  esxi_obj = self.get_obj(content, [vim.HostSystem])
  for esxi in esxi_obj:
  esxi_host[esxi.name] = {}
  esxi_host[esxi.name]["idc_id"] = self.idc_id
  esxi_host[esxi.name]["vcenter_id"] = self.vcenter_id
  esxi_host[esxi.name]["server_ip"] = esxi.name
  esxi_host[esxi.name]["manufacturer"] = esxi.summary.hardware.vendor
  esxi_host[esxi.name]["server_model"] = esxi.summary.hardware.model
  for i in esxi.summary.hardware.otherIdentifyingInfo:
  if isinstance(i, vim.host.SystemIdentificationInfo):
  esxi_host[esxi.name]["server_sn"] = i.identifierValue
  # 系统名称
  esxi_host[esxi.name]["system_name"] = esxi.summary.config.product.fullName
  # cpu总核数
  esxi_cpu_total = esxi.summary.hardware.numCpuThreads
  # 内存总量 GB
  esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 /1024
  # 获取硬盘总量 GB
  esxi_disk_total =0
  for ds in esxi.datastore:
  esxi_disk_total += ds.summary.capacity / 1024 / 1024 /1024
  # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机
  default_configure = {
  "cpu": 4,
  "memory": 8,
  "disk":100
  }
  esxi_host[esxi.name]["vm_host"] =
  vm_usage_total_cpu =0
  vm_usage_total_memory =0
  vm_usage_total_disk =0
  # 虚拟机信息
  for vm in esxi.vm:
  host_info = {}
  host_info["vm_name"] = vm.name
  host_info["power_status"] = vm.runtime.powerState
  host_info["cpu_total_kernel"] = str(vm.config.hardware.numCPU) + "核"
  host_info["memory_total"] = str(vm.config.hardware.memoryMB) + "MB"
  host_info["system_info"] = vm.config.guestFullName
  disk_info = ""
  disk_total =0
  for d in vm.config.hardware.device:
  if isinstance(d, vim.vm.device.VirtualDisk):
  disk_total += d.capacityInKB / 1024 /1024
  disk_info += d.deviceInfo.label + ": " + str((d.capacityInKB) / 1024 / 1024) + " GB" + ","
  host_info["disk_info"] = disk_info
  esxi_host[esxi.name]["vm_host"].append(host_info)
  # 计算当前宿主机可用容量:总量 - 已分配的
  if host_info["power_status"] == "poweredOn":
  vm_usage_total_cpu += vm.config.hardware.numCPU
  vm_usage_total_disk += disk_total
  vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
  esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
  esxi_memory_free = esxi_memory_total - vm_usage_total_memory
  esxi_disk_free = esxi_disk_total - vm_usage_total_disk
  esxi_host[esxi.name]["cpu_info"] = "Total: %d核, Free: %d核" % (esxi_cpu_total, esxi_cpu_free)
  esxi_host[esxi.name]["memory_info"] = "Total: %dGB, Free: %dGB" % (esxi_memory_total, esxi_memory_free)
  esxi_host[esxi.name]["disk_info"] = "Total: %dGB, Free: %dGB" % (esxi_disk_total, esxi_disk_free)
  # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源
  if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
  free_allocation_vm_host =0
  else:
  free_allocation_vm_host = int(min(
  [
  esxi_cpu_free / default_configure["cpu"],
  esxi_memory_free / default_configure["memory"],
  esxi_disk_free / default_configure["disk"]
  ]
  ))
  esxi_host[esxi.name]["free_allocation_vm_host"] = free_allocation_vm_host
  esxi_host["connect_status"] = True
  return esxi_host
  def write_to_db(self):
  esxi_host = self.get_esxi_info
  # 连接失败
  if not esxi_host["connect_status"]:
  return esxi_host
  del esxi_host["connect_status"]
  for machine_ip in esxi_host:
  # 物理机信息
  esxi_host_dict = esxi_host[machine_ip]
  # 虚拟机信息
  virtual_host = esxi_host[machine_ip]["vm_host"]
  del esxi_host[machine_ip]["vm_host"]
  obj = models.EsxiHost.objects.create(**esxi_host_dict)
  obj.save
  for host_info in virtual_host:
  host_info["management_host_id"] = obj.id
  obj2 = models.virtualHost.objects.create(**host_info)
  obj2.save
  获取域名 ssl 证书过期时间
  用于 zabbix 告警import re
  import sys
  import time
  import subprocess
  from datetime import datetime
  from io import StringIO
  def main(domain):
  f = StringIO
  comm = f"curl -Ivs https://{domain} --connect-timeout 10"
  result = subprocess.getstatusoutput(comm)
  f.write(result[1])
  try:
  m = re.search("start date: (.*?) .*?expire date: (.*?) .*?common name: (.*?) .*?issuer: CN=(.*?) ", f.getvalue, re.S)
  start_date = m.group(1)
  expire_date = m.group(2)
  common_name = m.group(3)
  issuer = m.group(4)
  except Exception as e:
  return 999999999
  # time 字符串转时间数组
  start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
  start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
  # datetime 字符串转时间数组
  expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
  expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")
  # 剩余天数
  remaining = (expire_date-datetime.now).days
  return remaining
  if __name__ == "__main__":
  domain = sys.argv[1]
  remaining_days = main(domain)
  print(remaining_days)
  发送今天的天气预报以及未来的天气趋势图
  此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。 # -*- coding: utf-8 -*-
  import requests
  import json
  import datetime
  def weather(city):
  url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
  try:
  data = requests.get(url).json["data"]
  city = data["city"]
  ganmao = data["ganmao"]
  today_weather = data["forecast"][0]
  res = "老婆今天是{} 今天天气概况 城市: {:<10} 时间: {:<10} 高温: {:<10} 低温: {:<10} 风力: {:<10} 风向: {:<10} 天气: {:<10}  稍后会发送近期温度趋势图,请注意查看。
  ".format(
  ganmao,
  city,
  datetime.datetime.now.strftime("%Y-%m-%d"),
  today_weather["high"].split[1],
  today_weather["low"].split[1],
  today_weather["fengli"].split("[")[2].split("]")[0],
  today_weather["fengxiang"],today_weather["type"],
  )
  return {"source_data": data, "res": res}
  except Exception as e:
  return str(e)
  ```
  + 获取天气预报趋势图
  ```python
  # -*- coding: utf-8 -*-
  import matplotlib.pyplot as plt
  import re
  import datetime
  def Future_weather_states(forecast, save_path, day_num=5):
  """
  展示未来的天气预报趋势图
  :param forecast: 天气预报预测的数据
  :param day_num: 未来几天
  :return: 趋势图
  """
  future_forecast = forecast
  dict={}
  for i in range(day_num):
  data =
  date = future_forecast[i]["date"]
  date = int(re.findall("d+",date)[0])
  data.append(int(re.findall("d+", future_forecast[i]["high"])[0]))
  data.append(int(re.findall("d+", future_forecast[i]["low"])[0]))
  data.append(future_forecast[i]["type"])
  dict[date] = data
  data_list = sorted(dict.items)
  date=
  high_temperature =
  low_temperature =
  for each in data_list:
  date.append(each[0])
  high_temperature.append(each[1][0])
  low_temperature.append(each[1][1])
  fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b")
  current_date = datetime.datetime.now.strftime("%Y-%m")
  plt.rcParams["font.sans-serif"] = ["SimHei"]
  plt.rcParams["axes.unicode_minus"] = False
  plt.xlabel(current_date)
  plt.ylabel(" ")
  plt.legend(["高温", "低温"])
  plt.xticks(date)
  plt.title("最近几天温度变化趋势")
  plt.savefig(save_path)
  ```
  + 发送到企业微信
  ```python
  # -*- coding: utf-8 -*-
  import requests
  import json
  class DLF:
  def __init__(self, corpid, corpsecret):
  self.url = "https://qyapi.weixin.qq.com/cgi-bin"
  self.corpid = corpid
  self.corpsecret = corpsecret
  self._token = self._get_token
  def _get_token(self):
  """
  获取企业微信API接口的access_token
  :return:
  """
  token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
  try:
  res = requests.get(token_url).json
  token = res["access_token"]
  return token
  except Exception as e:
  return str(e)
  def _get_media_id(self, file_obj):
  get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
  data = {"media": file_obj}
  try:
  res = requests.post(url=get_media_url, files=data)
  media_id = res.json["media_id"]
  return media_id
  except Exception as e:
  return str(e)
  def send_text(self, agentid, content, touser=None, toparty=None):
  send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
  send_data = {
  "touser": touser,
  "toparty": toparty,
  "msgtype": "text",
  "agentid": agentid,
  "text": {
  "content": content
  }
  }
  try:
  res = requests.post(send_msg_url, data=json.dumps(send_data))
  except Exception as e:
  return str(e)
  def send_image(self, agentid, file_obj, touser=None, toparty=None):
  media_id = self._get_media_id(file_obj)
  send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
  send_data = {
  "touser": touser,
  "toparty": toparty,
  "msgtype": "image",
  "agentid": agentid,
  "image": {
  "media_id": media_id
  }
  }
  try:
  res = requests.post(send_msg_url, data=json.dumps(send_data))
  except Exception as e:
  return str(e)
  + main脚本
  # -*- coding: utf-8 -*-
  from plugins.weather_forecast import weather
  from plugins.trend_chart import Future_weather_states
  from plugins.send_wechat import DLF
  import os
  # 企业微信相关信息
  corpid = "xxx"
  corpsecret = "xxx"
  agentid = "xxx"
  # 天气预报趋势图保存路径
  _path = os.path.dirname(os.path.abspath(__file__))
  save_path = os.path.join(_path ,"./tmp/weather_forecast.jpg")
  # 获取天气预报信息
  content = weather("大兴")
  # 发送文字消息
  dlf = DLF(corpid, corpsecret)
  dlf.send_text(agentid=agentid, content=content["res"], toparty="1")
  # 生成天气预报趋势图
  Future_weather_states(content["source_data"]["forecast"], save_path)
  # 发送图片消息
  file_obj = open(save_path, "rb")
  dlf.send_image(agentid=agentid, toparty="1", file_obj=file_obj)
  Shell 脚本部分SVN 完整备份
  通过 hotcopy进行 SVN 完整备份,备份保留 7 天。#!/bin/bash
  # Filename : svn_backup_repos.sh
  # Date : 2020/12/14
  # Author : JakeTian
  # Email : JakeTian@***.com
  # Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/ 2>&1
  # Notes : 将脚本加入crontab中,每天定时执行
  # Description: SVN完全备份
  set -e
  SRC_PATH="/opt/svndata"
  DST_PATH="/data/svnbackup"
  LOG_FILE="$DST_PATH/logs/svn_backup.log"
  SVN_BACKUP_C="/bin/svnadmin hotcopy"
  SVN_LOOK_C="/bin/svnlook youngest"
  TODAY=$(date +"%F")
  cd $SRC_PATH
  ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name "httpd" -a ! -name "bak" | tr -d "./")
  # 创建备份目录,备份脚本日志目录
  test -d $DST_PATH || mkdir -p $DST_PATH
  test -d $DST_PATH/logs || mkdir $DST_PATH/logs
  test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY
  # 备份repos文件
  for repo in $ALL_REPOS
  do
  $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo
  # 判断备份是否完成
  if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then
  echo "$TODAY: $repo Backup Success" >> $LOG_FILE
  else
  echo "$TODAY: $repo Backup Fail" >> $LOG_FILE
  fi
  done
  # # 备份用户密码文件和权限文件
  cp -p authz access.conf $DST_PATH/$TODAY
  # 日志文件转储
  mv $LOG_FILE $LOG_FILE-$TODAY
  # 删除七天前的备份
  seven_days_ago=$(date -d "7 days ago" +"%F")
  rm -rf $DST_PATH/$seven_days_ago
  zabbix 监控用户密码过期
  用于 Zabbix 监控 Linux 系统用户(shell 为 /bin/bash 和 /bin/sh)密码过期,密码有效期剩余 7 天触发加自动发现用户。#!/bin/bash
  diskarray=(`awk -F":" "$NF ~ //bin/bash/||//bin/sh/{print $1}" /etc/passwd`)
  length=${#diskarray[@]}
  printf "{ "
  printf "	"""data":["
  for ((i=0;i<$length;i++))
  do
  printf " 		{"
  printf ""{#USER_NAME}":"${diskarray[$i]}"}"
  if [ $i -lt $[$length-1] ];then
  printf ","
  fi
  done
  printf " 	] "
  printf "} "
  检查用户密码过期
  #!/bin/bash
  export LANG=en_US.UTF-8
  SEVEN_DAYS_AGO=$(date -d "-7 day" +"%s")
  user="$1"
  # 将Sep 09, 2018格式的时间转换成unix时间
  expires_date=$(sudo chage -l $user | awk -F":" "/Password expires/{print $NF}" | sed -n "s/^ //p")
  if [[ "$expires_date" != "never" ]];then
  expires_date=$(date -d "$expires_date" +"%s")
  if [ "$expires_date" -le "$SEVEN_DAYS_AGO" ];then
  echo "1"
  else
  echo "0"
  fi
  else
  echo "0"
  fi
  构建本地YUM
  通过 rsync 的方式同步 yum,通过 nginx 只做 http yum 站点;
  但是 centos6 的镜像最近都不能用了,国内貌似都禁用了,如果找到合适的自行更换地址。#!/bin/bash
  # 更新yum镜像
  RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
  DIR="/app/yumData"
  LogDir="$DIR/logs"
  Centos6Base="$DIR/Centos6/x86_64/Base"
  Centos7Base="$DIR/Centos7/x86_64/Base"
  Centos6Epel="$DIR/Centos6/x86_64/Epel"
  Centos7Epel="$DIR/Centos7/x86_64/Epel"
  Centos6Salt="$DIR/Centos6/x86_64/Salt"
  Centos7Salt="$DIR/Centos7/x86_64/Salt"
  Centos6Update="$DIR/Centos6/x86_64/Update"
  Centos7Update="$DIR/Centos7/x86_64/Update"
  Centos6Docker="$DIR/Centos6/x86_64/Docker"
  Centos7Docker="$DIR/Centos7/x86_64/Docker"
  Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
  Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
  Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
  Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
  MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"
  # 目录不存在就创建
  check_dir{
  for dir in $*
  do
  test -d $dir || mkdir -p $dir
  done
  }
  # 检查rsync同步结果
  check_rsync_status{
  if [ $? -eq 0 ];then
  echo "rsync success" >> $1
  else
  echo "rsync fail" >> $1
  fi
  }
  check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0
  # Base yumrepo
  #$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
  # check_rsync_status "$LogDir/centos6Base.log"
  $RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
  check_rsync_status "$LogDir/centos7Base.log"
  # Epel yumrepo
  # $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
  # check_rsync_status "$LogDir/centos6Epel.log"
  $RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1
  check_rsync_status "$LogDir/centos7Epel.log"
  # SaltStack yumrepo
  # $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
  # ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
  # check_rsync_status "$LogDir/centos6Salt.log"
  $RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1
  check_rsync_status "$LogDir/centos7Salt.log"
  # ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
  # Docker yumrepo
  $RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1
  check_rsync_status "$LogDir/centos7Docker.log"
  # centos update yumrepo
  # $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
  # check_rsync_status "$LogDir/centos6Update.log"
  $RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1
  check_rsync_status "$LogDir/centos7Update.log"
  # mysql 5.7 yumrepo
  # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1
  # check_rsync_status "$LogDir/centos6Mysql5.7.log"
  $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1
  check_rsync_status "$LogDir/centos7Mysql5.7.log"
  # mysql 8.0 yumrepo
  # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1
  # check_rsync_status "$LogDir/centos6Mysql8.0.log"
  $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1
  check_rsync_status "$LogDir/centos7Mysql8.0.log"
  读者需求解答
  负载高时,查出占用比较高的进程脚本并存储或推送通知
  这部分内容是上篇 Shell 脚本实例中底部读者留言的需求,如下:#!/bin/bash
  # 物理cpu个数
  physical_cpu_count=$(egrep "physical id" /proc/cpuinfo | sort | uniq | wc -l)
  # 单个物理cpu核数
  physical_cpu_cores=$(egrep "cpu cores" /proc/cpuinfo | uniq | awk "{print $NF}")
  # 总核数
  total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))
  # 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发
  one_min_load_threshold="$total_cpu_cores"
  five_min_load_threshold=$(awk "BEGIN {print ""$total_cpu_cores"" * "0.8"}")
  fifteen_min_load_threshold=$(awk "BEGIN {print ""$total_cpu_cores"" * "0.7"}")
  # 分别是分钟、五分钟、十五分钟负载平均值
  one_min_load=$(uptime | awk "{print $(NF-2)}" | tr -d ",")
  five_min_load=$(uptime | awk "{print $(NF-1)}" | tr -d ",")
  fifteen_min_load=$(uptime | awk "{print $NF}" | tr -d ",")
  # 获取当前cpu 内存 磁盘io信息,并写入日志文件
  # 如果需要发送消息或者调用其他,请自行编写函数即可
  get_info{
  log_dir="cpu_high_script_log"
  test -d "$log_dir" || mkdir "$log_dir"
  ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
  ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
  iostat -dx 1 10 > "$log_dir"/disk_io_10.log
  }
  export -f get_info
  echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" |
  awk "{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }"
  以上,就是今天分享的全部内容了。
  希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。

方便面的6种隐藏吃法,样样都好吃得很,别只会煮着吃了用三分钟守候泡面的飘香,那碗中的袅袅热气,成就了最平凡的幸福感。在这个快节奏的时代,方便面成为了很多人熬夜加班后的夜宵首选。一碗热气腾腾的泡面,足以消退一身的疲惫。泡面的方便快捷深饺子别再煮着吃啦,试试这种做法,搭配秘制蘸料好吃到爆大家好,我是喵酱每日菜谱,每天一道家常下饭好菜,喜欢我的美食分享记得点赞关注!广东终于迎来了大晴天,风和日丽心情美美的一天,菜市场逛起来!好久没有包饺子吃了,今天果断安排包饺子!逛煮面条最忌开水下锅!煮面师傅透露绝活,面条出锅爽滑劲道不粘坨哈喽,大家好。我们自己在家煮面条时,有没有感觉自己煮的面条吃着一点都不筋道爽滑,还特别容易坨,这时我们不禁疑惑是不是自己买的面条差导致的?今天,就让我们一起来解开这个疑惑吧!其实,四川哪道菜最好吃?经过评选,这5道川菜比较出名,你吃过几道呢众所周知,中国美食有八大菜系,其中川菜是以麻辣爽口著称的,而川菜又被誉为中华料理的集大成者,其中有无数道经典美食,脍炙人口。一直以来,关于川菜中哪道最好吃,一直是纷争不定,其实川菜AirPods怎么选?入手前这些细节须知悉可以看到,去牛秋季不仅全新发布了AirPods3,苹果还悄悄的将AirPodsPro的充电盒更新为了MagSafe充电盒,支持磁吸充电。这样对比三款耳机的区别,你就会发现相比前代,与你分享三个生活方式健康越来越受到人们的重视,怎样才能健康?我与你分享三个生活小常识01。饭后走一走。孩提时,听年长的讲,饭后百步走,活到99。我有两个朋友是给老板开车的,年龄都是30出头,小肚子都是爱上暗色系发色!不用漂发也不再担心掉色,有质感不伤发还显白突然发现,越来越多的美女把头发都染成了似黑非黑的暗色系,原来焦黄毛躁像枯草的头发,变得有光泽又有质感,发质好像也恢复了,而且还很显白。从ins上的日韩发色流行趋势上就可以看出来,暗开春赏花好去处2022年已经过了两个节气,到了春暖花开,各家好儿女相看的好时节。怎么能不选一个既惬意又美丽的地方相看呢?根据某网往年数据显示以下几个地方即实惠又美丽。1。无锡鼋头渚2。南京梅花山女排新教练受质疑,教导李盈莹为时尚早?排协见证,蔡斌高见导读由于蔡斌成为了新一任中国女排的主教练,在球员的起用以及弃用上,以及对于教练组的组建方面,都会有自己的想法,而外界对此也有自己的想法,二者思想高度不一角度不同的情况下自然会产生冲剩米饭别炒着吃,试试这个高级吃法,又香又拉丝,更适合懒人吃最近焗饭比较火,它比盖浇饭更加有特色,也比拌饭更加好吃。最为重要的是看着就很高级,吃起来很有范儿呢!今天我们就来做一款番茄芝士焗饭。烹饪过程非常简单,但成品会有一个极大的提升,难怪1箭22星,见证属于中国航天的辉煌褚悦璠(郑州大学)2月27日,我国用长征八号遥二运载火箭在文昌发射场,以1箭22星的方式,成功发射升空泰景三号01卫星泰景四号01卫星等22颗卫星,它们如天女散花般进入预定轨道,发
当童年古装女神隔空撞衫虽都是美人,仍逃不开谁丑谁尴尬的定律前几期我们盘点了琼瑶剧里女演员们的撞衫趣事,今天我们再来聊一聊早期古装剧中穿同款戏服的女演员。虽然十几二十年前的女演员个个都美得很有辨识度,但当她们隔空或在同剧中撞衫,有时候也很难当童年古装女神隔空撞衫虽都是美人,仍逃不开谁丑谁尴尬的定律前几期我们盘点了琼瑶剧里女演员们的撞衫趣事,今天我们再来聊一聊早期古装剧中穿同款戏服的女演员。虽然十几二十年前的女演员个个都美得很有辨识度,但当她们隔空或在同剧中撞衫,有时候也很难阿甘正传真的是一部喜剧吗?3看完有什么感想呢?是不是感觉是喜剧,又觉得不完全是,为小美感到可惜,又为阿甘接盘感到失望。阿甘正传从教育角度看,阿甘和小美同样凄惨,不同的是阿甘有一个一直鼓励他的妈妈,而小美的爸爸阿甘正传真的是一部喜剧吗?3看完有什么感想呢?是不是感觉是喜剧,又觉得不完全是,为小美感到可惜,又为阿甘接盘感到失望。阿甘正传从教育角度看,阿甘和小美同样凄惨,不同的是阿甘有一个一直鼓励他的妈妈,而小美的爸爸新闻联播的主持人从不低头念稿子,难道他们都背下来了吗?很多人看过新闻联播,无论是康辉海霞,还是李梓萌欧阳夏丹,都很优秀。在屏幕前,他们播报的样子,真的很专业。有人对此有疑问,新闻联播的主持人不低头念稿,真的都背下来了吗?的确是这样,这新闻联播的主持人从不低头念稿子,难道他们都背下来了吗?很多人看过新闻联播,无论是康辉海霞,还是李梓萌欧阳夏丹,都很优秀。在屏幕前,他们播报的样子,真的很专业。有人对此有疑问,新闻联播的主持人不低头念稿,真的都背下来了吗?的确是这样,这不配做男主的7位男星,个个被男配吊打,自己心里没点数吗?众所周知,娱乐圈是一个看脸的地方,颜值高的人往往能够有更好的资源。不过事无绝对,有些明星长得不咋样,却偏偏要在影视剧中演男主,结果却被男主吊打,实在是有些尴尬。一吴建豪想当年,F4不配做男主的7位男星,个个被男配吊打,自己心里没点数吗?众所周知,娱乐圈是一个看脸的地方,颜值高的人往往能够有更好的资源。不过事无绝对,有些明星长得不咋样,却偏偏要在影视剧中演男主,结果却被男主吊打,实在是有些尴尬。一吴建豪想当年,F4使用Lightrun对Java应用程序进行性能调整简介在这篇文章中,我将向你展示使用Lightrun分析一个Java应用程序,这样你就可以发现各种性能调整的改进,你可以应用到你当前的Java应用程序。在上一篇文章中,我解释了什么是使用Lightrun对Java应用程序进行性能调整简介在这篇文章中,我将向你展示使用Lightrun分析一个Java应用程序,这样你就可以发现各种性能调整的改进,你可以应用到你当前的Java应用程序。在上一篇文章中,我解释了什么是电车,油车,我们该怎么选择,是谁在左右消费者?7月22日,据台媒报道,林志颖在桃园市发生车祸,整辆车起火燃烧,林志颖无生命危险,已送往长庚医院接受治疗。图片来源于网络8月1日下午,四川G93成渝环线高速发生一起小轿车起火事故。