引言

最近我看到了一则有人利用DNS记录下载文件的报道,觉得很有意思便亲自尝试了一下。本文简单讲述原理及思路,并附上成品代码。

请注意,本文所探讨的相关技术和思路仅用于学习与交流,所有代码必须在下载/复制后24小时内删除,切勿用于任何非法用途!
违反此规定造成的一切后果与本文作者无关。

原理及相关信息

域名可以创建TXT记录,本来是用于存储与域名相关的文本信息,但我们或许可以存一些其他的东西。

首先来看一下TXT记录支持什么格式吧。
经本人测试(以Cloudflare为例):TXT记录最大长度为2048字符(不含双引号,且双引号会被自动添加);不支持部分特殊字符,例如支持base64编码表中的所有字符而不支持base85编码表中的部分字符;中文字符会被转义。
与此同时还发现,2048个字符并非在一个完整的字符串中,而是被分割为若干小字符串,每个255字符(不含双引号)。

由此发现将图片等文件转为base64编码应该效果不错。(但这会导致文件体积增大约33%)

如有其他疑问可自行测试,不再赘述。

实战

注:所有代码均为AI生成,我只做了极少量修改,谨慎参考!!

将文件base64编码

为了处理像图片这样的非文本文件,可以使用以下工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python3
import base64
import sys
import argparse

def main():
parser = argparse.ArgumentParser(description='文件与Base64编码转换工具')
parser.add_argument('input', help='输入文件')
parser.add_argument('-d', '--decode', action='store_true', help='解码模式')
parser.add_argument('-o', '--output', help='输出文件')

args = parser.parse_args()

if args.decode:
# 解码模式
with open(args.input, 'r', encoding='ascii') as f:
base64_str = f.read()

# 处理可能的填充字符
padding = len(base64_str) % 4
if padding:
base64_str += '=' * (4 - padding)

decoded = base64.b64decode(base64_str)

output_file = args.output or args.input.replace('.b64', '')
with open(output_file, 'wb') as f:
f.write(decoded)
print(f"解码完成: {output_file}")
else:
# 编码模式
with open(args.input, 'rb') as f:
data = f.read()
encoded = base64.b64encode(data).decode('ascii')

output_file = args.output or args.input + '.b64'
with open(output_file, 'w', encoding='ascii') as f:
f.write(encoded)
print(f"编码完成: {output_file}")

if __name__ == "__main__":
main()

用法(编码文件)python base64_tool.py myfile.txt,其余用法可使用-h参数查看。

将文件分割

使用以下脚本将上一步生成的.b64文件分割为每份2048字符的小文件以符合TXT记录的长度限制。生成的文件名带序号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import sys

def split_b64_file(input_file, chunk_size=2048):
"""
将Base64编码的文本文件分割成多个小文件

Args:
input_file: 输入的.b64文件路径
chunk_size: 每个小文件的字符数,默认为2048
"""

# 检查输入文件是否存在
if not os.path.exists(input_file):
print(f"错误:文件 '{input_file}' 不存在")
return False

# 检查文件后缀
if not input_file.endswith('.b64'):
print(f"警告:文件 '{input_file}' 不是.b64后缀")

try:
# 读取文件内容
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read().strip()

# 获取文件名(不含扩展名)
base_name = os.path.splitext(input_file)[0]

# 计算需要分割的文件数量
total_chars = len(content)
num_chunks = (total_chars + chunk_size - 1) // chunk_size # 向上取整

print(f"文件总字符数: {total_chars}")
print(f"将分割为 {num_chunks} 个文件,每个最多 {chunk_size} 字符")

# 分割文件
for i in range(num_chunks):
start = i * chunk_size
end = start + chunk_size
chunk_content = content[start:end]

# 生成输出文件名
output_file = f"{base_name}_part{i+1:04d}.b64"

# 写入分割后的文件
with open(output_file, 'w', encoding='utf-8') as f:
f.write(chunk_content)

print(f"已创建: {output_file} (字符数: {len(chunk_content)})")

print("文件分割完成!")
return True

except Exception as e:
print(f"处理文件时发生错误: {e}")
return False

def main():
# 使用方法说明
if len(sys.argv) != 2:
print("使用方法: python split_b64.py <filename.b64>")
print("示例: python split_b64.py myfile.b64")
return

input_file = sys.argv[1]
split_b64_file(input_file)

if __name__ == "__main__":
main()

用法:python split_b64.py myfile.b64

我们便得到了一堆小纯文本文件。

