Commit 85922441 by Peng

no message

parent f56f31df
......@@ -5,12 +5,16 @@ import json
class ParseSearchTermUs(object):
def __init__(self, page_source, driver=None, search_term=None, page=1, site_name='us'):
self.etree_html = etree.HTML(page_source)
def __init__(self, page_source, driver=None, search_term=None, page=1, site_name='us',
time_batch=None, sp_all=True, etree_html=None):
self.etree_html = etree_html if etree_html is not None else etree.HTML(page_source)
self.search_term = search_term
self.search_term_html = page_source
self._totalResultCount_raw = re.findall(r'totalResultCount.*?\);</script>', page_source) if page_source else []
print(self._totalResultCount_raw)
self.page = page
self.time_batch = time_batch
self.site_name = site_name
self.sp_all = sp_all
# only zr and sp have page and page_row
self.zr_list = []
self.sp_list = []
......@@ -29,6 +33,17 @@ class ParseSearchTermUs(object):
self.sp_list_all = []
# get all search results -- only applies to page=1
self.sold_list = []
# Build an asin -> data-index position map, used to sort SP/ZR page_row
# (a standalone sketch follows __init__ below).
# data-index is the real position number of each search result on the Amazon page.
# The same ASIN can appear more than once; only its first occurrence is kept.
items = self.etree_html.xpath('//div[@data-index and @data-asin]')
self.asin_position_map = {}
for item in items:
asin = item.get('data-asin', '').strip()
index = item.get('data-index', '')
if asin and index:
if asin not in self.asin_position_map:
self.asin_position_map[asin] = int(index)
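# --- Hedged sketch: how the data-index map recovers page order ---
# Standalone and illustrative only; demo_html and all names below are
# hypothetical, not taken from a real Amazon page.
from lxml import etree

demo_html = (
    '<div data-index="2" data-asin="B000000001"></div>'
    '<div data-index="5" data-asin="B000000002"></div>'
    '<div data-index="7" data-asin="B000000001"></div>'  # duplicate ASIN
)
demo_tree = etree.HTML(demo_html)
demo_pos = {}
for demo_node in demo_tree.xpath('//div[@data-index and @data-asin]'):
    demo_asin = demo_node.get('data-asin', '').strip()
    if demo_asin and demo_asin not in demo_pos:  # first occurrence wins
        demo_pos[demo_asin] = int(demo_node.get('data-index'))
assert demo_pos == {'B000000001': 2, 'B000000002': 5}
# sorted(asins, key=lambda a: demo_pos.get(a, 9999)) then reproduces page
# order, pushing ASINs that never appeared in the DOM to the end.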
def parse_sold_quantity(self):
if self.page == 1:
......@@ -77,7 +92,7 @@ class ParseSearchTermUs(object):
quantity_being_sold_str = ele_span_list[0]
else:
quantity_being_sold_str = None
totalResultCount_list = re.findall(r'totalResultCount.*\);</script>', self.search_term_html)
totalResultCount_list = self._totalResultCount_raw
if totalResultCount_list:
try:
result_count = '{"' + totalResultCount_list[0].replace(');</script>', '')
......@@ -105,20 +120,20 @@ class ParseSearchTermUs(object):
asin_all = self.etree_html.xpath('//div[@data-asin]/@data-asin')
asin_all_str = "-".join(asin_all).replace('/', '')
asin_all = re.findall(r"(\w+)", asin_all_str)
self.asin_all = asin_all
asin_sb = self.sb_list_all
asin_sp = self.sp_list_all
asin_sb.extend(asin_sp)
for asin in asin_sb:
if asin in asin_all:
asin_all.remove(asin)
return asin_all
self.asin_all = asin_all # keep the original list for parse_buy
# Exclude SB/SP ASINs via a set: list.remove() only deletes the first match,
# so duplicated ASINs would slip through (worked example after this method).
# Dedupe first while preserving page order, then drop the ASINs already
# identified as SB and SP.
exclude_set = set(self.sb_list_all) | set(self.sp_list_all)
asin_unique = list(dict.fromkeys(asin_all))
zr_list = [a for a in asin_unique if a not in exclude_set]
return zr_list
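# --- Hedged sketch: why set-exclusion beats list.remove() above ---
# Tiny worked example with hypothetical ASINs. remove() deletes only the
# first match, so a duplicated SB/SP ASIN would survive in the ZR result.
page_asins = ['A1', 'A2', 'A1', 'A3']  # 'A1' occurs twice on the page
ads = ['A1']

buggy = list(page_asins)
for a in ads:
    if a in buggy:
        buggy.remove(a)                # removes only the first 'A1'
assert buggy == ['A2', 'A1', 'A3']     # the second 'A1' leaks through

exclude = set(ads)
fixed = [a for a in dict.fromkeys(page_asins) if a not in exclude]
assert fixed == ['A2', 'A3']           # deduped, ordered, fully excluded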
def parse_type_common(self, asin_list=None, cate_type=None):
"""
asin_list: list
"""
asin_list = list(dict.fromkeys(asin_list)) # dedupe
asin_list = list(dict.fromkeys(asin_list)) # dedupe, preserving order
asin_list.sort(key=lambda a: self.asin_position_map.get(a, 9999)) # sort by data-index page position; ASINs without one go last
asin_detail_all_list = []
cate_type_copy = 1
asin_detail_dict = {
......@@ -129,14 +144,14 @@ class ParseSearchTermUs(object):
"reviews": None
}
if len(asin_list):
for asin in asin_list:
for idx, asin in enumerate(asin_list):
asin_detail_list = []
page_row = idx + 1 # rank within this type
asin_detail_list.extend(
(self.search_term, asin, self.page, asin_list.index(asin) + 1, cate_type_copy))
if cate_type in ['zr', 'sp']:
asin_detail_list.extend(self.parse_detail(asin=asin, cate_type=cate_type).values())
else:
asin_detail_list.extend(asin_detail_dict.values()) # except zr and sp, no other type needs asin detail parsing
(self.search_term, asin, self.page, page_row, cate_type_copy, self.time_batch
))
asin_detail_list.extend(asin_detail_dict.values()) # detail fields are not persisted; fill them all with None (row shape sketched after this hunk)
asin_detail_all_list.append(asin_detail_list)
else:
pass
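# --- Hedged sketch: shape of one row from parse_type_common ---
# Assuming the five detail fields stay None (they are not persisted, per the
# comment above); all values here are hypothetical placeholders, including
# the time_batch format.
example_row = [
    'wireless earbuds',            # search_term
    'B000000001',                  # asin
    1,                             # page
    3,                             # page_row: 1-based rank within the type
    1,                             # cate_type_copy
    '2024-01-01 00:00:00',         # time_batch (format is an assumption)
    None, None, None, None, None,  # title, img, price, rating, reviews
]
assert len(example_row) == 11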
......@@ -219,33 +234,11 @@ class ParseSearchTermUs(object):
li_list = div.xpath('.//li[@class="a-carousel-card"]')
if li_list:
for li in li_list:
asin_detail_dict = {
"title": None,
"img": None,
"price": None,
"rating": None,
"reviews": None
}
asin_list = li.xpath('.//div[@data-asin]/@data-asin')
if asin_list:
for asin in asin_list:
asin_detail_list = [self.search_term, asin, self.page, page_row, cate_type]
price_list = li.xpath(
'.//span[@class="a-price"]/span[@class="a-offscreen"]/text()')
span_list = li.xpath(
'.//div[@class="a-row a-size-small"]/span[@aria-label]/@aria-label')
img_list = li.xpath('.//img[@class="s-image" and @src]/@src')
h2_list = li.xpath('.//h2//text()')
if price_list:
asin_detail_dict['price'] = price_list[0]
if len(span_list) == 2:
asin_detail_dict['rating'], asin_detail_dict['reviews'] = span_list[0], \
span_list[1]
if img_list:
asin_detail_dict['img'] = img_list[0]
if h2_list:
asin_detail_dict['title'] = h2_list[0]
asin_detail_list.extend(asin_detail_dict.values())
asin_detail_list = [self.search_term, asin, self.page, page_row, cate_type, self.time_batch]
asin_detail_list.extend([None, None, None, None, None])
asin_detail_all_list.append(asin_detail_list)
break
return asin_detail_all_list
......@@ -259,29 +252,38 @@ class ParseSearchTermUs(object):
def parse_buy(self):
## replaced below in April; the table fields were changed at the same time
# Pre-build an ASIN -> node map to avoid a full-tree search per ASIN
# (standalone comparison sketch after this method)
asin_node_map = {}
for node in self.etree_html.xpath('//div[@data-asin]'):
asin = node.get('data-asin', '').strip()
if asin and asin not in asin_node_map:
asin_node_map[asin] = node
for i in self.asin_all:
buy_text_list = self.etree_html.xpath(
f'//div[@data-asin="{i}"]//span[contains(text(),"bought in past")]/text()')
node = asin_node_map.get(i)
if node is None:
continue
buy_text_list = node.xpath(
'.//span[contains(text(),"bought in past")]/text()')
if len(buy_text_list) == 0:
buy_text_list = self.etree_html.xpath(
f'//div[@data-asin="{i}"]//div[@class="a-row a-size-base"]/span[@class="a-size-base a-color-secondary"]/text()')
buy_text_list = node.xpath(
'.//div[@class="a-row a-size-base"]/span[@class="a-size-base a-color-secondary"]/text()')
if len(buy_text_list) == 0:
if self.site_name == 'us' or self.site_name == 'uk':
buy_text_list = self.etree_html.xpath(
f'//div[@data-asin="{i}"]//span[contains(text(),"past ")]/text()')
buy_text_list = node.xpath(
'.//span[contains(text(),"past ")]/text()')
else:
buy_text_list = self.etree_html.xpath(
f'//div[@data-asin="{i}"]//span[contains(text()," im letzten")]/text()')
label_text_list = self.etree_html.xpath(
f"//div[@data-asin='{i}']//div[contains(@class,'a-size-base a-color-base')]/a/text()")
buy_text_list = node.xpath(
'.//span[contains(text()," im letzten")]/text()')
label_text_list = node.xpath(
".//div[contains(@class,'a-size-base a-color-base')]/a/text()")
if buy_text_list:
if len(buy_text_list[0]) < 2:
buy_text_list = [None]
else:
buy_text_list = [None]
print('monthly sales::', buy_text_list)
asin_brand_list = self.etree_html.xpath(f'//div[@data-asin="{i}"]//h2/following-sibling::div/span/text()|//div[@data-asin="{i}"]//div[@data-cy="title-recipe"]//h2/span[@class="a-size-base-plus a-color-base"]/text()')
asin_brand_list = node.xpath('.//h2/following-sibling::div/span/text()|.//div[@data-cy="title-recipe"]//h2/span[@class="a-size-base-plus a-color-base"]/text()')
if asin_brand_list:
asin_brand = asin_brand_list[0]
else:
......@@ -301,47 +303,26 @@ class ParseSearchTermUs(object):
else:
label_data = label_text_list[0]
if label_data or buy_text_list[0]:
self.buy_text_list.append([self.search_term, i, self.page, buy_text_list[0], label_data, asin_brand])
self.buy_text_list.append([self.search_term, i, self.page, buy_text_list[0], label_data, asin_brand, self.time_batch])
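# --- Hedged sketch: prebuilt node map vs. per-ASIN full-tree XPath ---
# Standalone comparison with hypothetical markup. One pass over
# //div[@data-asin] gives O(1) lookups afterwards, instead of re-walking the
# whole tree with f'//div[@data-asin="{i}"]...' once per ASIN.
from lxml import etree

tree = etree.HTML(
    '<div data-asin="B0A"><span>7K+ bought in past month</span></div>'
    '<div data-asin="B0B"></div>'
)
node_map = {}
for n in tree.xpath('//div[@data-asin]'):
    a = n.get('data-asin', '').strip()
    if a and a not in node_map:
        node_map[a] = n
# Relative query scoped to one prebuilt node (note the leading './/'):
assert node_map['B0A'].xpath('.//span[contains(text(),"bought in past")]/text()') \
    == ['7K+ bought in past month']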
def parse_sp(self):
"""
Identify SP ad slots via the AdHolder class (replaces the original label-popover-default approach).
AdHolder exists only on the DOM of genuine SP ads, so labeled ad slots and brand ads are not falsely matched.
(A standalone sketch of this detection follows the method.)
"""
try:
sp_asin_list = []
if self.site_name == 'us' or self.site_name == 'uk':
brand_sp_asin_list = self.etree_html.xpath(
'//span[contains(text(),"from Amazon brands")]/../../../../../../../../div//div/@data-csa-c-asin')
else:
brand_sp_asin_list = self.etree_html.xpath(
'//span[contains(text(),"von Amazon-Marken")]/../../../../../../../../div//div/@data-csa-c-asin')
if brand_sp_asin_list:
self.sp_list_all.extend(brand_sp_asin_list)
sp_asin_list.extend(brand_sp_asin_list)
# self.sp_list.extend(self.parse_type_common(asin_list=brand_sp_asin_list, cate_type='sp'))
asin_list = self.etree_html.xpath('//span[contains(@class,"label-popover-default")]/../../../div//@id')
if len(asin_list):
asin_list = [asin.split("-")[-1] for asin in asin_list if len(asin.split("-")[-1]) >= 9]
self.sp_list_all.extend(asin_list)
sp_asin_list.extend(asin_list)
# self.sp_list.extend(self.parse_type_common(asin_list=asin_list, cate_type='sp'))
# Get the ad-slot asins under a label, usually 5, e.g. Customers frequently viewed / Today's deals
if self.site_name == 'de':
sp_label = 'Gesponsert'
elif self.site_name == 'us' or self.site_name == 'uk':
sp_label = 'Sponsored'
else:
sp_label = 'Sponsored'
tag_asin_list = self.etree_html.xpath(
f'//span[@class="a-declarative"]/span[contains(text(),"{sp_label}")]/../../../../../../../../div/following-sibling::span[2]//div/@data-asin|//span/a[contains(text(),"{sp_label}")]/../../../../../../../../div/following-sibling::span[2]//div/@data-asin')
if tag_asin_list:
self.sp_list_all.extend(tag_asin_list)
sp_asin_list.extend(tag_asin_list)
print('all ad asins:', len(sp_asin_list), 'brand_sp_asin_list:', len(brand_sp_asin_list), 'normal sp asin_list::',
len(asin_list), "ad slots under the sp label:", len(tag_asin_list))
_sp_asin_list = []
# anything with data-index + data-asin + AdHolder is an SP ad
items = self.etree_html.xpath('//div[@data-index and @data-asin and contains(@class, "AdHolder")]')
for item in items:
asin = item.get('data-asin', '').strip()
print(self.search_term, ' page:', self.page, ' ad asin:', asin)
if asin and len(asin) >= 9 and asin not in sp_asin_list:
sp_asin_list.append(asin)
self.sp_list_all = sp_asin_list.copy() # used by parse_asin_zr for exclusion
if sp_asin_list:
for _sp_asin in sp_asin_list:
if _sp_asin not in _sp_asin_list:
_sp_asin_list.append(_sp_asin)
self.sp_list.extend(self.parse_type_common(asin_list=_sp_asin_list, cate_type='sp'))
sp_asin_list.sort(key=lambda a: self.asin_position_map.get(a, 9999))
self.sp_list.extend(self.parse_type_common(asin_list=sp_asin_list, cate_type='sp'))
except Exception as e:
pass
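# --- Hedged sketch: the AdHolder-based SP detection in isolation ---
# Hypothetical markup; real Amazon attributes may differ, but this is the
# shape the XPath in parse_sp is written for.
from lxml import etree

page = etree.HTML(
    '<div data-index="1" data-asin="B0AD0000AA" class="s-result-item AdHolder"></div>'
    '<div data-index="2" data-asin="B0ORGANIC1" class="s-result-item"></div>'
)
sp = [d.get('data-asin') for d in
      page.xpath('//div[@data-index and @data-asin and contains(@class, "AdHolder")]')]
assert sp == ['B0AD0000AA']  # the organic result is not matched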
......@@ -361,6 +342,8 @@ class ParseSearchTermUs(object):
if sbv1_asin_list:
if len(sbv1_asin_list) > 0:
asin_list.extend(sbv1_asin_list)
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(sbv1_asin_list))])
else:
# both video and brand
sbv2_asin_list = self.etree_html.xpath(
......@@ -382,14 +365,16 @@ class ParseSearchTermUs(object):
if len(svb_asins_list[0]) > 10:
svb_asins_list[0] = svb_asins_list[0][:10]
_sbv_list.extend(svb_asins_list)
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(svb_asins_list))])
print('top brands:', asin_list, 'three asins after the video:', sbv1_asin_list, 'top video brands:', _sbv_list)
if asin_list:
self.sb_list_all.extend([asin.replace('/', '') for asin in asin_list])
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(asin_list))])
if _sbv_list:
asin_list.extend(_sbv_list)
for asin in asin_list:
asin_detail_list = [self.search_term, asin, self.page, page_row, cate_type]
asin_detail_list.extend(self.parse_detail(asin=asin).values())
asin_detail_list = [self.search_term, asin, self.page, page_row, cate_type, self.time_batch]
asin_detail_list.extend([None, None, None, None, None])
self.sb_list.append(asin_detail_list)
except Exception as e:
pass
......@@ -397,18 +382,30 @@ class ParseSearchTermUs(object):
try:
cate_type = 2
asin_list = self.etree_html.xpath('//div[@class="sb_1LIJTce6"]//a//@href')
if len(asin_list) == 0:
if not asin_list:
asin_list = self.etree_html.xpath(
'//a[@class="a-spacing-none a-link-normal _bXVsd_mainImageLink_1UpRh _bXVsd_link_gJc5l _bXVsd_hidden_L-XDK"]//@href|//div[@data-id="track"]/div/div/a/@href|//a[@class="a-link-normal _bXVsd_link_2cNGK _bXVsd_hidden_FUOrV"]/@href')
asin_list = [re.findall("lp_asins=(.*?)&", asin)[0].split('%2C')[0].replace('/', '') for asin in asin_list
if 'lp_asins' in asin]
'//div[@data-iid="oDeh4GD5fRXSDxLNSBN1Og"]//a[@class="a-link-normal _bXVsd_link_2cNGK _bXVsd_hidden_FUOrV"]/@href|//div[@data-slot="desktop-hsa-3psl"]//div/a[@tabindex="-1"]/@href'
)
if not asin_list:
asin_list = self.etree_html.xpath(
'//div[@data-a-display-strategy="sb-search-grid"]//div/@data-asin'
)
if asin_list:
if len(asin_list) > 6:
asin_list = asin_list[6:9]
elif len(asin_list) > 3:
asin_list = asin_list[3:6]
else:
asin_list = [
re.findall(r"lp_asins=(.*?)&", a)[0].split('%2C')[0].replace('/', '')
for a in asin_list if 'lp_asins' in a
]
print('tail sb brands:', asin_list)
if asin_list:
# self.sb_list_all.extend([asin.replace('/', '') for asin in asin_list]) #
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(asin_list))])
for asin in asin_list:
asin_detail_list = [self.search_term, asin.replace('/', ''), self.page, page_row, cate_type]
asin_detail_list.extend(self.parse_detail(asin=asin).values())
asin_detail_list = [self.search_term, asin.replace('/', ''), self.page, page_row, cate_type, self.time_batch]
asin_detail_list.extend([None, None, None, None, None])
self.sb_list.append(asin_detail_list)
except Exception as e:
pass
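# --- Hedged sketch: first ASIN out of an lp_asins URL parameter ---
# %2C is a URL-encoded comma, so the parameter can carry several ASINs.
# The href below is hypothetical.
import re

href = '/stores/page/x?lp_asins=B000000001%2CB000000002&lp_query=demo'
first = re.findall(r"lp_asins=(.*?)&", href)[0].split('%2C')[0]
assert first == 'B000000001'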
......@@ -449,30 +446,42 @@ class ParseSearchTermUs(object):
asin_list.extend(asins)
print("解析视频sb sb_3:", asin_list)
if asin_list:
# self.sb_list_all.extend([asin.replace('/', '') for asin in asin_list])
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(asin_list))])
for i in asin_list:
asin_detail_list = [self.search_term, i.replace('/', ''), self.page, page_row, cate_type]
sb_title = self.etree_html.xpath(
'//div[@class="a-section a-spacing-none faceout-product-title"]//../h2//span//text()')
sb_img = self.etree_html.xpath('//img[@class="sbv-product-img"]/@src')
sb_price = self.etree_html.xpath(
'//div[@class="a-section a-spacing-none faceout-product-title"]//..//span[@class="a-offscreen"]//text()')
sb_rating = self.etree_html.xpath(
'//div[@class="a-section a-spacing-none faceout-product-title"]//..//span[@class="a-icon-alt"]//text()')
sb_review = self.etree_html.xpath(
'//div[@class="a-section a-spacing-none faceout-product-title"]//..//span[@class="a-size-base"]//text()')
sb_title = sb_title[0] if sb_title else None
sb_img = sb_img[0] if sb_img else None
sb_price = sb_price[0] if sb_price else None
sb_rating = sb_rating[0] if sb_rating else None
sb_review = sb_review[0] if sb_review else None
asin_detail_list.extend([sb_title, sb_img, sb_price, sb_rating, sb_review])
asin_detail_list = [self.search_term, i.replace('/', ''), self.page, page_row, cate_type, self.time_batch]
asin_detail_list.extend([None, None, None, None, None])
self.sb_list.append(asin_detail_list)
else:
pass
except Exception as e:
import traceback
print("error:", traceback.format_exc())
# Parse brand ads: "Sponsored ad from" / "from Amazon brands"
# Originally lived in parse_sp, but brand ads are essentially SB type, so they were moved here
# The first 3 get cate_type=1 (top); anything beyond 3 gets cate_type=2 (tail)
try:
if self.site_name == 'us' or self.site_name == 'uk':
brand_ad_list = self.etree_html.xpath(
'//span[contains(text(),"from Amazon brands")]/../../../../../../../../div//div/@data-csa-c-asin')
else:
brand_ad_list = self.etree_html.xpath(
'//span[contains(text(),"von Amazon-Marken")]/../../../../../../../../div//div/@data-csa-c-asin')
if not brand_ad_list:
# fallback xpath: match via aria-label
brand_ad_list = self.etree_html.xpath('//a[contains(@aria-label,"Sponsored ad from ")]/..//@data-asin')
if brand_ad_list:
if len(brand_ad_list) == 6:
brand_ad_list = brand_ad_list[3:6]
if brand_ad_list:
self.sb_list_all.extend([asin.replace('/', '') for asin in list(set(brand_ad_list))])
for idx, asin in enumerate(brand_ad_list):
cate_type = 1 if idx < 3 else 2 # first 3 are top; the rest are tail
asin_detail_list = [self.search_term, asin.replace('/', ''), self.page, page_row, cate_type,
self.time_batch]
asin_detail_list.extend([None, None, None, None, None])
self.sb_list.append(asin_detail_list)
except Exception as e:
pass
def parse_ac(self):
try:
......@@ -504,12 +513,13 @@ class ParseSearchTermUs(object):
if len(asin) > 10:
pattern = re.compile(r'(?<=amzn1\.asin\.)[A-Z0-9]{10}', re.I)
asins = pattern.findall(asin)
bsr_asin_list.append(asins)
if asins:
bsr_asin_list.append(asins[0])
else:
bsr_asin_list.append(asin)
print('############## bsr_asin::', bsr_asin_list)
self.bs_list.extend(self.parse_type_common(asin_list=bsr_asin_list, cate_type='sb'))
self.bs_list.extend(self.parse_type_common(asin_list=bsr_asin_list, cate_type='bs'))
break
except Exception as e:
pass
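# --- Hedged sketch: the amzn1.asin lookbehind extraction ---
# Demonstrates the regex used above on a hypothetical identifier string.
import re

pat = re.compile(r'(?<=amzn1\.asin\.)[A-Z0-9]{10}', re.I)
assert pat.findall('amzn1.asin.B01ABCDE23:1:1') == ['B01ABCDE23']
assert pat.findall('B01ABCDE23') == []  # no prefix -> fall back to the raw asin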
......@@ -543,10 +553,10 @@ class ParseSearchTermUs(object):
f'//div[@data-asin="{asin}"]//a[@data-type="productTitle"]/text()') # 可以解析
else:
asin_title = self.etree_html.xpath(
'//div[@data-asin="' + asin + '"]//span[contains(@class,"a-text-normal")]//text()') # 可以解析
'//div[@data-asin="' + asin + '"]//h2/@aria-label') # 可以解析
if (len(asin_title) == 0):
asin_title = self.etree_html.xpath(
'//div[@data-asin="' + asin + '"]//span[@class="a-truncate-cut"]//text()') # 失效
'//div[@data-asin="' + asin + '"]//div[@data-cy="title-recipe"]//text()') # 失效
asin_detail_dict['title'] = asin_title[0]
except Exception as e:
pass
......@@ -565,13 +575,13 @@ class ParseSearchTermUs(object):
'//div[@data-asin="' + asin + '"]//span[@class="a-color-link"]//text()')
if len(asin_review) == 0:
asin_review = self.etree_html.xpath(
f'//div[@data-asin="{asin}"]//span[contains(@class,"a-size-base")]/parent::a/parent::span/@aria-label')
f'//div[@data-asin="{asin}"]//div[@data-csa-c-slot-id="alf-reviews"]/a/@aria-label')
asin_detail_dict['reviews'] = asin_review[0]
except Exception as e:
pass
try:
if cate_type == 'sb':
asin_rating = self.etree_html.xapth(
asin_rating = self.etree_html.xpath(
f'//a[contains(@href,"{asin}")]/parent::div/parent::div/parent::div/parent::div//span/@aria-label')
else:
asin_rating = self.etree_html.xpath(
......@@ -610,11 +620,15 @@ class ParseSearchTermUs(object):
return (self.zr_list, self.sp_list, self.sb_list, self.ac_list,
self.bs_list, self.er_list, self.tr_list, self.sold_list, self.buy_text_list, self.hr_list)
# if __name__ == '__main__':
# with open(r'C:\Users\ASUS\Downloads\python2.html','r',encoding='utf-8')as f:
# response = f.read()
# parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term='keywords',
# page=1, site_name='us')
# st_list = parse_search_term.run()
# zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list = st_list
# print( zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list )
\ No newline at end of file
if __name__ == '__main__':
with open(r'C:\Users\ASUS\Desktop\text.html', 'r', encoding='utf-8') as f:
response = f.read()
parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term='keywords',
page=1, site_name='us')
st_list = parse_search_term.run()
zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list = st_list
print('sp_list:', sp_list)
print('zr_list:', zr_list)
print('sb_list:', sb_list)