本篇记录一些可以直接复用的代码片段。持续更新。
代码网站(可以找找有没有现成的):http://outofmemory.cn/code-snippet/c/python/
Flask JWT认证
#!/usr/bin/env python3
# -*-coding:utf-8-*-
# @Version: Python 3
# Flask JWT 验证
import time
from adslproxy.db import RedisClient
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired, BadSignature, BadData
from flask import Flask, jsonify, request, abort
app = Flask(__name__)
# Holds the account information that has already been saved
redis_cli = RedisClient()
#############################################################
# NOTE(review): the signing key and salt are hard-coded here — presumably sample values; confirm they are replaced before deployment
secret_key = 'PMF9IAnk16KVbUel'
salt = 'jR9kK3KjYDN79t6s'
# Passed as expires_in to the token serializers (5 * 3600 and 6 * 3600)
access_token_expires_in = 60 * 60 * 5
refresh_token_expires_in = 60 * 60 * 6
def genTokenSeq(user):
    """
    Issue an access token and a refresh token for a user.

    :param user: user name, stored in the token payload under "userid"
    :return: dict holding both tokens (as str) and their expires_in settings
    """
    issued_at = time.time()
    payload = {"userid": user, "iat": issued_at}

    def _sign(lifetime):
        # Serialize the shared payload with a serializer configured for this lifetime.
        signer = Serializer(secret_key=secret_key, salt=salt, expires_in=lifetime)
        return str(signer.dumps(payload), 'utf-8')

    return {
        "access_token": _sign(access_token_expires_in),
        "access_token_expire_in": access_token_expires_in,
        "refresh_token": _sign(refresh_token_expires_in),
        "refresh_token_expire_in": refresh_token_expires_in,
    }
def validateToken(token):
    """
    Parse and verify a token.

    :param token: the token string sent by the client
    :return: a JSON response; 'code' is 200 together with 'userid' on success,
             or 401 with a 'message' naming the failure
    """
    s = Serializer(secret_key=secret_key, salt=salt)
    try:
        data = s.loads(token)
    except SignatureExpired:
        return jsonify({'code': 401, 'message': 'token expired'})  # token has expired
    except BadSignature as e:
        encoded_payload = e.payload
        if encoded_payload is not None:
            try:
                s.load_payload(encoded_payload)
            except BadData:
                # The payload itself cannot be decoded: the token was tampered with.
                return jsonify({'code': 401, 'message': 'token tampered'})
        return jsonify({'code': 401, 'message': 'badSignature of token'})  # signature is wrong
    except Exception:
        # Boundary handler: any other failure is reported as an invalid token.
        return jsonify({'code': 401, 'message': 'wrong token with unknown reason'})
    if 'userid' not in data:
        return jsonify({'code': 401, 'message': 'illegal payload inside'})  # payload lacks the user id
    return jsonify({'code': 200, 'userid': data['userid'], 'message': f"user({data['userid']}) logged in by token."})
###############################################################
# API
@app.route('/login', methods=['POST'])
def login():
    """
    Log in with the JSON body sent by the client:
    {
    "username":"admin",
    "password":"12345678"
    }
    Returns the result of genTokenSeq on success and aborts with 400 otherwise.
    """
    json_data = request.get_json()
    # A missing body or a non-object JSON value has no .get(); answer 400 instead of crashing with a 500.
    if not isinstance(json_data, dict):
        abort(400)
    username = json_data.get('username')
    password = json_data.get('password')
    if username is None or password is None:
        abort(400)
    # Account check: a plain comparison against redis; adapt this for a relational database.
    # Redis holds an A/B pair of data sets (accounts are reloaded from a config file on a schedule);
    # the RedisClient methods are a custom wrapper.
    list_key = RedisClient(list_key='ab_set').get('a_or_b')
    if RedisClient(list_key=list_key).get(username) == password:
        return genTokenSeq(username)
    abort(400)
@app.route('/', methods=['POST'])
def index():
    """
    Validate the token in the JSON body sent by the client:
    {
    "token":"token-str",
    }
    Returns the validateToken response with HTTP status 200, or aborts with 400
    when the body is not a JSON object or carries no token.
    """
    json_data = request.get_json()
    # The request body is deliberately not printed: it contains the bearer token.
    if not isinstance(json_data, dict):
        abort(400)
    token = json_data.get('token')
    if not token:
        abort(400)
    return validateToken(token), 200
