Heroyf @ heroyf.com

Python Code Snippets

Mar 24 · 5min

操作sql

# !/usr/bin/env python
# -*- coding:utf-8 -*-
"""
@file: sql_exec
@author: frankfyang
@create: 2022/3/16
"""
import json
import pymysql

class MysqlDb(object):

    def __init__(self, db):
        self.host = db["HOST"]
        self.user = db["USER"]
        self.password = db["PASSWORD"]
        self.database = db["NAME"]
        self.port = int(db["PORT"])
        self.connect_timeout = 5
        self.max_allowed_packet = 16 * 1024 * 1024
        self.read_timeout = 10
        self.write_timeout = 10

    def db_connect(self):
        conn = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            port=self.port,
            connect_timeout=self.connect_timeout,
            max_allowed_packet=self.max_allowed_packet,
            read_timeout=self.read_timeout,
            write_timeout=self.write_timeout,
            charset="utf8",
        )
        cur = conn.cursor()
        return conn, cur

    def db_close(self, conn, cur):
        if conn and cur:
            cur.close()
            conn.close()

    def sql_execute(self, sql, param=None):
        conn, cur = self.db_connect()
        try:
            if conn and cur:
                cur.execute(sql, param)
                conn.commit()
        except Exception as e:
            conn.rollback()
            raise Exception(e)
        finally:
            self.db_close(conn, cur)

    def sql_select(self, sql, param=None):
        result = {"field": [], "data": []}
        conn, cur = self.db_connect()
        try:
            if conn and cur:
                cur.execute(sql, param)
                result["field"] = [field[0] for field in cur.description]
                result["data"] = cur.fetchall()
                conn.commit()
        except Exception as e:
            conn.rollback()
            raise Exception(e)
        finally:
            self.db_close(conn, cur)
        return result

def format_data(result):
    return (
        [
            dict(zip([field for field in result["field"]], line))
            for line in result["data"]
        ]
        if result["data"]
        else []
    )

DATABASES = {
    "default": {
        "NAME": "db_name",
        "USER": "db_user",
        "PASSWORD": "db_pwd",
        "HOST": "db",
        "PORT": "3306",
    }
}

mysql_db = MysqlDb(DATABASES["default"])

if __name__ == '__main__':
    update_sql = """
    """
    result = mysql_db.sql_execute(update_sql)
    print(result)

添加随机crontab

import os
import random
import json
import hashlib

from utils.crontab_util import Crontab
from utils import sys_util, socket_util
from utils.util_exception import UtilException

EXEC_DURATION_HOURS = 2

def hash_local_ip():
    local_ip = socket_util.get_cft_login_ip()
    if local_ip is None:
        local_ip = "127.0.0.1"
    print("local_ip: ".format(local_ip))
    hash_object = hashlib.md5(local_ip.encode())
    hex_dig = hash_object.hexdigest()
    return int(hex_dig, 16)

def gen_dynamic_cron_min():
    random_seed = random.randint(0, 30)
    return "{},{}".format(random_seed, 30 + random_seed)

def output_result(result, msg):
    data = {"result": result, "result_msg": msg}
    dataj = json.dumps(data, ensure_ascii=False)
    print("OUTPUT_RESULT %s OUTPUT_END" % dataj)

def main():
    cron = Crontab(user="product_pub")
    current_dir = os.path.dirname(os.path.abspath(__file__))
    dynamic_cron_min = gen_dynamic_cron_min()
    cron.remove("restart.py")

    print("add new random start crontab")
    cron.add(
        "{} * * * * sleep $(($RANDOM\%10));cd {} && python2 restart.py > /dev/null 2>&1".format(
            dynamic_cron_min, current_dir))

    jobs = cron.get("restart.py")
    if jobs is None or not jobs.strip():
        raise UtilException(-1, "add restart crontab failed")
    output_result(0, "ok")

if __name__ == '__main__':
    main()

根据IP生成随机睡眠时间

import time
import hashlib

from utils import socket_util

def get_sleep_time(random_range):
    # get local ip
    local_ip = socket_util.get_cft_login_ip()
    # hash local ip
    hex_dig = hashlib.sha256(local_ip.encode()).hexdigest()
    # random 1 - random_range
    return int(hex_dig, 16) % random_range + 1

获取dns解析结果

def resolve_domain_to_ips(domain: str) -> Dict[str, List[str]]:
    """
    解析域名并返回一个字典,包含域名和对应的 IP 地址列表。

    :param domain: 要解析的域名
    :return: 包含域名和 IP 地址列表的字典
    """
    try:
        # 获取域名的所有 IP 地址
        ip_addresses = socket.gethostbyname_ex(domain)[2]
        # 返回格式化的结果
        return {domain: ip_addresses}
    except socket.gaierror:
        # 处理解析错误
        print(f"Could not resolve domain: {domain}")
        return {domain: []}

环境初始化

#!/bin/bash

MIN_PY_VERSION="3.6.0"
MAX_PY_VERSION="3.7.0"

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Start ####"
cd $(cd "$(dirname $0)"; pwd)

mkdir -p ../logs &> /dev/null

source /etc/profile &> /dev/null

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Install devel lib ####"
pkgs=("python3-devel" "python38-devel" "mysql-devel")

for pkg in ${pkgs[@]}; do
  echo "> $pkg"
  if [ $(rpm -qa | grep -w $pkg | wc -l) -le 0 ]; then
      echo "installing..."
      yum install -y $pkg > /dev/null
  fi
  echo "installed"
done

python_paths=$(ls -1 /usr/bin/python* | grep '.*[2-3]\(.[0-9]\+\)\?$')
max_version=""
PYBIN=""

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Pick higher python version ####"
for python_path in $python_paths; do
    version=$($python_path -V 2> /dev/null | awk '{print $NF}')
    if [[ "$version" > "$MAX_PY_VERSION" ]]; then
      continue
    fi
    if [[ "$version" > "$max_version" ]]; then
      max_version="$version"
      PYBIN="$python_path"
    fi
done

echo "> $max_version"
echo "> $PYBIN"
if [[ "$max_version" < "$MIN_PY_VERSION" ]]; then
    echo "python3 version must be $MIN_PY_VERSION or higher"
    exit 1
fi

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Create virtualenv ####"
if [ ! -f ../venv/bin/python ]; then
  if [ `$PYBIN -m 'pip' freeze | grep virtualenv | wc -l`  -eq 0 ]; then
    $PYBIN -m 'pip' install virtualenv -i https://mirrors.tencent.com/pypi/simple/
  fi
  $PYBIN -m 'virtualenv' ../venv
  if [ $? -ne 0 ]; then
    echo "create virtualenv failed"
    exit 1
  fi
fi

echo "#### $(date +'%Y-%m-%d %H:%M:%S') Install requirements ####"
source ../venv/bin/activate && cd ../ && pip install -r requirements.txt -i https://mirrors.tencent.com/pypi/simple/