import json
import os
import re
import sys
import time
import traceback
import zlib
import pandas as pd
import redis
from datetime import datetime
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
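

# Spark job that replays ASIN detail records from Kafka in offset-bounded batches,
# parses the JSON payload against an explicit schema, and hands each batch off
# (currently via pandas) for downstream persistence.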
class DimStAsinInfo(Templates):
    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000):
        super(DimStAsinInfo, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume real-time or historical data
        self.topic_name = topic_name  # Kafka topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        self.db_save = 'kafka_test_001'
        # self.spark = self.create_spark_object(
        #     app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # Create the SparkSession
        # self.spark = SparkSession.builder \
        #     .appName("KafkaConsumerApp") \
        #     .getOrCreate()
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.schema = self.init_schema()
        # MySQL connection
        self.engine = self.get_connection()
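
    # Obtain a MySQL connection through the shared TemplatesMysql helper.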
    def get_connection(self):
        return TemplatesMysql(site_name="us").mysql_connect()
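
    # Check the workflow_progress table; if the ASIN detail crawl for this
    # site/date is marked finished (status_val=3), stop Spark and exit.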
    def judge_spider_asin_detail_is_finished(self):
        while True:
            try:
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN detail status is 3 -- crawl finished, stopping the program. site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # exit the program
                break  # crawl not finished yet; leave the check loop and continue
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()
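
    # Schema of the JSON payload carried by each Kafka message (one ASIN detail record).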
    @staticmethod
    def init_schema():
        schema = StructType([
            StructField("asin", StringType(), True),
            StructField("week", StringType(), True),
            StructField("title", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("total_comments", StringType(), True),
            StructField("price", FloatType(), True),
            StructField("rank", StringType(), True),
            StructField("category", StringType(), True),
            StructField("launch_time", StringType(), True),
            StructField("volume", StringType(), True),
            StructField("weight", StringType(), True),
            StructField("page_inventory", IntegerType(), True),
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("asin_vartion_list", IntegerType(), True),
            StructField("title_len", IntegerType(), True),
            StructField("img_num", IntegerType(), True),
            StructField("img_type", StringType(), True),
            StructField("activity_type", StringType(), True),
            StructField("one_two_val", StringType(), True),
            StructField("three_four_val", StringType(), True),
            StructField("eight_val", StringType(), True),
            StructField("qa_num", IntegerType(), True),
            StructField("five_star", IntegerType(), True),
            StructField("four_star", IntegerType(), True),
            StructField("three_star", IntegerType(), True),
            StructField("two_star", IntegerType(), True),
            StructField("one_star", IntegerType(), True),
            StructField("low_star", IntegerType(), True),
            StructField("together_asin", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("ac_name", StringType(), True),
            StructField("material", StringType(), True),
            StructField("node_id", StringType(), True),
            StructField("data_type", IntegerType(), True),
            StructField("sp_num", StringType(), True),
            StructField("describe", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("weight_str", StringType(), True),
            StructField("package_quantity", StringType(), True),
            StructField("pattern_name", StringType(), True),
            StructField("seller_id", StringType(), True),
            StructField("variat_num", IntegerType(), True),
            StructField("site_name", StringType(), True),
            StructField("best_sellers_rank", StringType(), True),
            StructField("best_sellers_herf", StringType(), True),
            StructField("account_url", StringType(), True),
            StructField("account_name", StringType(), True),
            StructField("parentAsin", StringType(), True),
            StructField("asinUpdateTime", StringType(), True),
        ])
        return schema
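
    # Rename and select the parsed Kafka columns into the layout expected downstream
    # (mirrors the field names used by the Java-side consumer).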
    @staticmethod
    def clean_kafka_df(df):
        df = df.withColumnRenamed("seller_id", "account_id")
        # cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
        #                "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
        #                "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
        # oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
        # siteName volumeFormat weightFormat asinUpdateTime
        # Field names used by the Java-side plugin
        cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
                     'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'siteName', 'buyBoxSellerType',
                     'volume', 'volumeFormat', 'weight', 'weightFormat', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
        df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
                       "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
                       "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id")
        return df
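
    # Pick the Kafka topic based on site and date granularity.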
    def get_topic_name(self):
        if self.site_name == "us" and self.date_type == "month":
            self.topic_name = f"{self.site_name}_asin_detail_{self.date_type}"
        else:
            self.topic_name = f"{self.site_name}_asin_detail"
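
    # Replay the topic from its earliest to latest offsets in fixed-size windows,
    # parsing each window into a Spark DataFrame and converting it to pandas.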
    def handle_history(self):
        self.get_topic_name()
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)
        beginning_offsets_list = []
        end_offsets_list = []
        for values in partition_data_count.values():
            beginning_offsets_list.append(values['beginning_offsets'])
            end_offsets_list.append(values['end_offsets'])
        min_offset = min(beginning_offsets_list)
        max_offset = max(end_offsets_list)
        # max_offset = max(partition_data_count.values())
        # for start_offset in range(0, max_offset+1, self.batch_size_history):
        for start_offset in range(min_offset, max_offset + 1, self.batch_size_history):
            end_offset = start_offset + self.batch_size_history
            starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
            ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .option("failOnDataLoss", "false") \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
            pdf = kafka_df.toPandas()
            # pdf.to_sql()
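            # Possible persistence step (sketch, not the original implementation):
            # the target table name "us_asin_detail_history" is an assumption, and
            # pandas.DataFrame.to_sql expects a SQLAlchemy engine/connection.
            # pdf.to_sql("us_asin_detail_history", con=self.engine, if_exists="append", index=False)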
        # Stop the SparkSession
        self.spark.stop()
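
    # Entry point: currently only the historical replay path is wired up.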
    def run(self):
        self.handle_history()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date granularity: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    consumer_type = sys.argv[4]  # arg 4: consumer type: lastest (real-time) or history
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, batch_size=100000)
    handle_obj.run()
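
# Example invocation (sketch; the script filename and Spark resources below are
# assumptions, not taken from this repository):
#   spark-submit --master yarn dim_st_asin_info.py us month 2022-10 history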