狗儿

热爱的话就坚持吧~

0%

P站爬虫 by萌咖

啥是P站??

萌咖大佬写的,明晚考完试后学习一下里面的多线程和命令行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Author: MoeClub.org

from urllib import request
import threading
import queue
import re
import os


class pixiv:
def __init__(self):
self.folder = 'PixivImage'
self.web_coding = 'utf-8'
self.root = os.path.dirname(os.path.abspath(__file__))
self.DefaultHeader = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
"Accept-Encoding": "",
"Connection": "keep-alive",
}
self.data_low = []
self.num = 0

def _http(self, url, headers, Obj=False):
res = request.urlopen(request.Request(url, headers=headers, method='GET'))
if Obj:
return res
else:
return res.read().decode(self.web_coding, "ignore")

def data_image(self, url_id):
_header = self.DefaultHeader.copy()
_header["Referer"] = "https://www.pixiv.net/member_illust.php?mode=medium&illust_id={}".format(url_id)
_url_data = "https://www.pixiv.net/touch/ajax/illust/details?illust_id={}".format(url_id)
_data_details = self._http(_url_data, _header)
data_url = self.sort_data(re.findall('"url_big":"[^"]*"', _data_details))
data_uid = str(str(str(re.findall('"user_id":"[^"]*"', _data_details)[0]).split(':', 1)[-1]).strip('"'))
return data_url, _header, data_uid

def sort_data(self, data):
_data = []
for item in data:
if item not in _data:
_data.append(item)
return [str(str(item).replace('\\', '').split(':', 1)[-1]).strip('"') for item in _data]

def get_item(self, UserID=None):
if not UserID:
UserID = 'https://www.pixiv.net/ranking.php?mode=male'
if '://' in str(UserID):
Mode_ID = False
else:
Mode_ID = True
if Mode_ID:
_url = "https://www.pixiv.net/ajax/user/{}/profile/all".format(str(UserID))
page = self._http(_url, self.DefaultHeader, True)
if page.code != 200:
raise Exception("Pixiv Page:", page.code)
_data = re.findall('"[0-9]+":null', page.read().decode(self.web_coding, "ignore"))
self.data_low = [str(str(item).split(":")[0]).strip('"') for item in _data if ':null' in str(item)]
else:
page = self._http(UserID, self.DefaultHeader, True)
if page.code != 200:
raise Exception("Pixiv Page:", page.code)
_data = re.findall('data-src="[^"]*"', page.read().decode(self.web_coding, "ignore"))
self.data_low = [str(str(str(str(str(item).split("=", 1)[-1]).strip('"')).rsplit('/', 1)[-1]).split('_')[0]) for item in _data if '/img-master/img/' in str(item)]
self.fliter_item()

def fliter_item(self):
folder = os.path.join(self.root, self.folder)
if not os.path.exists(folder):
return None
_split = "_"
_exist = {}.fromkeys([str(str(item).split(_split)[1]) for item in os.listdir(folder) if _split in item]).keys()
print("Exist Item:", len(_exist))
for _item in self.data_low.copy():
if _item in _exist:
self.data_low.remove(_item)

def get_data_by_item(self, item):
data = self.data_image(item)
for data_url in data[0]:
image = self._http(data_url, data[1], True)
if image.code != 200:
raise Exception("Pixiv Image: [{} | {}]".format(image.code, data[0]))
self.write(str("{}_{}").format(str(data[2]), str(str(data_url).rsplit('/', 1)[-1])), image.read())

def get_data(self, data_list=None):
if not data_list:
data_list = self.data_low
for item in data_list:
self.get_data_by_item(item)
print("\nTotal Image: ", self.num)

def write(self, name, data):
folder = os.path.join(self.root, self.folder)
if not os.path.exists(folder):
os.mkdir(folder)
file = os.path.join(folder, str(name))
fp = open(file, 'wb')
fp.write(data)
fp.close()
self.num += 1
print("Pixiv Image: [ OK | {} ]".format(file))

def add_queue(self, _queue, data_list=None):
for item in data_list:
_item = str(item).strip()
if item and _item:
_queue.put(_item)

def multi_data(self, data_list=None, max=25):
if not data_list:
data_list = self.data_low
print("New Item:", len(data_list))
_threads = []
_queue = queue.Queue(maxsize=max)
task_main = threading.Thread(target=self.add_queue, args=(_queue, data_list))
task_main.setName("TaskMain")
task_main.setDaemon(True)
task_main.start()
while _queue.qsize() > 0:
if len(_threads) >= max:
for _item in _threads.copy():
if not _item.is_alive():
_threads.remove(_item)
continue
item = _queue.get()
task = threading.Thread(target=self.get_data_by_item, args=(item,))
task.setDaemon(True)
task.start()
_threads.append(task)
for _task in _threads:
_task.join()
print("\nTotal Image: ", self.num)


if __name__ == '__main__':
try:
task = os.sys.argv[1]
except:
task = None
p = pixiv()
p.get_item(task)
p.multi_data(max=25)