操作sql
# !/usr/bin/env python
# -*- coding:utf-8 -*-
"""
@file: sql_exec
@author: frankfyang
@create: 2022/3/16
"""
import json
import pymysql
class MysqlDb(object):
    """Small pymysql helper that opens a fresh connection for every statement.

    Each public call connects, runs one statement, commits (or rolls back on
    error) and closes the connection again; no connection is kept between
    calls.
    """

    def __init__(self, db):
        """Store the connection settings taken from *db*.

        :param db: mapping providing ``HOST``, ``USER``, ``PASSWORD``,
            ``NAME`` (database name) and ``PORT`` (any value ``int()`` accepts)
        """
        self.host = db["HOST"]
        self.user = db["USER"]
        self.password = db["PASSWORD"]
        self.database = db["NAME"]
        self.port = int(db["PORT"])
        # The values below are forwarded unchanged to pymysql.connect().
        self.connect_timeout = 5
        self.max_allowed_packet = 16 * 1024 * 1024
        self.read_timeout = 10
        self.write_timeout = 10

    def db_connect(self):
        """Open a new connection and a cursor on it.

        :return: ``(connection, cursor)`` tuple
        :raises: whatever ``pymysql.connect`` raises when connecting fails
        """
        conn = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            port=self.port,
            connect_timeout=self.connect_timeout,
            max_allowed_packet=self.max_allowed_packet,
            read_timeout=self.read_timeout,
            write_timeout=self.write_timeout,
            charset="utf8",
        )
        cur = conn.cursor()
        return conn, cur

    def db_close(self, conn, cur):
        """Close the cursor and the connection; falsy handles are skipped.

        The two handles are closed independently so that a missing cursor,
        or an error while closing it, does not leave the connection open.
        """
        try:
            if cur:
                cur.close()
        finally:
            if conn:
                conn.close()

    def sql_execute(self, sql, param=None):
        """Run one statement and commit it.

        :param sql: statement text, optionally with driver placeholders
        :param param: values bound to the placeholders, or ``None``
        :return: the value returned by ``cursor.execute`` (the affected row
            count according to the PyMySQL documentation), or ``None`` when
            no connection/cursor was available
        :raises: the original driver exception, after a rollback
        """
        affected = None
        conn, cur = self.db_connect()
        try:
            if conn and cur:
                affected = cur.execute(sql, param)
                conn.commit()
        except Exception:
            conn.rollback()
            # Bare raise keeps the original exception type and traceback.
            raise
        finally:
            self.db_close(conn, cur)
        return affected

    def sql_select(self, sql, param=None):
        """Run one query and return its column names and rows.

        :param sql: query text, optionally with driver placeholders
        :param param: values bound to the placeholders, or ``None``
        :return: ``{"field": [column names], "data": rows from fetchall()}``
        :raises: the original driver exception, after a rollback
        """
        result = {"field": [], "data": []}
        conn, cur = self.db_connect()
        try:
            if conn and cur:
                cur.execute(sql, param)
                # description is None when the statement produced no result
                # set; treat that as "no columns" instead of crashing.
                result["field"] = [column[0] for column in cur.description or ()]
                result["data"] = cur.fetchall()
                conn.commit()
        except Exception:
            conn.rollback()
            # Bare raise keeps the original exception type and traceback.
            raise
        finally:
            self.db_close(conn, cur)
        return result
def format_data(result):
    """Turn a ``{"field": [...], "data": [...]}`` result into a list of dicts.

    :param result: mapping with ``"field"`` (column names) and ``"data"``
        (rows, each a sequence of values in column order)
    :return: one dict per row keyed by column name; ``[]`` when ``"data"``
        is empty or otherwise falsy
    """
    rows = result["data"]
    if not rows:
        return []
    columns = list(result["field"])
    return [dict(zip(columns, row)) for row in rows]
# Connection settings keyed by alias. The values look like placeholders that
# are meant to be replaced with real credentials -- confirm before use.
DATABASES = {
    "default": {
        "NAME": "db_name",
        "USER": "db_user",
        "PASSWORD": "db_pwd",
        "HOST": "db",
        "PORT": "3306",
    }
}
# Module-level helper bound to the "default" settings.
mysql_db = MysqlDb(DATABASES["default"])

