import json
import os
import re
import sys
import time
import logging
import traceback
import zlib
import pandas as pd
import redis
from datetime import datetime

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from sqlalchemy import create_engine
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.mysql_db import sql_connect, sql_update_many, sql_delete, get_country_engine
from pyspark.sql import SparkSession


class SpiderAsinImg(Templates):

    def __init__(self, site_name='us', consumer_type='latest', batch_size=100000):
        super(SpiderAsinImg, self).__init__()
        self.site_name = site_name
        self.consumer_type = consumer_type  # consume real-time ('latest') or historical data
        # derive the topic name from the site
        self.get_topic_name()
        # build the Kafka value schema and the target column list
        self.init_schema()
        # self.topic_name = topic_name  # topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        # self.db_save = f'kafka_test_001'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # self.schema = self.init_schema()
        # MySQL connection
        self.engine = get_country_engine(self.site_name)
        self.pg14_engine = self.get_14pg_country_engine(self.site_name)
        sql_connect(self.site_name)
        logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)

    def judge_spider_asin_detail_is_finished(self):
        # Poll workflow_progress until the ASIN-detail crawl is marked finished (status_val=3),
        # then stop Spark and exit the program.
        while True:
            try:
                sql = (f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" '
                       f'and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3')
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN detail status is 3, crawl finished, terminating program, "
                          f"site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    sys.exit()  # exit the program
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    def init_schema(self):
        self.schema = StructType([
            StructField("asin", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("img_order_by", StringType(), True),
            StructField("data_type", StringType(), True),
            StructField("site", StringType(), True),
        ])
        self.col = ["asin", "img_url", "img_order_by", "data_type", "site"]
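
    # Illustrative only: each message value on the topic is expected to be JSON matching
    # the schema above, e.g. (field values below are made up, not real data):
    #   {"asin": "B000000000", "img_url": "https://example.com/images/I/xxxx.jpg",
    #    "img_order_by": "1", "data_type": "asin_image", "site": "us"}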
"buy_box_seller_type", "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id") return df def get_topic_name(self): # 需要注意表名问题 # 月表主题 self.topic_name = f"{self.site_name}_asin_image" def handle_history(self): consumer = self.get_kafka_object_by_python(topic_name=self.topic_name) partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name) beginning_offsets_list = [] end_offsets_list = [] for values in partition_data_count.values(): beginning_offsets_list.append(values['beginning_offsets']) end_offsets_list.append(values['end_offsets']) min_offset = min(beginning_offsets_list) max_offset = max(end_offsets_list) # max_offset = max(partition_data_count.values()) # for start_offset in range(0, max_offset+1, self.batch_size_history): for start_offset in range(min_offset, max_offset+1, self.batch_size_history): end_offset = start_offset + self.batch_size_history starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}}) ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}}) kafka_df = self.spark.read \ .format("kafka") \ .option("kafka.bootstrap.servers", self.kafka_servers) \ .option("subscribe", self.topic_name) \ .option("startingOffsets", starting_offsets_json) \ .option("endingOffsets", ending_offsets_json) \ .option("failOnDataLoss", "false") \ .load() \ .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \ .select("data.*") print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}") pdf = kafka_df.toPandas() # pdf.to_sql() self.data_save(pdf) # 关闭SparkSession self.spark.stop() def get_14pg_country_engine(self, site_name="us"): h14_pg_us = { "user": "postgres", "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS", # "host": "61.145.136.61", "host": "192.168.10.223", "port": "5432", # "port": 54328, "database": "selection", } if site_name == 'us' or site_name == 'mx': h14_pg_us["database"] = f"selection" db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values()) # elif site_name == "keepa": # db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*h6_pg_us.values()) else: h14_pg_us["database"] = f"selection_{site_name}" db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values()) engine = create_engine(db_, encoding='utf-8') # , pool_recycle=3600 return engine def data_save(self, df): df = df.toPandas() # 获取对应表字段 df = df[self.col] if "site" not in df.keys(): df["site"] = self.site_name logging.info("site is not null") df["site"] = df['site'].fillna("us") # df.drop_duplicates(subset=["asin", "site"], inplace=True) for name, group in df.groupby(['site']): asins = list(set(group["asin"])) logging.info(f"站点{name[0]} asin:{asins}") if name[0] not in ['us', 'de', 'uk', 'it', 'es', 'fr', 'mx', 'ca']: logging.info("非8大站点跳过") continue # if len(asins) == 1: # sql_del = f"delete from {name[0]}_asin_image where asin in ('{tuple(asins)[0]}');" # else: # sql_del = f"delete from {name[0]}_asin_image where asin in {tuple(asins)};" # self.del_pg_asin(sql_del, site=self.site) # logging.info(f"清理{name[0]}_asin_image 表中数据 {asins}") del group["site"] # group.to_sql(name=f'{name[0]}_asin_image', con=self.pg14_engine, if_exists='append', index=False) logging.info(f"入库{name[0]}_asin_image成功") def start_stream(self, processing_time=300): logging.info(f"主题:{self.topic_name} 每 {processing_time} 秒 
存储到数据库") kafka_df = self.get_kafka_df_by_spark(schema=self.schema, consumption_type="latest", topics=self.topic_name) query = kafka_df.writeStream.foreachBatch(self.process_batch).trigger(processingTime=f'{processing_time} seconds').start() query.awaitTermination() def process_batch(self, df, epoch_id): # self.judge_spider_asin_detail_is_finished() logging.info(f"当前批次传输的数据量为df.count():{df.count()}") # 确保schema非空以避免NoneType错误 if not self.schema: raise ValueError("Schema is not defined") # df.show(5, truncate=False) logging.info(f"df.columns:{df.columns}") self.data_save(df) print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def run(self): if self.consumer_type == 'lastest': self.start_stream(processing_time=10) else: self.handle_history() if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 consumer_type = sys.argv[2] # 参数2:实时 lastest 历史 lastest # us day date_info 2023-11-07 handle_obj = SpiderAsinImg(site_name=site_name, consumer_type=consumer_type, batch_size=100000) handle_obj.run() # /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 2g --executor-memory 2g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_image.py us lastest # /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 2g --executor-memory 2g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_image.py us lastest # /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 2g --executor-memory 2g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_image.py us lastest # for i in `ps -ef|grep "spider_self_asin_detail" |awk '{print $2}' `; do kill -9 $i ; done;