if __name__ == '__main__':
    # Debug mode is off: the server listens on 0.0.0.0, and the interactive
    # debugger must never be reachable from other machines.
    app.run(host='0.0.0.0', port=5000, debug=False)
参考资料:
https://mp.weixin.qq.com/s/S32HM0yvuy8ptSKYjsKu6A
伪造IP
通过伪造请求头(如 HTTP_X_FORWARDED_FOR)可以简单地伪造IP;但在严格反爬的情况下,服务端检测的是 REMOTE_ADDR,这个值客户端无法伪造(它可能是用户本身的IP,也可能是代理服务器的IP);详情见扩展阅读。
import socket
import struct
import random
import requests
def createHeader():
    """
    Build request headers that carry a random forged client IP.

    :return: dict of headers; 'CLIENT-IP' and 'X-FORWARDED-FOR' hold the same
             random dotted-quad IPv4 address
    """
    # Pack a random 32-bit integer big-endian and render it as a dotted-quad address.
    ip = socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        # The closing parenthesis was missing, which made the User-Agent malformed.
        'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)',
        'CLIENT-IP': ip,
        'X-FORWARDED-FOR': ip,
    }
    return headers
if __name__ == '__main__':
    # Post with the forged headers and print the IP address the service reports back.
    response = requests.post('http://members.3322.org/dyndns/getip', headers=createHeader())
    print(response.text)
扩展阅读:
https://gargantuax.github.io/blog/2017-02/%E4%BD%A0%E7%9C%9F%E7%9A%84%E4%BA%86%E8%A7%A3ip%E5%90%97php%E5%A6%82%E4%BD%95%E4%B8%A5%E6%A0%BC%E8%8E%B7%E5%8F%96%E7%9C%9F%E5%AE%9E%E7%94%A8%E6%88%B7ip/
https://www.urlteam.org/2016/08/%E4%BB%BF%E4%BA%BA%E5%8C%96%E4%BC%AA%E9%80%A0%E8%AF%B7%E6%B1%82%E5%A4%B4/
PIL
打开在线图片之后关闭
import requests
from io import BytesIO
import psutil
from PIL import Image  # Image was used below without ever being imported

img_url = 'https://www.leolan.top/usr/themes/default/images/thumbs/3.jpg'
image = Image.open(BytesIO(requests.get(img_url).content))
# Record the processes that already exist, show() the image, then kill
# whatever process appeared afterwards.
process_list = list(psutil.process_iter())
# Show the image
image.show()
# Kill the new processes
for proc in psutil.process_iter():
    if proc not in process_list:
        try:
            proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Best effort: the process already exited or belongs to another user.
            continue
参考资料:
https://blog.csdn.net/ygfrancois/article/details/84781087
通知、消息
微信小号、群消息
微信通知需要准备一个小号,而且容易被封。不方便的一点是需要扫码登陆。
参考:https://wxpy.readthedocs.io/zh/latest/index.html
企业微信
首先管理员登陆企业微信后台创建应用,设置通知范围等,并记录必要信息。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import json
import requests
import datetime
# Recipients of the message (user account ids joined with '|'; a group can be used instead, see the references)
touser = 'userid1|userid2'
# Corporation ID
corpid = 'xxxxxx'
# Application secret
corpsecret = 'xxxxxxx'
# Application ID (replace with your own)
AgentId = 100002
def get_token():
    """
    Request an access token from the gettoken endpoint.

    :return: the 'access_token' field of the JSON reply
    """
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'.format(corpid, corpsecret)
    reply = requests.get(url, timeout=5)
    return json.loads(reply.text)['access_token']
def push_msg(access_token):
    """
    Send a deployment notice built from the shell arguments:
    [deployed application name] [git update log]

    :param access_token: token appended to the message/send URL
    :return: None; the response body is printed
    """
    now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # An empty git log passed unquoted from a shell is dropped from argv entirely;
    # treat a missing second argument the same as an empty one instead of raising IndexError.
    git_log = sys.argv[2] if len(sys.argv) > 2 else ''
    if git_log == '':
        msg = "{}: {} 更新失败,请再次提交代码!".format(sys.argv[1], now_time)
    else:
        msg = "{}: {} 更新:{}".format(sys.argv[1], now_time, git_log)
    # Build the request body
    msg_body = {"touser": touser, "msgtype": "text", "agentid": AgentId, "text": {"content": msg}, "safe": 0}
    result = requests.post('https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}'.format(access_token),
                           timeout=5, data=json.dumps(msg_body))
    print(result.text)