if __name__ == '__main__':
    # Template entry point: put the statement to run between the quotes.
    update_sql = """
"""
    # Refuse to send a blank statement to the server; an unfilled template
    # would otherwise only fail after a connection has been opened.
    if not update_sql.strip():
        raise SystemExit("update_sql is empty: fill in the statement before running")
    result = mysql_db.sql_execute(update_sql)
    print(result)
添加随机crontab
import os
import random
import json
import hashlib
from utils.crontab_util import Crontab
from utils import sys_util, socket_util
from utils.util_exception import UtilException
# NOTE(review): not referenced by any code visible in this file; presumably a
# duration expressed in hours -- confirm the intended use before relying on it.
EXEC_DURATION_HOURS = 2
def hash_local_ip():
    """Return the MD5 digest of the local login IP as an integer.

    The IP comes from ``socket_util.get_cft_login_ip()``; when that returns
    ``None`` the loopback address ``127.0.0.1`` is hashed instead.

    :return: the 128-bit MD5 digest of the IP string, as an ``int``
    """
    local_ip = socket_util.get_cft_login_ip()
    if local_ip is None:
        local_ip = "127.0.0.1"
    # The original format string had no placeholder, so the IP was never shown.
    print("local_ip: {}".format(local_ip))
    hex_dig = hashlib.md5(local_ip.encode()).hexdigest()
    return int(hex_dig, 16)
def gen_dynamic_cron_min():
    """Pick two cron minute values that are exactly 30 minutes apart.

    :return: a string ``"M,M+30"`` suitable for the minute field of a
        crontab entry, with ``0 <= M <= 29`` so both values stay in 0-59
    """
    # randint() includes both end points; an upper bound of 30 could yield
    # "30,60", and 60 is not a valid cron minute.
    first_minute = random.randint(0, 29)
    return "{},{}".format(first_minute, first_minute + 30)
def output_result(result, msg):
    """Print the outcome as one JSON line wrapped in OUTPUT_RESULT markers.

    :param result: value stored under the ``"result"`` key
    :param msg: value stored under the ``"result_msg"`` key
    """
    # ensure_ascii=False keeps non-ASCII characters readable in the output.
    payload = json.dumps(
        {"result": result, "result_msg": msg},
        ensure_ascii=False,
    )
    print("OUTPUT_RESULT %s OUTPUT_END" % payload)
def main():
    """Replace the ``restart.py`` crontab entry with a randomly timed one.

    Drops the existing ``restart.py`` entry for user ``product_pub``, adds a
    new entry that runs twice an hour at minutes chosen by
    ``gen_dynamic_cron_min()``, checks that the entry can be read back and
    reports success through ``output_result``.

    :raises UtilException: when no ``restart.py`` entry is found afterwards
    """
    cron = Crontab(user="product_pub")
    current_dir = os.path.dirname(os.path.abspath(__file__))
    dynamic_cron_min = gen_dynamic_cron_min()
    # Presumably removes entries matching "restart.py" -- the Crontab helper
    # is defined outside this file, confirm its matching rules there.
    cron.remove("restart.py")
    print("add new random start crontab")
    # "%" is special inside a crontab command and has to reach cron as "\%".
    # The backslash is doubled here because "\%" on its own is an invalid
    # escape sequence in a Python string literal; the resulting text is the
    # same as before.
    cron.add(
        "{} * * * * sleep $(($RANDOM\\%10));cd {} && python2 restart.py > /dev/null 2>&1".format(
            dynamic_cron_min, current_dir))
    jobs = cron.get("restart.py")
    if jobs is None or not jobs.strip():
        raise UtilException(-1, "add restart crontab failed")
    output_result(0, "ok")


if __name__ == '__main__':
    main()
根据IP生成随机睡眠时间
import time
import hashlib
from utils import socket_util
def get_sleep_time(random_range):
    """Derive a stable sleep time from the local login IP.

    The same IP always maps to the same value, so the result is only
    "random" across different hosts.

    :param random_range: upper bound of the result; must be a non-zero
        integer (``0`` raises ``ZeroDivisionError``)
    :return: an integer between 1 and ``random_range`` inclusive for a
        positive ``random_range``
    """
    # get local ip
    local_ip = socket_util.get_cft_login_ip()
    if local_ip is None:
        # Same fallback as hash_local_ip(): the lookup may return None, and
        # None has no .encode().
        local_ip = "127.0.0.1"
    # hash local ip
    hex_dig = hashlib.sha256(local_ip.encode()).hexdigest()
    # random 1 - random_range
    return int(hex_dig, 16) % random_range + 1
