import json
import os
import re
import sys
import time
import logging
import traceback
import zlib
import pandas as pd
import redis
from datetime import datetime
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from sqlalchemy import create_engine
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.mysql_db import sql_connect, sql_update_many, sql_delete, get_country_engine
from pyspark.sql import SparkSession
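# Kafka-to-Postgres consumer for ASIN image records.
# It reads JSON messages from the "{site_name}_asin_image" topic, groups them by site,
# and appends them to the per-site "{site}_asin_image" tables. Two consumption modes
# are supported (see run()):
#   - consumer_type 'lastest': real-time Spark Structured Streaming micro-batches
#   - any other value: bounded re-reads over the topic's historical offset ranges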
class SpiderAsinImg(Templates):
    def __init__(self, site_name='us', consumer_type='lastest', batch_size=100000):
        super(SpiderAsinImg, self).__init__()
        self.site_name = site_name
        self.consumer_type = consumer_type  # consume real-time or historical data
        # derive the topic name for this site
        self.get_topic_name()
        # build the message schema
        self.init_schema()
        # self.topic_name = topic_name  # topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        # self.db_save = f'kafka_test_001'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # self.schema = self.init_schema()
        # connect to MySQL
        self.engine = get_country_engine(self.site_name)
        self.pg14_engine = self.get_14pg_country_engine(self.site_name)
        sql_connect(self.site_name)
        logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s',
                            level=logging.INFO)
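    # Polls the workflow_progress table and shuts the job down once the ASIN-detail
    # crawl for this site/date is marked finished (status_val=3). Currently only
    # referenced from a commented-out call in process_batch().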
    def judge_spider_asin_detail_is_finished(self):
        while True:
            try:
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN detail status is 3 -- crawl finished, stopping the program. site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # exit the program
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()
def init_schema(self):
self.schema = StructType([
StructField("asin", StringType(), True),
StructField("img_url", StringType(), True),
StructField("img_order_by", StringType(), True),
StructField("data_type", StringType(), True),
StructField("site", StringType(), True),
])
self.col = ["asin", "img_url", "img_order_by", "data_type", "site"]
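    # Example message shape on the topic, inferred from the schema above (values are
    # illustrative only):
    # {"asin": "B0XXXXXXXX", "img_url": "https://...", "img_order_by": "1", "data_type": "...", "site": "us"}
    # Note: clean_kafka_df below maps the Java-side ASIN-detail field names onto this
    # project's column names; it is not called anywhere in this script and targets the
    # asin-detail payload rather than the image schema defined above.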
@staticmethod
def clean_kafka_df(df):
df = df.withColumnRenamed("seller_id", "account_id")
# cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
# "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
# "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
# oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
# siteName volumeFormat weightFormat asinUpdateTime
        # field names used by the Java-side plugin
cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'siteName', 'buyBoxSellerType',
'volume', 'volumeFormat', 'weight', 'weightFormat', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
"brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
"volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id")
return df
    def get_topic_name(self):
        # note: the topic name must match the target table naming
        # topic for the monthly table
        self.topic_name = f"{self.site_name}_asin_image"
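    # History mode: walks the topic's offset range in chunks of batch_size_history,
    # reading each bounded slice as a batch DataFrame via explicit
    # startingOffsets/endingOffsets JSON and handing the rows to data_save().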
def handle_history(self):
consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)
beginning_offsets_list = []
end_offsets_list = []
for values in partition_data_count.values():
beginning_offsets_list.append(values['beginning_offsets'])
end_offsets_list.append(values['end_offsets'])
min_offset = min(beginning_offsets_list)
max_offset = max(end_offsets_list)
# max_offset = max(partition_data_count.values())
# for start_offset in range(0, max_offset+1, self.batch_size_history):
for start_offset in range(min_offset, max_offset+1, self.batch_size_history):
end_offset = start_offset + self.batch_size_history
starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
kafka_df = self.spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", self.kafka_servers) \
.option("subscribe", self.topic_name) \
.option("startingOffsets", starting_offsets_json) \
.option("endingOffsets", ending_offsets_json) \
.option("failOnDataLoss", "false") \
.load() \
.select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
.select("data.*")
print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
pdf = kafka_df.toPandas()
# pdf.to_sql()
self.data_save(pdf)
        # stop the SparkSession
self.spark.stop()
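    # Builds a SQLAlchemy engine for the target PostgreSQL instance ("pg14"):
    # database "selection" for us/mx, "selection_{site_name}" for the other sites.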
def get_14pg_country_engine(self, site_name="us"):
h14_pg_us = {
"user": "postgres",
"password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
# "host": "61.145.136.61",
"host": "192.168.10.223",
"port": "5432",
# "port": 54328,
"database": "selection",
}
if site_name == 'us' or site_name == 'mx':
h14_pg_us["database"] = f"selection"
db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values())
# elif site_name == "keepa":
# db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*h6_pg_us.values())
else:
h14_pg_us["database"] = f"selection_{site_name}"
db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values())
engine = create_engine(db_, encoding='utf-8') # , pool_recycle=3600
return engine
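    # Persists one batch: keeps only the schema columns, back-fills a missing "site"
    # value, then appends each site's rows to its {site}_asin_image table. Rows for
    # sites outside the eight supported marketplaces are skipped.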
    def data_save(self, df):
        # accept either a Spark DataFrame (streaming path) or a pandas DataFrame (history path)
        if not isinstance(df, pd.DataFrame):
            df = df.toPandas()
        # keep only the columns of the target table
        df = df[self.col]
        if "site" not in df.keys():
            df["site"] = self.site_name
            logging.info("site column missing, defaulting to site_name")
        df["site"] = df['site'].fillna("us")
        # df.drop_duplicates(subset=["asin", "site"], inplace=True)
        for name, group in df.groupby(['site']):
            asins = list(set(group["asin"]))
            logging.info(f"site {name[0]} asin:{asins}")
            if name[0] not in ['us', 'de', 'uk', 'it', 'es', 'fr', 'mx', 'ca']:
                logging.info("not one of the 8 supported sites, skipping")
                continue
            # if len(asins) == 1:
            #     sql_del = f"delete from {name[0]}_asin_image where asin in ('{tuple(asins)[0]}');"
            # else:
            #     sql_del = f"delete from {name[0]}_asin_image where asin in {tuple(asins)};"
            # self.del_pg_asin(sql_del, site=self.site)
            # logging.info(f"deleted existing rows {asins} from {name[0]}_asin_image")
            del group["site"]
            group.to_sql(name=f'{name[0]}_asin_image', con=self.pg14_engine, if_exists='append', index=False)
            logging.info(f"saved batch into {name[0]}_asin_image")
    def start_stream(self, processing_time=300):
        logging.info(f"topic: {self.topic_name} -- flushing to the database every {processing_time} seconds")
        kafka_df = self.get_kafka_df_by_spark(schema=self.schema, consumption_type="latest", topics=self.topic_name)
        query = kafka_df.writeStream.foreachBatch(self.process_batch).trigger(processingTime=f'{processing_time} seconds').start()
        query.awaitTermination()
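    # foreachBatch callback: logs the batch size and columns, checks that the schema
    # was initialized, then delegates the batch to data_save().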
    def process_batch(self, df, epoch_id):
        # self.judge_spider_asin_detail_is_finished()
        logging.info(f"rows in this batch, df.count():{df.count()}")
        # make sure the schema is defined to avoid NoneType errors downstream
        if not self.schema:
            raise ValueError("Schema is not defined")
        # df.show(5, truncate=False)
        logging.info(f"df.columns:{df.columns}")
        self.data_save(df)
        print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def run(self):
if self.consumer_type == 'lastest':
self.start_stream(processing_time=10)
else:
self.handle_history()
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    consumer_type = sys.argv[2]  # arg 2: 'lastest' for real-time, any other value for history
    # e.g. us day date_info 2023-11-07
    handle_obj = SpiderAsinImg(site_name=site_name, consumer_type=consumer_type, batch_size=100000)
    handle_obj.run()
# /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 2g --executor-memory 2g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_image.py us lastest
# for i in `ps -ef|grep "spider_self_asin_detail" |awk '{print $2}' `; do kill -9 $i ; done;