if __name__ == '__main__':
    # Fetch a token and push the message with it.
    push_msg(get_token())
# 后续步骤
如果微信绑定了企业微信,别忘了在企业详情页打开“接收应用信息”。
参考资料:
https://work.weixin.qq.com/api/doc#90000/90003/90487
https://open.work.weixin.qq.com/api/doc#90000/90135/90236
https://work.weixin.qq.com/api/doc#90000/90135/90236/%E6%96%87%E6%9C%AC%E6%B6%88%E6%81%AF
公众号
有个人号和服务号之分,个人号到2019年基本没啥用了,接口权限都没有。
以下主要以服务号的方式推送信息。
1、第三方平台(如:Server 酱)
https://hoxis.github.io/python-serverchan.html
http://sc.ftqq.com
2、认证的服务号
认证的服务号可以在后台权限中找到文档。处理方式类似企业微信应用。
钉钉
官方提供的文档就很强大了。
参考资料:
https://github.com/typ431127/zabbix_dingding
https://github.com/magician000/DingTalkRobot-python
https://github.com/Twotiger/dingding
https://github.com/zhuifengshen/DingtalkChatbot
网页版QQ于2019年1月停止服务,目前只能用一些其他的方式,模拟登陆等等。
酷Q:https://cqhttp.cc/docs/4.10/
邮箱
#!/usr/bin/python
# -*- coding: utf-8 -*-
import smtplib
from email.message import EmailMessage
# Account settings
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 465 # SSL port; using SSL is recommended
EMAIL_HOST_USER = '842632422@qq.com'
# Placeholder: for QQ Mail, put the authorization code here
EMAIL_HOST_PASSWORD = '[QQ 邮箱此处填写授权码]'
# Mail subject
EMAIL_SUBJECT = '消息提醒'
# Sender
EMAIL_FROM = '842632422@qq.com'
# Recipient
EMAIL_TO = 'xxxxx@qq.com'
# Message content
EMAIL_CONTENT = '这里是内容,这是一封测试邮件'
def send_msg(content):
    """
    Send one mail with the module-level subject, sender and recipient.

    :param content: text used as the mail body
    """
    msg = EmailMessage()
    for header, value in (('Subject', EMAIL_SUBJECT), ('From', EMAIL_FROM), ('To', EMAIL_TO)):
        msg[header] = value
    msg.set_content(content)
    # To send without SSL, use smtplib.SMTP instead of smtplib.SMTP_SSL.
    with smtplib.SMTP_SSL(EMAIL_HOST, EMAIL_PORT) as smtp:
        smtp.login(EMAIL_HOST_USER, EMAIL_HOST_PASSWORD)
        smtp.send_message(msg)
        print("发送成功")
# Script entry point: send the sample content defined above.
if __name__ == '__main__':
    send_msg(EMAIL_CONTENT)
参考资料:https://www.runoob.com/python/python-email.html
手机短信
https://www.twilio.com/
这是收费的短信服务,支持多个国家,注册送15美金,大概能免费发500条。
# Download the helper library from https://www.twilio.com/docs/python/install
from twilio.rest import Client
# Your Account Sid and Auth Token from twilio.com/console
account_sid = 'ACXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
auth_token = 'your_auth_token'
client = Client(account_sid, auth_token)
# Create (send) the message; the call sits inside parentheses, so no backslash continuation is needed.
message = client.messages.create(
    body="Join Earth's mightiest heroes. Like Kevin Bacon.",
    from_='+15017122661',
    to='+15558675310',
)
print(message.sid)
参考:
https://mp.weixin.qq.com/s/auRrvR6EVtVAdptEymMRUw
https://www.twilio.com/docs/sms/quickstart/python
https://zhuanlan.zhihu.com/p/46992160
控制函数执行时间
import signal
import time
def set_runing_time(num):
    """
    Decorator factory that limits a function's run time with SIGALRM.

    :param num: time limit in whole seconds
    :return: a decorator; the decorated function returns the original result,
             or the string "超时啦" when the limit is reached

    Exceptions raised by the function itself propagate to the caller. The alarm
    is always cancelled and the previous SIGALRM handler is restored.
    """
    import functools

    class _Timeout(RuntimeError):
        """Raised by the SIGALRM handler; private so a RuntimeError from func is not mistaken for a timeout."""

    def wrap(func):
        def handle(signum, frame):
            # Callback for SIGALRM: signum is the signal number, frame the interrupted stack frame.
            raise _Timeout

        @functools.wraps(func)
        def to_do(*args, **kwargs):
            previous = signal.signal(signal.SIGALRM, handle)  # install the callback
            signal.alarm(num)  # arm an alarm of num seconds
            print('start alarm signal.')
            try:
                r = func(*args, **kwargs)
                print('close alarm signal.')
                return r
            except _Timeout:
                return "超时啦"
            finally:
                signal.alarm(0)  # cancel the alarm on every exit path
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)
        return to_do
    return wrap