获取dns解析结果
def resolve_domain_to_ips(domain: str) -> Dict[str, List[str]]:
    """
    Resolve a domain name and return its IP addresses keyed by the domain.

    Resolution uses ``socket.gethostbyname_ex``; a name that cannot be
    resolved, or that is not a well-formed host name, yields an empty list
    instead of an exception.

    :param domain: the domain name to resolve
    :return: a single-entry dict mapping ``domain`` to its list of IP
        addresses (empty when resolution fails)
    """
    try:
        # Index 2 of the returned triple is the list of IP addresses.
        ip_addresses = socket.gethostbyname_ex(domain)[2]
    except (socket.gaierror, socket.herror, UnicodeError):
        # gaierror/herror: the resolver could not resolve the name.
        # UnicodeError: the name failed IDNA encoding (empty or over-long
        # label), so it can never resolve.
        print(f"Could not resolve domain: {domain}")
        return {domain: []}
    # Return the result in the documented shape.
    return {domain: ip_addresses}
环境初始化
#!/bin/bash
# Environment bootstrap:
#   1. install the development packages listed in pkgs via yum,
#   2. pick the newest /usr/bin/python* whose version lies within
#      [MIN_PY_VERSION, MAX_PY_VERSION],
#   3. create ../venv with virtualenv when it does not exist yet,
#   4. install requirements.txt into that virtualenv.
MIN_PY_VERSION="3.6.0"
MAX_PY_VERSION="3.7.0"

# version_gt A B: exit status 0 when dotted version A is strictly newer than B.
# GNU `sort -V` compares the components numerically; a plain string
# comparison would rank "3.6.10" below "3.6.8".
version_gt() {
    [ "$1" != "$2" ] && [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | tail -n 1)" = "$1" ]
}

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Start ####"
# All relative paths below are resolved from the directory holding this script.
cd "$(cd "$(dirname "$0")" && pwd)" || exit 1
mkdir -p ../logs &> /dev/null
source /etc/profile &> /dev/null

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Install devel lib ####"
pkgs=("python3-devel" "python38-devel" "mysql-devel")
for pkg in "${pkgs[@]}"; do
    echo "> $pkg"
    # Install only when no installed rpm matches the package name.
    if [ "$(rpm -qa | grep -cw -- "$pkg")" -le 0 ]; then
        echo "installing..."
        yum install -y "$pkg" > /dev/null
    fi
    echo "installed"
done

python_paths=$(ls -1 /usr/bin/python* | grep '.*[2-3]\(.[0-9]\+\)\?$')
max_version=""
PYBIN=""
echo "#### $(date +'%Y-%m-%d %H:%M:%S') Pick higher python version ####"
for python_path in $python_paths; do
    version=$("$python_path" -V 2> /dev/null | awk '{print $NF}')
    # Nothing on stdout (stderr is discarded above): no version to compare.
    if [ -z "$version" ]; then
        continue
    fi
    # Skip interpreters newer than the supported maximum.
    if version_gt "$version" "$MAX_PY_VERSION"; then
        continue
    fi
    # Keep the newest interpreter seen so far.
    if version_gt "$version" "$max_version"; then
        max_version="$version"
        PYBIN="$python_path"
    fi
done
echo "> $max_version"
echo "> $PYBIN"
# Also covers the case where no usable interpreter was found (empty version).
if version_gt "$MIN_PY_VERSION" "$max_version"; then
    echo "python3 version must be $MIN_PY_VERSION or higher"
    exit 1
fi

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Create virtualenv ####"
if [ ! -f ../venv/bin/python ]; then
    # Install virtualenv for the chosen interpreter when pip does not list it.
    if ! "$PYBIN" -m pip freeze | grep -q virtualenv; then
        "$PYBIN" -m pip install virtualenv -i https://mirrors.tencent.com/pypi/simple/
    fi
    if ! "$PYBIN" -m virtualenv ../venv; then
        echo "create virtualenv failed"
        exit 1
    fi
fi

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Install requirements ####"
source ../venv/bin/activate && cd ../ && pip install -r requirements.txt -i https://mirrors.tencent.com/pypi/simple/