将文件内容上传到DNS

如果上一步分割出来的小文件很少,那么手动一个一个添加其实也无妨。但如果有上百个甚至上千个,无论如何是不能手动操作了。

所幸,大部分DNS解析提供商都会提供一个API接口来自动化完成这一操作。

获取API令牌

以Cloudflare为例,在此处可以获取API令牌,直接使用编辑区域 DNS的模板再选择区域资源中想要编辑DNS记录的域名即可(要提前把域名托管到Cloudflare!

再回到账户主页,打开相应域名,找到区域 ID并复制。

自动化上传

直接使用以下脚本,将API令牌、区域ID、域名替换为自己的就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import os
import re
import requests
import json
from pathlib import Path
from typing import List, Optional, Tuple
import time

class CloudflareDNSManager:
def __init__(self, api_token: str, zone_id: str, base_domain: str):
self.api_token = api_token
self.zone_id = zone_id
self.base_domain = base_domain
self.base_url = "https://api.cloudflare.com/client/v4/zones"
self.headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}

def extract_sequence_number(self, filename: str) -> Optional[int]:
"""
从文件名中提取序列号,支持各种格式如:
file_part0001.b64, data_segment001.txt, chunk123.b64等
"""
# 匹配文件名中的数字序列
match = re.search(r'(\d+)', filename)
if match:
return int(match.group(1))
return None

def get_dns_record_id(self, subdomain: str, record_type: str = "TXT") -> Optional[str]:
"""获取现有DNS记录的ID"""
url = f"{self.base_url}/{self.zone_id}/dns_records"
params = {
"type": record_type,
"name": f"{subdomain}.{self.base_domain}"
}

try:
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
data = response.json()

if data['success'] and data['result']:
return data['result'][0]['id']
return None

except requests.RequestException as e:
print(f"获取DNS记录失败: {e}")
return None

def create_dns_record(self, subdomain: str, content: str, record_type: str = "TXT") -> bool:
"""创建新的DNS记录"""
url = f"{self.base_url}/{self.zone_id}/dns_records"
data = {
"type": record_type,
"name": f"{subdomain}.{self.base_domain}",
"content": content,
"ttl": 120, # 自动TTL
"comment": f"Automatically created from file sequence {subdomain}"
}