@set_runing_time(5)  # time limit of 5 seconds
def my_func():  # the function to run under the limit
    # Never returns on its own: prints two marker lines with a 0.5 s sleep between them, forever.
    while True:
        print('@@@@@@@@@@@@@@')
        time.sleep(0.5)
        print('##############')
if __name__ == '__main__':
    # Time the call in milliseconds.
    started_ms = time.time() * 1000
    s = my_func()
    finished_ms = time.time() * 1000
    print('执行时间(毫秒):', int(finished_ms - started_ms))
参考资料:
https://www.cnblogs.com/chenxiyuxiao/p/10864180.html
https://www.cnblogs.com/lyxdw/p/10033118.html
爬虫URL管理
# 通过set管理已爬和未爬的URL
class UrlManager(object):
    """Track crawl URLs with two sets: those still to crawl and those already handed out."""

    def __init__(self):
        self.new_urls = set()  # URLs waiting to be crawled
        self.old_urls = set()  # URLs already returned by get_new_url

    def add_new_url(self, url):
        """Queue one URL unless it is None or already present in either set."""
        if url is None:
            return
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        """Queue every URL from an iterable; None or an empty collection is ignored.

        Any iterable is accepted, including generators that have no len().
        """
        if not urls:
            return
        for url in urls:
            self.add_new_url(url)

    def has_new_url(self):
        """Return True while at least one URL is waiting to be crawled."""
        return len(self.new_urls) != 0

    def get_new_url(self):
        """Take one waiting URL, move it to the crawled set and return it.

        Raises KeyError when no URL is waiting.
        """
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url
公众号:Crossin的编程教室(编程实例)
https://crossincode.com/oj/practice_list/
django通过celery实现发送邮件的异步执行
http://www.imooc.com/article/16164
python的多路复用实现聊天群
http://www.imooc.com/article/39675
超常用的Python代码片段
https://mp.weixin.qq.com/s/QDX1J21RSaa609jUnmdgQw
Flask生成静态html
from flask import render_template
import flask
app = flask.Flask('my app')
# Data handed to the template as "datas"
document = dict(title='test', keyword='hahahahha')
if __name__ == "__main__":
    # render_template is called inside an application context
    with app.app_context():
        print(render_template('index.html', datas=document))
MongoDB操作
import pymongo
# Connect to the database (two variants: without and with a password)
connection = pymongo.MongoClient('127.0.0.1', 27017)
# Authenticated variant; this assignment replaces the connection created on the previous line
connection = pymongo.MongoClient(host=MONGO_HOST,
                                 port=MONGO_PORT,
                                 username=MONGO_USERNAME,
                                 password=MONGO_PASSWORD,
                                 authMechanism='SCRAM-SHA-1',
                                 authSource=MONGO_DB)
# Collection used by the snippets that follow
col = connection[MONGO_DB][MONGO_COLLECTION]
# 插入数据
def save_to_mongodb(link_data):
    """
    Insert data into the module-level collection ``col``.

    :param link_data: one document (dict) or a list of documents
    :return: True when the write was acknowledged, False otherwise
    """
    # Collection.insert() is deprecated and removed in PyMongo 4; use insert_one / insert_many.
    if isinstance(link_data, list):
        result = col.insert_many(link_data)
    else:
        result = col.insert_one(link_data)
    if result.acknowledged:
        print("存储成功", link_data)
        return True
    return False
参考:
https://blog.csdn.net/qq_34162294/article/details/73441559
http://www.runoob.com/python3/python-mongodb.html
异步数据库
https://www.jianshu.com/p/6d6fa94a01ef
Python 3 类型转换
https://mp.weixin.qq.com/s/H0uZCU9-j-RMhmaPay-XUw
地址拼接
# 地址形式:http://www.leolan.top/?paged=[页码]
# n是总页数;url是首页网址
def Address_stitching(url, n):
    """
    Build the list of page URLs of the form ``<url>/?paged=<page>``.

    :param url: address of the home page
    :param n: total number of pages
    :return: list of URLs for pages 1 to n, in order
    """
    return [url + '/?paged=%s' % (page) for page in range(1, n + 1)]
