引言
最近我看到了一则有人利用DNS记录下载文件的报道,觉得很有意思便亲自尝试了一下。本文仅简单讲述原理及思路,并附上成品代码。
请注意,本文所探讨的相关技术和思路仅用于学习与交流,所有代码必须在下载/复制后24小时内删除,切勿用于任何非法用途!
违反此规定造成的一切后果与本文作者无关。
原理及相关信息
域名可以创建TXT记录,本来是用于存储与域名相关的文本信息,但我们或许可以存一些其他的东西。
首先来看一下TXT记录支持什么格式吧。
经本人测试(以Cloudflare为例):TXT记录最大长度为2048字符(不含双引号,且双引号会被自动添加);不支持部分特殊字符,例如支持base64编码表中的所有字符而不支持base85编码表中的部分字符;中文字符会被转义。
与此同时还发现,2048个字符并非在一个完整的字符串中,而是被分割为若干小字符串,每个255字符(不含双引号)。
由此发现将图片等文件转为base64编码应该效果不错。(但这会导致文件体积增大约33%)
如有其他疑问可自行测试,不再赘述。
实战
注:所有代码均为AI生成,我只做了极少量修改,谨慎参考!!
将文件base64编码
为了处理像图片这样的非文本文件,可以使用以下工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import base64 import sys import argparse
def main(): parser = argparse.ArgumentParser(description='文件与Base64编码转换工具') parser.add_argument('input', help='输入文件') parser.add_argument('-d', '--decode', action='store_true', help='解码模式') parser.add_argument('-o', '--output', help='输出文件') args = parser.parse_args() if args.decode: with open(args.input, 'r', encoding='ascii') as f: base64_str = f.read() padding = len(base64_str) % 4 if padding: base64_str += '=' * (4 - padding)
decoded = base64.b64decode(base64_str) output_file = args.output or args.input.replace('.b64', '') with open(output_file, 'wb') as f: f.write(decoded) print(f"解码完成: {output_file}") else: with open(args.input, 'rb') as f: data = f.read() encoded = base64.b64encode(data).decode('ascii') output_file = args.output or args.input + '.b64' with open(output_file, 'w', encoding='ascii') as f: f.write(encoded) print(f"编码完成: {output_file}")
if __name__ == "__main__": main()
|
用法(编码文件)python base64_tool.py myfile.txt
,其余用法可使用-h
参数查看。
将文件分割
使用以下脚本将上一步生成的.b64
文件分割为每份2048字符的小文件以符合TXT记录的长度限制。生成的文件名带序号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import os import sys
def split_b64_file(input_file, chunk_size=2048): """ 将Base64编码的文本文件分割成多个小文件 Args: input_file: 输入的.b64文件路径 chunk_size: 每个小文件的字符数,默认为2048 """ if not os.path.exists(input_file): print(f"错误:文件 '{input_file}' 不存在") return False if not input_file.endswith('.b64'): print(f"警告:文件 '{input_file}' 不是.b64后缀") try: with open(input_file, 'r', encoding='utf-8') as f: content = f.read().strip() base_name = os.path.splitext(input_file)[0] total_chars = len(content) num_chunks = (total_chars + chunk_size - 1) // chunk_size print(f"文件总字符数: {total_chars}") print(f"将分割为 {num_chunks} 个文件,每个最多 {chunk_size} 字符") for i in range(num_chunks): start = i * chunk_size end = start + chunk_size chunk_content = content[start:end] output_file = f"{base_name}_part{i+1:04d}.b64" with open(output_file, 'w', encoding='utf-8') as f: f.write(chunk_content) print(f"已创建: {output_file} (字符数: {len(chunk_content)})") print("文件分割完成!") return True except Exception as e: print(f"处理文件时发生错误: {e}") return False
def main(): if len(sys.argv) != 2: print("使用方法: python split_b64.py <filename.b64>") print("示例: python split_b64.py myfile.b64") return input_file = sys.argv[1] split_b64_file(input_file)
if __name__ == "__main__": main()
|
用法:python split_b64.py myfile.b64
我们便得到了一堆小纯文本文件。
将文件内容上传到DNS
如果上一步分割出来的小文件很少,那么手动一个一个添加其实也无妨。但如果有上百个甚至上千个,无论如何是不能手动操作了。
所幸,大部分DNS解析提供商都会提供一个API接口来自动化完成这一操作。
获取API令牌
以Cloudflare为例,在此处可以获取API令牌,直接使用编辑区域 DNS
的模板再选择区域资源
中想要编辑DNS记录的域名即可(要提前把域名托管到Cloudflare!)
再回到账户主页,打开相应域名,找到区域 ID
并复制。
自动化上传
直接使用以下脚本,将API令牌、区域ID、域名替换为自己的就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
| import os import re import requests import json from pathlib import Path from typing import List, Optional, Tuple import time
class CloudflareDNSManager: def __init__(self, api_token: str, zone_id: str, base_domain: str): self.api_token = api_token self.zone_id = zone_id self.base_domain = base_domain self.base_url = "https://api.cloudflare.com/client/v4/zones" self.headers = { "Authorization": f"Bearer {api_token}", "Content-Type": "application/json" } def extract_sequence_number(self, filename: str) -> Optional[int]: """ 从文件名中提取序列号,支持各种格式如: file_part0001.b64, data_segment001.txt, chunk123.b64等 """ match = re.search(r'(\d+)', filename) if match: return int(match.group(1)) return None def get_dns_record_id(self, subdomain: str, record_type: str = "TXT") -> Optional[str]: """获取现有DNS记录的ID""" url = f"{self.base_url}/{self.zone_id}/dns_records" params = { "type": record_type, "name": f"{subdomain}.{self.base_domain}" } try: response = requests.get(url, headers=self.headers, params=params, timeout=30) response.raise_for_status() data = response.json() if data['success'] and data['result']: return data['result'][0]['id'] return None except requests.RequestException as e: print(f"获取DNS记录失败: {e}") return None def create_dns_record(self, subdomain: str, content: str, record_type: str = "TXT") -> bool: """创建新的DNS记录""" url = f"{self.base_url}/{self.zone_id}/dns_records" data = { "type": record_type, "name": f"{subdomain}.{self.base_domain}", "content": content, "ttl": 120, "comment": f"Automatically created from file sequence {subdomain}" } try: response = requests.post(url, headers=self.headers, json=data, timeout=30) response.raise_for_status() result = response.json() if result['success']: print(f"✓ 成功创建记录 {subdomain}.{self.base_domain}") return True else: print(f"✗ 创建记录失败: {result.get('errors', 'Unknown error')}") return False except requests.RequestException as e: print(f"✗ 请求创建记录失败: {e}") return False def update_dns_record(self, record_id: str, subdomain: str, content: str, record_type: str = "TXT") -> bool: """更新现有的DNS记录""" url = f"{self.base_url}/{self.zone_id}/dns_records/{record_id}" data = { "type": record_type, "name": f"{subdomain}.{self.base_domain}", "content": content, "ttl": 120 } try: response = requests.put(url, headers=self.headers, json=data, timeout=30) response.raise_for_status() result = response.json() if result['success']: print(f"✓ 成功更新记录 {subdomain}.{self.base_domain}") return True else: print(f"✗ 更新记录失败: {result.get('errors', 'Unknown error')}") return False except requests.RequestException as e: print(f"✗ 请求更新记录失败: {e}") return False def process_file(self, file_path: Path, target_subdomain: str) -> bool: """处理单个文件并上传到DNS记录""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read().strip() if not content: print(f"✗ 文件 {file_path.name} 内容为空") return False formatted_subdomain = target_subdomain.zfill(4) record_id = self.get_dns_record_id(formatted_subdomain) if record_id: return self.update_dns_record(record_id, formatted_subdomain, content) else: return self.create_dns_record(formatted_subdomain, content) except Exception as e: print(f"✗ 处理文件 {file_path.name} 时出错: {e}") return False def process_directory(self, directory_path: str, file_pattern: str = "*.b64") -> None: """处理目录中的所有匹配文件""" directory = Path(directory_path) if not directory.exists() or not directory.is_dir(): print(f"✗ 目录不存在: {directory_path}") return files = [] for file_path in directory.glob(file_pattern): seq_number = self.extract_sequence_number(file_path.name) if seq_number is not None: files.append((seq_number, file_path)) if not files: print("✗ 未找到包含序列号的文件") return files.sort(key=lambda x: x[0]) print(f"找到 {len(files)} 个文件,开始处理...") print("-" * 50) success_count = 0 failed_files = [] for seq_number, file_path in files: print(f"处理文件: {file_path.name} -> {str(seq_number).zfill(4)}.{self.base_domain}") if self.process_file(file_path, str(seq_number)): success_count += 1 else: failed_files.append(file_path.name) time.sleep(1) print("-" * 50) print(f"处理完成! 成功: {success_count}/{len(files)}") if failed_files: print(f"失败的文件: {', '.join(failed_files)}")
def main(): API_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" ZONE_ID = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx" BASE_DOMAIN = "file.xxx.com" DIRECTORY_PATH = "./files" if not API_TOKEN or not ZONE_ID: print("请设置API_TOKEN和ZONE_ID") return manager = CloudflareDNSManager(API_TOKEN, ZONE_ID, BASE_DOMAIN) manager.process_directory(DIRECTORY_PATH, "*.b64")
if __name__ == "__main__": main()
|
安装依赖:
1 2
| pip install requests pip install dnspython
|
在脚本目录新建一个文件夹files
,把切分好的小文件全放进去,运行脚本,文件内容就会自动按顺序上传到相应域名。文件顺序来自文件名,完美兼容上一步的脚本。例如input_part0001.b64
将会上传到0001.file.xxx.com
。
从DNS记录还原文件
使用以下脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
| import dns.resolver import sys import base64 import time from typing import List
class DNSFileReconstructor: def __init__(self, dns_servers=None): """ 初始化DNS文件重建器 Args: dns_servers: DNS服务器列表,默认为Cloudflare和Google DNS """ self.dns_servers = dns_servers or ['1.1.1.1', '8.8.8.8'] self.resolver = dns.resolver.Resolver() self.resolver.nameservers = self.dns_servers def download_txt_record(self, domain: str) -> str: """ 下载指定域名的TXT记录内容 Args: domain: 要查询的域名 Returns: TXT记录内容字符串,失败返回空字符串 """ try: print(f"查询域名: {domain}") answers = self.resolver.resolve(domain, 'TXT') txt_content = '' for answer in answers: for item in answer.strings: txt_content += item.decode('utf-8') txt_content = txt_content.strip('"') print(f"获取成功,长度: {len(txt_content)} 字符") return txt_content except dns.resolver.NXDOMAIN: print(f"域名不存在: {domain}") return "" except dns.resolver.NoAnswer: print(f"域名没有TXT记录: {domain}") return "" except dns.resolver.Timeout: print(f"查询超时: {domain}") return "" except Exception as e: print(f"查询域名 {domain} 时出错: {e}") return "" def download_all_parts(self, base_domain: str, start_part: int = 1, max_parts: int = 1000) -> List[str]: """ 下载所有分片的TXT记录 Args: base_domain: 基础域名 start_part: 起始分片号 max_parts: 最大尝试分片数 Returns: 分片内容列表 """ all_parts = [] consecutive_failures = 0 max_consecutive_failures = 3 for part_num in range(start_part, max_parts + 1): domain = f"{part_num:04d}.{base_domain}" content = self.download_txt_record(domain) if content: all_parts.append(content) consecutive_failures = 0 print(f"成功下载第 {part_num} 部分") else: consecutive_failures += 1 print(f"第 {part_num} 部分下载失败") if consecutive_failures >= max_consecutive_failures: print(f"连续 {consecutive_failures} 次下载失败,停止搜索") break time.sleep(0.1) return all_parts def reconstruct_b64_file(self, base_domain: str, output_b64_file: str = "reconstructed.b64") -> bool: """ 从DNS TXT记录重建Base64文件 Args: base_domain: 基础域名 output_b64_file: 输出的Base64文件路径 Returns: 是否成功 """ print(f"开始从DNS TXT记录重建Base64文件...") print(f"基础域名: {base_domain}") parts = self.download_all_parts(base_domain) if not parts: print("没有成功下载任何分片") return False full_content = ''.join(parts) try: with open(output_b64_file, 'w', encoding='utf-8') as f: f.write(full_content) print(f"\nBase64文件重建成功!") print(f"总共下载: {len(parts)} 个分片") print(f"总字符数: {len(full_content)}") print(f"Base64文件: {output_b64_file}") return True except Exception as e: print(f"写入Base64文件时出错: {e}") return False def decode_b64_file(self, b64_file: str, output_file: str) -> bool: """ 解码Base64文件为原始文件 Args: b64_file: Base64编码的文件路径 output_file: 解码后的输出文件路径 Returns: 是否成功解码 """ try: print(f"\n开始解码Base64文件: {b64_file}") with open(b64_file, 'r', encoding='utf-8') as f: b64_content = f.read().strip() b64_content = b64_content.replace('\n', '').replace('\r', '').replace(' ', '') decoded_data = base64.b64decode(b64_content) with open(output_file, 'wb') as f: f.write(decoded_data) print(f"Base64解码成功!") print(f"输出文件: {output_file}") print(f"文件大小: {len(decoded_data)} 字节") return True except base64.binascii.Error as e: print(f"Base64解码错误: {e}") print("请检查Base64内容是否完整和正确") return False except Exception as e: print(f"解码文件时出错: {e}") return False def reconstruct_and_decode(self, base_domain: str, output_file: str, temp_b64_file: str = "temp_reconstructed.b64") -> bool: """ 完整的重建和解码流程 Args: base_domain: 基础域名 output_file: 最终输出文件路径 temp_b64_file: 临时Base64文件路径 Returns: 是否成功 """ if not self.reconstruct_b64_file(base_domain, temp_b64_file): return False if not self.decode_b64_file(temp_b64_file, output_file): return False print(f"\n 文件重建和解码完成!") print(f"原始文件已保存到: {output_file}") return True
def main(): """ 主函数:处理命令行参数并执行重建流程 """ if len(sys.argv) < 3: print("DNS TXT记录文件重建和解码工具") print("使用方法: python dns_reconstruct.py <base_domain> <output_file> [temp_b64_file]") print("示例: python dns_reconstruct.py file.xxx.com original_file.zip") print("示例: python dns_reconstruct.py file.xxx.com photo.jpg temp.b64") print("\n参数说明:") print(" base_domain: 基础域名(不包含序号部分)") print(" output_file: 最终输出文件名") print(" temp_b64_file: 临时Base64文件名(可选,默认为temp_reconstructed.b64)") return base_domain = sys.argv[1] output_file = sys.argv[2] temp_b64_file = "temp_reconstructed.b64" if len(sys.argv) > 3: temp_b64_file = sys.argv[3] reconstructor = DNSFileReconstructor() success = reconstructor.reconstruct_and_decode(base_domain, output_file, temp_b64_file) if not success: print("\n❌ 文件重建失败") sys.exit(1)
if __name__ == "__main__": main()
|
直接运行python dns_reconstruct.py file.xxx.com photo.jpg
即可还原文件。
结语
至此,所有讲解已结束,如果有其他想法可继续交流。
最后再强调: