import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
class ImageDownloader:
    """Multi-threaded downloader for the image gallery at https://t.alcy.cc/img/.

    Scrapes category links from the landing page, collects every image URL
    per category, then downloads each category into a same-named local
    directory using a thread pool.
    """

    def __init__(self, max_workers=10):
        """
        Args:
            max_workers: Thread-pool size used when downloading a category.
        """
        self.base_url = "https://t.alcy.cc/img/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        self.max_workers = max_workers
        self.downloaded_count = 0  # images saved or already present on disk
        self.total_count = 0       # images discovered across all categories
        self.lock = threading.Lock()  # guards downloaded_count across workers

    def get_categories(self):
        """Fetch the landing page and return its categories.

        A category anchor is recognized by "name (count)" link text.

        Returns:
            list[dict]: ``{'name': str, 'url': str}`` per category; empty
            list when the landing page cannot be fetched.
        """
        print(f"正在访问主页: {self.base_url}")
        try:
            # FIX: bounded timeout so a dead server cannot hang the run
            # (the original request had none).
            response = requests.get(self.base_url, headers=self.headers, timeout=15)
            response.raise_for_status()
        except Exception as e:
            print(f"访问主页失败: {e}")
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        categories = []
        for a in soup.find_all('a', href=True):
            # Category links look like "风景 (123)"; strip the count.
            if '(' in a.text and ')' in a.text:
                category_name = a.text.split('(')[0].strip()
                category_url = urljoin(self.base_url, a['href'])
                categories.append({
                    'name': category_name,
                    'url': category_url
                })
        return categories

    def get_image_urls(self, category_url):
        """Return the deduplicated list of image URLs on a category page.

        Prefers lazy-load ``data-src`` over ``src`` and skips inline
        ``data:`` URIs. Returns an empty list on fetch failure.
        """
        try:
            # FIX: timeout added (original request could block forever).
            response = requests.get(category_url, headers=self.headers, timeout=15)
            response.raise_for_status()
        except Exception as e:
            print(f"获取分类页面失败: {e}")
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        img_urls = []
        for img in soup.find_all('img'):
            src = img.get('data-src') or img.get('src')
            if src and not src.startswith('data:'):
                img_urls.append(urljoin(self.base_url, src))
        return list(set(img_urls))

    def download_single_image(self, img_url, save_path):
        """Download one image to *save_path* (thread-safe counter update).

        Returns:
            tuple[bool, str]: ``(True, "成功")`` on a fresh download,
            ``(True, "已存在")`` when the file already exists (skipped),
            ``(False, error_message)`` on failure.
        """
        # BUG FIX: the original issued the HTTP request *before* checking
        # for an existing file, downloading and then discarding bytes for
        # every image already on disk. Check first, skip the request.
        if os.path.exists(save_path):
            with self.lock:
                self.downloaded_count += 1
            return True, "已存在"
        try:
            response = requests.get(img_url, headers=self.headers, timeout=15)
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                f.write(response.content)
            with self.lock:
                self.downloaded_count += 1
            return True, "成功"
        except Exception as e:
            return False, str(e)

    def download_category_images(self, category, img_urls):
        """Download every URL in *img_urls* into a directory named after the category.

        Submits all downloads to a thread pool, then collects results in
        submission order while printing a single-line progress indicator.
        """
        category_name = category['name']
        if not os.path.exists(category_name):
            # exist_ok guards against another thread/process creating the
            # directory between the check and the call.
            os.makedirs(category_name, exist_ok=True)
            print(f"\n创建目录: {category_name}")
        print(f"分类 [{category_name}] 共有 {len(img_urls)} 张图片")
        total = len(img_urls)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for i, img_url in enumerate(img_urls):
                filename = os.path.basename(urlparse(img_url).path)
                if not filename:
                    # URL has no basename (e.g. ends in '/'); use a
                    # positional fallback name.
                    filename = f"image_{i}.jpg"
                save_path = os.path.join(category_name, filename)
                future = executor.submit(self.download_single_image, img_url, save_path)
                futures.append((future, filename, i + 1))
            completed = 0
            for future, filename, idx in futures:
                try:
                    success, message = future.result(timeout=20)
                    completed += 1
                    if success and message == "成功":
                        status = "✓"
                    elif success and message == "已存在":
                        status = "↻"
                    else:
                        status = "✗"
                    # FIX: show the filename in the progress line (the
                    # original contained a garbled "(unknown)" placeholder).
                    progress = f"[{idx}/{total}] {status} {filename}"
                    print(f"\r分类 [{category_name}] 进度: {completed}/{total} | 当前: {progress[:50]}", end="")
                except Exception:
                    completed += 1
                    print(f"\r分类 [{category_name}] 进度: {completed}/{total} | 失败: {filename[:30]}...", end="")
        print(f"\n分类 [{category_name}] 下载完成")

    def run(self):
        """Entry point: discover categories, collect URLs, download everything, report stats."""
        print("=" * 60)
        print("开始获取图片分类...")
        print("=" * 60)
        categories = self.get_categories()
        if not categories:
            print("未找到任何分类。")
            return
        print(f"找到 {len(categories)} 个分类: {[c['name'] for c in categories]}")
        all_img_urls = []
        category_data = []
        for cat in categories:
            print(f"正在获取分类 [{cat['name']}] 的图片列表...", end="")
            img_urls = self.get_image_urls(cat['url'])
            all_img_urls.extend(img_urls)
            category_data.append({
                'category': cat,
                'img_urls': img_urls
            })
            print(f" 找到 {len(img_urls)} 张图片")
        self.total_count = len(all_img_urls)
        print(f"\n总共发现 {self.total_count} 张图片")
        print("=" * 60)
        start_time = time.time()
        for data in category_data:
            if data['img_urls']:
                self.download_category_images(data['category'], data['img_urls'])
        total_time = time.time() - start_time
        print("\n" + "=" * 60)
        print("下载完成!")
        print("=" * 60)
        print(f"总计图片数量: {self.total_count}")
        print(f"成功下载/已存在: {self.downloaded_count}")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"平均速度: {self.downloaded_count/total_time:.2f} 张/秒" if total_time > 0 else "速度计算中...")
        print("=" * 60)
def main():
    """Create an ImageDownloader with a 20-thread pool and start the crawl."""
    ImageDownloader(max_workers=20).run()
# Run the downloader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()