获取页面最大页数
动态加载,总数写在js里的(如CSDN)
# 获取最大页数,url是用户首页网址
def csdn_last_page(url):
    '''
    Return the number of list pages of a CSDN blog; url is the user's home page.

    The page embeds its paging data in a script block. With 20 articles per page,
    855 articles make 42.75 pages, so there are 43 pages.
    Excerpt from the html file:
    <script>
    var currentPage = 1;
    var baseUrl = 'https://blog.csdn.net/chszs/article/list' ;
    var pageSize = 20 ;
    var listTotal = 855 ;
    var pageQueryStr = '';
    function getAllUrl(page) {
    return baseUrl + "/" + page + pageQueryStr;
    }
    </script>

    Raises IndexError when pageSize or listTotal is not found in the page.
    '''
    response = requests.get(url)
    # The encoding has to be set before .text is read; set afterwards it has no effect on decoding.
    response.encoding = 'UTF-8'
    response_text = response.text
    # Raw strings keep \d a regex escape rather than an invalid string escape.
    pattern1 = re.compile(r'<script>.*?pageSize.*?=.*?(\d+).*?;', re.S)
    pageSize = int(re.findall(pattern1, response_text)[0])  # articles per page
    pattern2 = re.compile(r'<script>.*?listTotal.*?=.*?(\d+).*?;', re.S)
    listTotal = int(re.findall(pattern2, response_text)[0])  # total number of articles
    # Ceiling division: a partially filled last page still counts as a page.
    last_page_num = (listTotal + pageSize - 1) // pageSize
    return last_page_num
直接从源文件中提取页数
def hexo_next_last_page(url):
    """
    Read the highest page number straight from the page source.

    :param url: address of the page to fetch
    :return: text of the last "page-number" element, converted to int
    """
    html = requests.get(url).text
    # The selector yields a list; its last element carries the highest page number.
    page_links = BeautifulSoup(html, "lxml").select('[class="page-number"]')
    return int(page_links[-1].get_text())
对两个列表同时取出对应值(转字典)
# zip函数可以多个对象中的元素打包为元组:http://www.runoob.com/python3/python3-func-zip.html
# zip() yields one (title, url) tuple per position of the two lists
for pair in zip(title_list, url_list):
    key, value = pair
    print(key, value)
判断网站状态
def web_status(url):
    """
    Check whether a site answers with HTTP 200.

    :param url: address to request
    :return: url when the status code is 200; otherwise False, after the failure
             has been appended with a timestamp to Error_Web_Url.txt

    A site that cannot be reached at all is recorded as a failure as well.
    """
    try:
        status_code = requests.get(url).status_code
    except requests.RequestException:
        # An unreachable site is exactly the failure this function is meant to record.
        status_code = None
    if status_code == 200:
        return url
    with open('Error_Web_Url.txt', "a+") as f:
        f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ' ')
        f.write(url)
        f.write('\r\n')
    print("网站访问异常,失败网址已记录在:Error_Web_Url.txt")
    return False
代理
从开源免费代理池获取代理
# 项目地址:https://github.com/fengzhizi715/ProxyPool
def get_proxy():
    """
    Fetch the first proxy entry from the proxy pool API.

    :return: dict mapping the proxy type to "<type>://<address>:<port>"
    """
    response = requests.get('http://47.97.7.119:8080/proxypool/proxys/1')
    entry = json.loads(response.text).get('data')[0]
    scheme = str(entry.get('proxyType'))
    address = str(entry.get('proxyAddress'))
    port = str(entry.get('proxyPort'))
    return {scheme: f'{scheme}://{address}:{port}'}
从MongoDB中随机取出一个代理IP
def get_random_proxy_ip():
    """
    Pick one random proxy IP from the BlogWorm_DB.ProxyIP collection.

    :return: the 'http' field of a randomly chosen document, with newlines removed
    :raises ValueError: when the collection holds no documents
    """
    # Pick a random IP; the custom class from DB.py cannot be used here.
    # The client is used as a context manager so the connection gets closed.
    with pymongo.MongoClient(MongoDB_IP, MongoDB_Port) as connection:
        cdb = connection.BlogWorm_DB.ProxyIP
        # Cursor.count() is removed in PyMongo 4; count_documents replaces it.
        length = cdb.count_documents({})
        if length == 0:
            raise ValueError('no proxy IP stored in BlogWorm_DB.ProxyIP')
        items = cdb.find()
        ind = random.randrange(length)
        ip = items[ind]['http'].replace('\n', '')
    print(ip)
    return ip
从ProxyPool取出ip拼接为可用的格式
def get_proxy(max_retries=10):
    """
    Take an IP from ProxyPool, turn it into a requests ``proxies`` dict and verify it.

    :param max_retries: how many proxies to try before giving up
    :return: {'http': 'http://<ip:port>'} for a proxy that answered the test request;
             None when the pool is unreachable, answers with a status other than 200,
             or no working proxy was found within max_retries attempts
    """
    # A loop replaces the original self-recursion, which had no bound on bad proxies.
    for _ in range(max_retries):
        try:
            response = requests.get(PROXY_POOL_URL)
        except requests.exceptions.ConnectionError:
            # The builtin ConnectionError is a different class and would not catch this.
            print("代理池好像挂掉了!!!")
            return None  # a failed connection means the proxy pool is down
        if response.status_code != 200:
            return None
        proxy = {'http': 'http://' + response.text}
        try:
            # Test the fetched IP once more before handing it out
            requests.get("http://ip.chinaz.com/getip.aspx", proxies=proxy, timeout=5)
        except Exception:
            continue  # this proxy does not work, fetch another one
        return proxy  # the proxy works
    return None
多进程(进程池)
from multiprocessing import Pool
if __name__ == '__main__':
    # One task per page index in [ProxyPages_Start, ProxyPages_End)
    groups = list(range(ProxyPages_Start, ProxyPages_End))
    # The with-block shuts the worker processes down once map() has returned.
    with Pool() as pool:
        pool.map(main, groups)
模拟登录
selenium
# 小米商城登录
from selenium import webdriver
# Login page address
login_url = 'https://account.xiaomi.com/pass/serviceLogin'
# Address reached after a successful login
login_sec = 'https://account.xiaomi.com/pass/auth/security/home'
# Account name and password (sample values)
username = 'xxxxxx@qq.com'
passwd = '123456'
# 登录
def login(name ,pwd):
    """
    Log in through the browser and wait until the post-login page is reached.

    :param name: account name typed into the "username" field
    :param pwd: password typed into the "pwd" field
    """
    browser.get(login_url)  # open the login page
    time.sleep(1)
    # Elements are located by id; if an id is missing, another locator such as
    # find_element_by_class_name can be used instead.
    for element_id, text in (("username", name), ("pwd", pwd)):
        browser.find_element_by_id(element_id).send_keys(text)
    try:
        browser.find_element_by_id("login-button").click()  # click the login button
        # Poll once per second until the current URL begins with the post-login address
        # (login_sec is 50 characters long, matching the original [:50] slice).
        while not browser.current_url.startswith(login_sec):
            time.sleep(1)
        print('登录成功!')
        browser.refresh()  # reload the page
    except Exception as e:
        print(e)
gzip压缩
# Fetch the compressed source of a web page (Python 3 port of the original Python 2 snippet)
import gzip
import http.client  # Python 3 name of httplib, which the original snippet imported
import urllib.request

request = urllib.request.Request('http://xxxx.com')
request.add_header('Accept-encoding', 'gzip')
opener = urllib.request.build_opener()
with opener.open(request) as f:
    compresseddata = f.read()
    content_encoding = f.headers.get('Content-Encoding', '')
# Decompress; a server may ignore the Accept-encoding header and answer uncompressed
if content_encoding.lower() == 'gzip':
    print(gzip.decompress(compresseddata))
else:
    print(compresseddata)
遍历文件夹获取文件名
# 深度优先遍历
def get_file_path(root_path, file_list, dir_list):
    """
    Walk a directory tree depth-first and collect the path of every file.

    :param root_path: directory to start from
    :param file_list: list that receives the file paths (appended in place)
    :param dir_list: passed along to the recursive calls; nothing is appended to it
    """
    for entry_name in os.listdir(root_path):
        entry_path = os.path.join(root_path, entry_name)
        if not os.path.isdir(entry_path):
            file_list.append(entry_path)  # a file: record its path
            continue
        # A directory: descend into it
        get_file_path(entry_path, file_list, dir_list)
# Receives the paths of all files
file_list = []
# Intended for the paths of all directories
dir_list = []
get_file_path(root_path, file_list, dir_list)
评论区