try:
response = requests.post(url, headers=self.headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()

if result['success']:
print(f"✓ 成功创建记录 {subdomain}.{self.base_domain}")
return True
else:
print(f"✗ 创建记录失败: {result.get('errors', 'Unknown error')}")
return False

except requests.RequestException as e:
print(f"✗ 请求创建记录失败: {e}")
return False

def update_dns_record(self, record_id: str, subdomain: str, content: str, record_type: str = "TXT") -> bool:
"""更新现有的DNS记录"""
url = f"{self.base_url}/{self.zone_id}/dns_records/{record_id}"
data = {
"type": record_type,
"name": f"{subdomain}.{self.base_domain}",
"content": content,
"ttl": 120
}

try:
response = requests.put(url, headers=self.headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()

if result['success']:
print(f"✓ 成功更新记录 {subdomain}.{self.base_domain}")
return True
else:
print(f"✗ 更新记录失败: {result.get('errors', 'Unknown error')}")
return False

except requests.RequestException as e:
print(f"✗ 请求更新记录失败: {e}")
return False

def process_file(self, file_path: Path, target_subdomain: str) -> bool:
"""处理单个文件并上传到DNS记录"""
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read().strip()

if not content:
print(f"✗ 文件 {file_path.name} 内容为空")
return False

# 检查TXT记录长度限制(Cloudflare TXT记录最大长度)
# if len(content) > 255:
# print(f"⚠ 警告: {file_path.name} 内容超过255字符,可能被截断")

# 格式化子域名(确保4位数字)
formatted_subdomain = target_subdomain.zfill(4)

# 检查记录是否已存在
record_id = self.get_dns_record_id(formatted_subdomain)

if record_id:
# 更新现有记录
return self.update_dns_record(record_id, formatted_subdomain, content)
else:
# 创建新记录
return self.create_dns_record(formatted_subdomain, content)

except Exception as e:
print(f"✗ 处理文件 {file_path.name} 时出错: {e}")
return False

def process_directory(self, directory_path: str, file_pattern: str = "*.b64") -> None:
"""处理目录中的所有匹配文件"""
directory = Path(directory_path)

if not directory.exists() or not directory.is_dir():
print(f"✗ 目录不存在: {directory_path}")
return

# 获取所有匹配的文件并按序列号排序
files = []
for file_path in directory.glob(file_pattern):
seq_number = self.extract_sequence_number(file_path.name)
if seq_number is not None:
files.append((seq_number, file_path))

if not files:
print("✗ 未找到包含序列号的文件")
return

# 按序列号排序
files.sort(key=lambda x: x[0])

print(f"找到 {len(files)} 个文件,开始处理...")
print("-" * 50)

success_count = 0
failed_files = []

for seq_number, file_path in files:
print(f"处理文件: {file_path.name} -> {str(seq_number).zfill(4)}.{self.base_domain}")

if self.process_file(file_path, str(seq_number)):
success_count += 1
else:
failed_files.append(file_path.name)

# 添加延迟避免API限制
time.sleep(1)

print("-" * 50)
print(f"处理完成! 成功: {success_count}/{len(files)}")

if failed_files:
print(f"失败的文件: {', '.join(failed_files)}")

def main():
# 配置信息
API_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # 从环境变量获取更安全
ZONE_ID = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # 在Cloudflare域名概述页面找到
BASE_DOMAIN = "file.xxx.com" # 你的基础域名(托管到Cloudflare的域名设为xxx.org)(不建议填写根域名)
DIRECTORY_PATH = "./files" # 包含b64文件的目录路径

# 从环境变量获取敏感信息(推荐)
# API_TOKEN = os.getenv('CF_API_TOKEN')
# ZONE_ID = os.getenv('CF_ZONE_ID')

if not API_TOKEN or not ZONE_ID:
print("请设置API_TOKEN和ZONE_ID")
return

# 创建管理器实例
manager = CloudflareDNSManager(API_TOKEN, ZONE_ID, BASE_DOMAIN)

# 处理目录中的所有b64文件
manager.process_directory(DIRECTORY_PATH, "*.b64")

if __name__ == "__main__":
main()

安装依赖:

1
2
pip install requests
pip install dnspython

在脚本目录新建一个文件夹files,把切分好的小文件全放进去,运行脚本,文件内容就会自动按顺序上传到相应域名。文件顺序来自文件名,完美兼容上一步的脚本。例如input_part0001.b64将会上传到0001.file.xxx.com

从DNS记录还原文件

使用以下脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import dns.resolver
import sys
import base64
import time
from typing import List

class DNSFileReconstructor:
def __init__(self, dns_servers=None):
"""
初始化DNS文件重建器

Args:
dns_servers: DNS服务器列表,默认为Cloudflare和Google DNS
"""
self.dns_servers = dns_servers or ['1.1.1.1', '8.8.8.8']
self.resolver = dns.resolver.Resolver()
self.resolver.nameservers = self.dns_servers

def download_txt_record(self, domain: str) -> str:
"""
下载指定域名的TXT记录内容

Args:
domain: 要查询的域名

Returns:
TXT记录内容字符串,失败返回空字符串
"""
try:
print(f"查询域名: {domain}")
answers = self.resolver.resolve(domain, 'TXT')

# 合并所有TXT记录字符串
txt_content = ''
for answer in answers:
for item in answer.strings:
txt_content += item.decode('utf-8')

# 移除可能的引号
txt_content = txt_content.strip('"')
print(f"获取成功,长度: {len(txt_content)} 字符")
return txt_content

except dns.resolver.NXDOMAIN:
print(f"域名不存在: {domain}")
return ""
except dns.resolver.NoAnswer:
print(f"域名没有TXT记录: {domain}")
return ""
except dns.resolver.Timeout:
print(f"查询超时: {domain}")
return ""
except Exception as e:
print(f"查询域名 {domain} 时出错: {e}")
return ""

def download_all_parts(self, base_domain: str, start_part: int = 1, max_parts: int = 1000) -> List[str]:
"""
下载所有分片的TXT记录

Args:
base_domain: 基础域名
start_part: 起始分片号
max_parts: 最大尝试分片数

Returns:
分片内容列表
"""
all_parts = []
consecutive_failures = 0
max_consecutive_failures = 3 # 连续失败3次后停止

for part_num in range(start_part, max_parts + 1):
# 生成四位数序号的域名
domain = f"{part_num:04d}.{base_domain}"

content = self.download_txt_record(domain)

if content:
all_parts.append(content)
consecutive_failures = 0
print(f"成功下载第 {part_num} 部分")
else:
consecutive_failures += 1
print(f"第 {part_num} 部分下载失败")

# 如果连续失败多次,认为已经到达末尾
if consecutive_failures >= max_consecutive_failures:
print(f"连续 {consecutive_failures} 次下载失败,停止搜索")
break

# 添加短暂延迟,避免DNS查询过于频繁
time.sleep(0.1)

return all_parts

def reconstruct_b64_file(self, base_domain: str, output_b64_file: str = "reconstructed.b64") -> bool:
"""
从DNS TXT记录重建Base64文件

Args:
base_domain: 基础域名
output_b64_file: 输出的Base64文件路径

Returns:
是否成功
"""
print(f"开始从DNS TXT记录重建Base64文件...")
print(f"基础域名: {base_domain}")

# 下载所有分片
parts = self.download_all_parts(base_domain)

if not parts:
print("没有成功下载任何分片")
return False

# 合并所有内容
full_content = ''.join(parts)

# 写入Base64文件
try:
with open(output_b64_file, 'w', encoding='utf-8') as f:
f.write(full_content)

print(f"\nBase64文件重建成功!")
print(f"总共下载: {len(parts)} 个分片")
print(f"总字符数: {len(full_content)}")
print(f"Base64文件: {output_b64_file}")

return True

except Exception as e:
print(f"写入Base64文件时出错: {e}")
return False

def decode_b64_file(self, b64_file: str, output_file: str) -> bool:
"""
解码Base64文件为原始文件

Args:
b64_file: Base64编码的文件路径
output_file: 解码后的输出文件路径

Returns:
是否成功解码
"""
try:
print(f"\n开始解码Base64文件: {b64_file}")

# 读取Base64内容
with open(b64_file, 'r', encoding='utf-8') as f:
b64_content = f.read().strip()

# 解码Base64
# 处理可能包含的换行符和空格
b64_content = b64_content.replace('\n', '').replace('\r', '').replace(' ', '')

# 解码
decoded_data = base64.b64decode(b64_content)

# 写入输出文件
with open(output_file, 'wb') as f:
f.write(decoded_data)

print(f"Base64解码成功!")
print(f"输出文件: {output_file}")
print(f"文件大小: {len(decoded_data)} 字节")

return True

except base64.binascii.Error as e:
print(f"Base64解码错误: {e}")
print("请检查Base64内容是否完整和正确")
return False
except Exception as e:
print(f"解码文件时出错: {e}")
return False

def reconstruct_and_decode(self, base_domain: str, output_file: str,
temp_b64_file: str = "temp_reconstructed.b64") -> bool:
"""
完整的重建和解码流程

Args:
base_domain: 基础域名
output_file: 最终输出文件路径
temp_b64_file: 临时Base64文件路径

Returns:
是否成功
"""
# 第一步:从DNS重建Base64文件
if not self.reconstruct_b64_file(base_domain, temp_b64_file):
return False

# 第二步:解码Base64文件
if not self.decode_b64_file(temp_b64_file, output_file):
return False

print(f"\n 文件重建和解码完成!")
print(f"原始文件已保存到: {output_file}")
return True

def main():
"""
主函数:处理命令行参数并执行重建流程
"""
if len(sys.argv) < 3:
print("DNS TXT记录文件重建和解码工具")
print("使用方法: python dns_reconstruct.py <base_domain> <output_file> [temp_b64_file]")
print("示例: python dns_reconstruct.py file.xxx.com original_file.zip")
print("示例: python dns_reconstruct.py file.xxx.com photo.jpg temp.b64")
print("\n参数说明:")
print(" base_domain: 基础域名(不包含序号部分)")
print(" output_file: 最终输出文件名")
print(" temp_b64_file: 临时Base64文件名(可选,默认为temp_reconstructed.b64)")
return

# 解析参数
base_domain = sys.argv[1]
output_file = sys.argv[2]
temp_b64_file = "temp_reconstructed.b64"

if len(sys.argv) > 3:
temp_b64_file = sys.argv[3]

# 创建重建器实例
reconstructor = DNSFileReconstructor()

# 执行完整的重建和解码流程
success = reconstructor.reconstruct_and_decode(base_domain, output_file, temp_b64_file)

if not success:
print("\n❌ 文件重建失败")
sys.exit(1)

if __name__ == "__main__":
main()

直接运行python dns_reconstruct.py file.xxx.com photo.jpg即可还原文件。

结语

至此,所有讲解已结束,如果有其他想法可继续交流。

最后再强调

严禁用于一切非法用途,后果自负!!!!!