Not much to say: I went back to scraping the 举个栗子 API and wrote a new script. The idea is simple: pull the category links off the index page at t.alcy.cc, collect every image URL in each category, then download them all in parallel with a thread pool.

import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

class ImageDownloader:
    def __init__(self, max_workers=10):
        self.base_url = "https://t.alcy.cc/img/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        self.max_workers = max_workers
        self.downloaded_count = 0
        self.total_count = 0
        self.lock = threading.Lock()

    def get_categories(self):
        """Fetch every category linked from the index page."""
        print(f"Fetching index page: {self.base_url}")
        try:
            response = requests.get(self.base_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to fetch index page: {e}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        categories = []

        # Category links on the index are rendered as "name (count)"
        for a in soup.find_all('a', href=True):
            if '(' in a.text and ')' in a.text:
                category_name = a.text.split('(')[0].strip()
                category_url = urljoin(self.base_url, a['href'])
                categories.append({
                    'name': category_name,
                    'url': category_url
                })

        return categories

    def get_image_urls(self, category_url):
        """Collect every image URL on a category page."""
        try:
            response = requests.get(category_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to fetch category page: {e}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        img_tags = soup.find_all('img')
        img_urls = []

        # Prefer the lazy-load attribute; skip inline data: URIs
        for img in img_tags:
            src = img.get('data-src') or img.get('src')
            if src and not src.startswith('data:'):
                img_urls.append(urljoin(self.base_url, src))

        # Deduplicate and return
        return list(set(img_urls))

    def download_single_image(self, img_url, save_path):
        """Download one image to save_path."""
        # Skip files already on disk -- checked before the request,
        # so existing images cost no bandwidth
        if os.path.exists(save_path):
            with self.lock:
                self.downloaded_count += 1
            return True, "exists"

        try:
            response = requests.get(img_url, headers=self.headers, timeout=15)
            response.raise_for_status()

            # Save the image
            with open(save_path, 'wb') as f:
                f.write(response.content)

            with self.lock:
                self.downloaded_count += 1

            return True, "ok"

        except Exception as e:
            return False, str(e)

    def download_category_images(self, category, img_urls):
        """Download all images in one category with a thread pool."""
        category_name = category['name']

        # Create a directory per category
        if not os.path.exists(category_name):
            os.makedirs(category_name)
            print(f"\nCreated directory: {category_name}")

        print(f"Category [{category_name}] has {len(img_urls)} images")

        # Fan the downloads out over a ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []

            for i, img_url in enumerate(img_urls):
                # Derive a file name from the URL path
                parsed_url = urlparse(img_url)
                filename = os.path.basename(parsed_url.path)
                if not filename:
                    filename = f"image_{i}.jpg"

                save_path = os.path.join(category_name, filename)

                # Submit the download task
                future = executor.submit(self.download_single_image, img_url, save_path)
                futures.append((future, filename, i + 1))

            # Report progress as results come back, in submission order
            completed = 0
            for future, filename, idx in futures:
                try:
                    success, message = future.result(timeout=20)
                    completed += 1

                    if success and message == "ok":
                        status = "✓"
                    elif success and message == "exists":
                        status = "↻"
                    else:
                        status = "✗"

                    progress = f"[{idx}/{len(img_urls)}] {status} {filename}"
                    print(f"\rCategory [{category_name}] progress: {completed}/{len(img_urls)} | current: {progress[:50]}", end="")

                except Exception:
                    completed += 1
                    print(f"\rCategory [{category_name}] progress: {completed}/{len(img_urls)} | failed: {filename[:30]}...", end="")

        print(f"\nCategory [{category_name}] finished")

    def run(self):
        """Main entry point."""
        print("=" * 60)
        print("Fetching image categories...")
        print("=" * 60)

        # Fetch all categories
        categories = self.get_categories()
        if not categories:
            print("No categories found.")
            return

        print(f"Found {len(categories)} categories: {[c['name'] for c in categories]}")

        # Collect every image URL up front
        all_img_urls = []
        category_data = []

        for cat in categories:
            print(f"Listing images for category [{cat['name']}]...", end="")
            img_urls = self.get_image_urls(cat['url'])
            all_img_urls.extend(img_urls)
            category_data.append({
                'category': cat,
                'img_urls': img_urls
            })
            print(f" found {len(img_urls)} images")

        self.total_count = len(all_img_urls)
        print(f"\nFound {self.total_count} images in total")
        print("=" * 60)

        # Start downloading
        start_time = time.time()

        for data in category_data:
            if data['img_urls']:
                self.download_category_images(data['category'], data['img_urls'])

        # Summary
        total_time = time.time() - start_time

        print("\n" + "=" * 60)
        print("All done!")
        print("=" * 60)
        print(f"Total images found: {self.total_count}")
        print(f"Downloaded or already on disk: {self.downloaded_count}")
        print(f"Elapsed: {total_time:.2f}s")
        if total_time > 0:
            print(f"Average speed: {self.downloaded_count / total_time:.2f} images/s")
        print("=" * 60)

def main():
    # Thread count; tune it to your network and machine
    max_workers = 20

    downloader = ImageDownloader(max_workers=max_workers)
    downloader.run()


if __name__ == "__main__":
    main()
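
To run it you only need requests and beautifulsoup4 (pip install requests beautifulsoup4). One thing I'd probably bolt on next is a shared requests.Session with automatic retries, since 20 threads of bare requests.get calls give the server plenty of chances to drop or rate-limit connections. A minimal sketch, assuming urllib3's stock Retry class is enough; the retry count, backoff factor, and status list are my own picks, not anything the script above uses:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session(retries=3, backoff=0.5):
    """Build a Session that retries transient failures with backoff."""
    # NOTE: these defaults are illustrative assumptions, not tuned values
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

ImageDownloader could build one of these in __init__ and call self.session.get(...) everywhere it now calls requests.get. A Session also reuses TCP connections, which adds up across a few thousand small image requests.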