在大多数前后端分离项目,更多的使用JWT来进行接口认证,接下来详细介绍:
一,认识JWT
1,什么是JWT
JWT(Json Web Token)是一种为了在网络应用环境间传递声明而执行的,基于JSON的开放标准,通过数字签名的方式,以JSON对象为载体,在不同的服务终端之间安全的传输信息
2,JWT的组成
JWT由头部(Header),负载(Payload),签名(Signature) 这3部分构成,其中每一部分都使用Base64编码处理。
Header:头部信息,主要包含两部分信息,类型,通常为”JWT”, 算法名称,比如HSHA256,RSA等。
Payload:具体用户的信息,需要注意的是,该部分内容只经过Base64编码(相当于明文存储),所以不要在其中放置敏感信息。
Signature:签名信息,签名用于验证信息在传递过程中是否被更改。
JWT信息由3段构成,它们之间用圆点”.“连接,格式如下:
aaaa.bbbbb.ccccc
下面我使用了在线jwt解码网址JWT Token在线解析解码 – ToolTT在线工具箱解码出的图例:
3,JWT应用场景
JWT最常见的场景就是授权认证,一旦用户登录,后续每个请求都将包含JWT,系统在每次处理用户请求之前,都要先进行JWT安全校验,通过之后再进行处理
二,JWT实战
1,django基于DRF框架实现JWT认证
可以基于djangorestframework-jwt库来使用JWT进行身份验证
(1)安装
pip install djangorestframework-jwt
(2)配置应用。
编写配置文件setting.py,配置如下:
(3)配置路由,在项目目录的urls.py中添加如下代码:
from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
path('api-jwt-token-auth/', obtain_jwt_token),
]
执行结果如图:
输入认证的账号密码后点POST,输出结果
{
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNjg0MTQ1MDcwLCJlbWFpbCI6IjExMUAxMTEuY29tIn0.vHP9_SjPcEUq2El26qU9r8AdymtUwyNq2cW3OtlXGws"
}
现在我们给接口添加权限认证
DRF内置的权限组件的配置由以下4种:
rest_frameword.permissions.AllowAny:默认用户对所有的接口都有操作权限,即不做权限限制
rest_frameword.permissions.IsAuthenticated:通过认证的用户才可以访问接口
rest_frameword.permissions.IsAdminUser:仅管理员用户才可以访问
rest_frameword.permissions.IsAuthenticatedOrReadOnly:未认证的用户只有查询的权限,经过认证的用户有增加,删除,修改,查询的权限
这里我们用rest_frameword.permissions.IsAuthenticated做全局配置
setting.py里配置DEFAULT_PERMISSION_CLASSES
配置后访问任意一接口会显示如下信息:
{
"detail": "Authentication credentials were not provided."
}
接下来,我们使用JWT访问已经设置权限的接口,该如何使用呢
JWT的格式为:JWT + 两个空格 + 具体的Token,我这里使用apihost工具去执行接口调用测试
Header信息头中添加Authorization 值是JWT + 空格 + (之前调用api-jwt-token-auth得到的token值)
2,flask中使用JWT实现单点登录中的授权认证功能
单点登录整个请求流程图简单画下:
其中我们使用JWT主要是用来ssourl+/login用户登录成功后生成token信息,第二是,提供一个token验证接口,其他系统访问每次请求如果请求头中的有Authorization中有token值的话去验证token,只有token验证成功后才能执行后面的逻辑否则将重新跳转ssourl+/login上
使用JWT应用在单点登录token生成及验证代码实现示例:
class TokenAuth:
def __init__(self):
env = getenv()
# redis连接,用于操作User类.User类是专门给flask-login用的。删除User类中的数据就可以实现用户登出。
self.user_con_redis = configs[env].user_con_redis
# redis链接,用于存储密码版本。
self.ver_con_redis = configs[env].ver_con_redis
# self.logout_con_redis = configs[env].logout_con_redis
self.logger = HGLog('HGauth.log', log_path='/home/hero/log')
def gen_token(self, username, secret, ver):
"""
生成认证Token
:param ver:
:param username: str
:param secret: str
:return: string
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=10800),
'iat': datetime.datetime.utcnow(),
'data': {
'username': username,
'ver': ver
}
}
self.logger.info("生成token:" + username)
return jwt.encode(payload, secret, algorithm='HS256')
except Exception as e:
self.logger.error("token生成异常 {} ".format(username) + str(e))
return e
def verify_token(self, token, secret):
"""
验证Token 除了常规的校验,还要校验ver,以保证用户密码修改后强制重新登录
"""
print('校验token')
print(token)
try:
payload = jwt.decode(token, secret, algorithms=['HS256'])
if 'data' in payload and 'username' in payload['data']:
payload_username = payload['data']['username']
payload_ver = payload['data']['ver']
# 判断一下用户是否在登录状态
if self.user_con_redis.get(payload['data']['username']):
pass
else:
return {"code": 2005, "message": "user is logout", "result": None, "success": False}
ver = self.get_ver(payload_username) # 校验payload中data字典中的ver。
if ver == payload_ver:
self.logger.info("校验token: " + payload['data']['username'] + ',payload_ver: ' + str(payload_ver))
try: # 查询用户接口的返回数据,捕获异常,返回2005错误码。
q = DataQuery()
success, userinfo = q.queryLdapDinginfo(payload_username)
if not success:
return {{"code": 2006, "message": userinfo, "result": None, "success": False}}
username = payload_username
nickname = userinfo['nickname']
dept = userinfo['full_dept']
# 以上是获取token jwt中的用户信息,
# 根据用户信息需要判断用户是否被锁 todo 暂时不处理
# p = ssoPW()
# l = p.lock_status(userName)
# if l['success']: # lock_status 返回True 说明账号被锁,不可登录;返回False 说明账号未被锁定,可以使用
# return {"code": 2006,
# "message": "Account has been locked",
# "result": None,
# "success": False}
# # 如果用户没有被锁,继续以下流程
if 'uid' in userinfo:
uid = userinfo['uid']
else:
uid = None
result = {
'username': username, # 规范返回值 都是用小写key
'dept': dept,
'nickname': nickname, # 规范返回值 都是用小写key
'uid': uid}
# # 是否为雇员 todu 额外功能 不处理
# if 'warning' in k['data'][0]:
# warning = k['data'][0]['warning']
# result['warning'] = warning
# result['employee'] = False
# else:
# result['employee'] = True
# # 是否为机器人
# if 'warning2' in k['data'][0]:
# warning2 = k['data'][0]['warning']
# result['warning2'] = warning2
# result['robot'] = True
# else:
# result['robot'] = False
# # 记录非员工用户
# if result['robot'] or not result['employee']:
# self.logger.info('非员工验证'+str(result))
return {"code": 1001, "message": "成功",
"result": result,
"success": True}
except Exception as e:
self.logger.error(str(e))
return {"code": 2005,
"message": "Internal Server Error. Please contect SSO developer",
"result": None,
"success": False}
#
# return {"code": 1001, "message": "成功",
# "result": {"userName": userName, # 历史遗留大小写混用,保持兼容
# 'username': userName, # 规范返回值 都是用小写key
# 'dept': dept,
# 'displayName': displayName, # 历史遗留大小写混用,保持兼容
# 'displayname': displayName, # 规范返回值 都是用小写key
# 'uid': uid},
# "success": True}
else: # ver 校验未通过
print(ver)
print(payload_username, payload_ver)
return {"code": 2004, "message": "password changed", "result": None, "success": False}
else:
raise jwt.InvalidTokenError
except jwt.ExpiredSignatureError:
# token过期
return {"code": 2001, "message": "expired token", "result": None, "success": False}
except jwt.InvalidTokenError:
# token无效
return {"code": 2003, "message": "invalid token", "result": None, "success": False}
def get_ver(self, username):
"""
从redis中读取中
:param username:
:return:
"""
data = self.ver_con_redis.get(username)
# 先判断是否存在username这个记录,如果没有直接返回0
if data is None:
return 0
data = pickle.loads(data)
return data["ver"]
def set_ver(self, username, ver):
"""
操作redis,设置ver
:param username:
:param ver:
:return:
"""
data = {"ver": ver}
self.ver_con_redis.set(username, pickle.dumps(data))
其中gen_token就是用户通过ssourl+/login登录成功后生成token,将token存入到headers和cookie中
verify_token函数为单点登录系统对外提供调用的验证token的函数
编写验证token接口
@app.route('/attach_valid')
def attach_valid():
"""
校验token
:return:
"""
token = request.headers.get('Authorization')
SSO_SECRET = configs[env].sso_secret
hglog.info('attach_valid' + str(token)) # token 可能为空
if token:
ta = TokenAuth()
data = ta.verify_token(token, SSO_SECRET)
print(data)
if data['code'] == 1001:
username = data['result']["username"]
# if nonem_block(username=username, sip=realip):
# return abort(403)
res = make_response(jsonify(data), 200)
else: # code非1001 认为异常 返回码202
res = make_response(jsonify(data), 202)
hglog.info(token + "-/attach_valid")
return res
else:
data = {"code": 2003, "message": "no token", "result": None, "success": False}
res = make_response(jsonify(data), 202)
hglog.info(request.remote_addr + "-" + 'no_token' + "-/attach_valid")
return res
简单的示范,如果需要详细咨询Python关于单点登录的实现的话,请联系作者博主:quietguoguo的博客_SSL,nginx,python_51CTO博客
服务器租用托管,机房租用托管,主机租用托管,https://www.e1idc.com