import json
import os
import re
import sys
import time
import traceback
import zlib
import pandas as pd
import redis
from datetime import datetime

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession


class DimStAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest',
                 topic_name="us_asin_detail", batch_size=100000):
        super(DimStAsinInfo, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume real-time or historical data
        self.topic_name = topic_name  # Kafka topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        self.db_save = 'kafka_test_001'
        # self.spark = self.create_spark_object(
        #     app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # create the SparkSession
        # self.spark = SparkSession.builder \
        #     .appName("KafkaConsumerApp") \
        #     .getOrCreate()
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.schema = self.init_schema()
        # MySQL connection
        self.engine = self.get_connection()

    def get_connection(self):
        return TemplatesMysql(site_name="us").mysql_connect()

    def judge_spider_asin_detail_is_finished(self):
        # Poll workflow_progress; once the ASIN-detail crawl is marked finished (status_val=3), stop the job.
        while True:
            try:
                # page="ASIN详情" ("ASIN detail") is the page name stored in workflow_progress
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" ' \
                      f'and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN detail status is 3, crawl finished, terminating the program, "
                          f"site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # exit the program
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    @staticmethod
    def init_schema():
        schema = StructType([
            StructField("asin", StringType(), True),
            StructField("week", StringType(), True),
            StructField("title", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("total_comments", StringType(), True),
            StructField("price", FloatType(), True),
            StructField("rank", StringType(), True),
            StructField("category", StringType(), True),
            StructField("launch_time", StringType(), True),
            StructField("volume", StringType(), True),
            StructField("weight", StringType(), True),
            StructField("page_inventory", IntegerType(), True),
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("asin_vartion_list", IntegerType(), True),
            StructField("title_len", IntegerType(), True),
            StructField("img_num", IntegerType(), True),
            StructField("img_type", StringType(), True),
            StructField("activity_type", StringType(), True),
            StructField("one_two_val", StringType(), True),
            StructField("three_four_val", StringType(), True),
            StructField("eight_val", StringType(), True),
            StructField("qa_num", IntegerType(), True),
            StructField("five_star", IntegerType(), True),
            StructField("four_star", IntegerType(), True),
            StructField("three_star", IntegerType(), True),
            StructField("two_star", IntegerType(), True),
            StructField("one_star", IntegerType(), True),
            StructField("low_star", IntegerType(), True),
            StructField("together_asin", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("ac_name", StringType(), True),
            StructField("material", StringType(), True),
            StructField("node_id", StringType(), True),
            StructField("data_type", IntegerType(), True),
            StructField("sp_num", StringType(), True),
            StructField("describe", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("weight_str", StringType(), True),
            StructField("package_quantity", StringType(), True),
            StructField("pattern_name", StringType(), True),
            StructField("seller_id", StringType(), True),
            StructField("variat_num", IntegerType(), True),
            StructField("site_name", StringType(), True),
            StructField("best_sellers_rank", StringType(), True),
            StructField("best_sellers_herf", StringType(), True),
            StructField("account_url", StringType(), True),
            StructField("account_name", StringType(), True),
            StructField("parentAsin", StringType(), True),
            StructField("asinUpdateTime", StringType(), True),
        ])
        return schema

    @staticmethod
    def clean_kafka_df(df):
        df = df.withColumnRenamed("seller_id", "account_id")
        # cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
        #                "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
        #                "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
        # oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
        # siteName volumeFormat weightFormat asinUpdateTime
        # field names used by the Java-side plugin
        cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal',
                     'price', 'rating', 'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName',
                     'accountUrl', 'siteName', 'buyBoxSellerType', 'volume', 'volumeFormat', 'weight', 'weightFormat',
                     'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
        df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
                       "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
                       "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory",
                       "asinUpdateTime", "site_name", "node_id")
        return df

    def get_topic_name(self):
        # us/month data has its own topic; everything else uses the per-site default topic
        if self.site_name == "us" and self.date_type == "month":
            self.topic_name = f"{self.site_name}_asin_detail_{self.date_type}"
        else:
            self.topic_name = f"{self.site_name}_asin_detail"

    def handle_history(self):
        # Consume the topic's historical data in batches of batch_size_history offsets per read.
        self.get_topic_name()
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)
        beginning_offsets_list = []
        end_offsets_list = []
        for values in partition_data_count.values():
            beginning_offsets_list.append(values['beginning_offsets'])
            end_offsets_list.append(values['end_offsets'])
        min_offset = min(beginning_offsets_list)
        max_offset = max(end_offsets_list)
        # max_offset = max(partition_data_count.values())
        # for start_offset in range(0, max_offset + 1, self.batch_size_history):
        for start_offset in range(min_offset, max_offset + 1, self.batch_size_history):
            end_offset = start_offset + self.batch_size_history
            starting_offsets_json = json.dumps(
                {self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
            ending_offsets_json = json.dumps(
                {self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .option("failOnDataLoss", "false") \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
            pdf = kafka_df.toPandas()
            # pdf.to_sql()
        # stop the SparkSession
        self.spark.stop()

    def run(self):
        self.handle_history()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    consumer_type = sys.argv[4]  # arg 4: consumer type: consume real-time or historical data
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info,
                               consumer_type=consumer_type, batch_size=100000)
    handle_obj.run()