# spider_asin_image.py
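# Kafka -> PostgreSQL consumer for ASIN image records: reads the per-site topic
# "<site>_asin_image" either as a Spark Structured Streaming job (consumer_type
# 'latest') or as a bounded historical read by Kafka offset range, and appends
# the rows to the per-site "<site>_asin_image" PostgreSQL tables.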
import json
import os
import re
import sys
import time
import logging
import traceback
import zlib
import pandas as pd
import redis
from datetime import datetime
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from sqlalchemy import create_engine
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.mysql_db import sql_connect, sql_update_many, sql_delete, get_country_engine
from pyspark.sql import SparkSession


class SpiderAsinImg(Templates):

    def __init__(self, site_name='us', consumer_type='latest', batch_size=100000):
        super(SpiderAsinImg, self).__init__()
        self.site_name = site_name

        self.consumer_type = consumer_type  # consume the real-time stream or the historical backlog
        # derive the topic name for this site
        self.get_topic_name()
        # build the Kafka message schema
        self.init_schema()
        # self.topic_name = topic_name  # topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        # self.db_save = f'kafka_test_001'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # self.schema = self.init_schema()

        # connect to MySQL
        self.engine = get_country_engine(self.site_name)
        self.pg14_engine = self.get_14pg_country_engine(self.site_name)
        sql_connect(self.site_name)
        logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s',
                            level=logging.INFO)

    def judge_spider_asin_detail_is_finished(self):
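        """Check (retrying on DB errors) whether the ASIN-detail crawl for this
        site/date_type/date_info is flagged finished (status_val=3) in
        workflow_progress; if so, stop Spark and exit the program."""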
        while True:
            try:
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN详情状态为3, 抓取完成并终止程序, site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # 退出程序
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = get_country_engine(self.site_name)  # re-create the engine after a connection failure

    def init_schema(self):
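        """Schema of the JSON payload in each Kafka message, and the column
        order used when writing to the <site>_asin_image tables."""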
        self.schema = StructType([
            StructField("asin", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("img_order_by", StringType(), True),
            StructField("data_type", StringType(), True),
            StructField("site", StringType(), True),
        ])
        self.col = ["asin", "img_url", "img_order_by", "data_type", "site"]

    @staticmethod
    def clean_kafka_df(df):
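        """Rename/reselect ASIN-detail fields produced by the Java side.
        Not called in this module; it appears to be carried over from the
        ASIN-detail consumer."""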
        df = df.withColumnRenamed("seller_id", "account_id")
        # cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
        #         "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
        #         "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
        # oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
        # siteName volumeFormat weightFormat asinUpdateTime
        # field names used by the Java-side producer
        cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
                    'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'siteName', 'buyBoxSellerType',
                    'volume', 'volumeFormat', 'weight', 'weightFormat', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
        df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
                        "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
                        "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id")
        return df

    def get_topic_name(self):
        # note: the topic name must stay consistent with the table name
        # monthly-table topic
        self.topic_name = f"{self.site_name}_asin_image"

    def handle_history(self):
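        """Historical backfill: read the topic in windows of batch_size_history
        offsets, convert each window to pandas and pass it to data_save()."""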
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)

        beginning_offsets_list = []
        end_offsets_list = []
        for values in partition_data_count.values():
            beginning_offsets_list.append(values['beginning_offsets'])
            end_offsets_list.append(values['end_offsets'])

        min_offset = min(beginning_offsets_list)
        max_offset = max(end_offsets_list)

        # max_offset = max(partition_data_count.values())
        # for start_offset in range(0, max_offset+1, self.batch_size_history):
        for start_offset in range(min_offset, max_offset+1, self.batch_size_history):
            end_offset = start_offset + self.batch_size_history
            starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
            ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .option("failOnDataLoss", "false") \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
            pdf = kafka_df.toPandas()
            # pdf.to_sql()
            self.data_save(pdf)
        # stop the SparkSession
        self.spark.stop()

    def get_14pg_country_engine(self, site_name="us"):
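        """Build a SQLAlchemy engine for the per-site PostgreSQL database
        ("selection" for us/mx, "selection_<site>" otherwise)."""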
        h14_pg_us = {
            "user": "postgres",
            "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
            # "host": "61.145.136.61",
            "host": "192.168.10.223",
            "port": "5432",
            # "port": 54328,
            "database": "selection",
        }
        if site_name in ('us', 'mx'):
            h14_pg_us["database"] = "selection"
            db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values())
        # elif site_name == "keepa":
        #     db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*h6_pg_us.values())
        else:
            h14_pg_us["database"] = f"selection_{site_name}"
            db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h14_pg_us.values())
        engine = create_engine(db_, encoding='utf-8')  # , pool_recycle=3600
        return engine

    def data_save(self, df):
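        """Persist one batch: accepts a Spark or pandas DataFrame, groups the
        rows by site and appends them to the <site>_asin_image tables."""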
        if not isinstance(df, pd.DataFrame):
            df = df.toPandas()
        # keep only the columns of the target table
        df = df[self.col]
        if "site" not in df.columns:
            df["site"] = self.site_name
            logging.info("site column missing, filled from site_name")
        df["site"] = df['site'].fillna("us")
        # df.drop_duplicates(subset=["asin", "site"], inplace=True)
        for site, group in df.groupby("site"):
            asins = list(set(group["asin"]))
            logging.info(f"site {site}  asin:{asins}")
            if site not in ['us', 'de', 'uk', 'it', 'es', 'fr', 'mx', 'ca']:
                logging.info("not one of the 8 supported sites, skipping")
                continue
            # if len(asins) == 1:
            #     sql_del = f"delete from {site}_asin_image where asin in ('{tuple(asins)[0]}');"
            # else:
            #     sql_del = f"delete from {site}_asin_image where asin in {tuple(asins)};"
            # self.del_pg_asin(sql_del, site=self.site)
            # logging.info(f"cleared existing rows for {asins} from {site}_asin_image")
            del group["site"]
            group.to_sql(name=f'{site}_asin_image', con=self.pg14_engine, if_exists='append', index=False)
            logging.info(f"saved to {site}_asin_image successfully")

    def start_stream(self, processing_time=300):
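        """Consume the topic with Structured Streaming, flushing every
        processing_time seconds via process_batch()."""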
        logging.info(f"主题:{self.topic_name} 每 {processing_time} 秒  存储到数据库")
        kafka_df = self.get_kafka_df_by_spark(schema=self.schema, consumption_type="latest", topics=self.topic_name)
        query = kafka_df.writeStream.foreachBatch(self.process_batch).trigger(processingTime=f'{processing_time} seconds').start()
        query.awaitTermination()

    def process_batch(self, df, epoch_id):
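        """foreachBatch callback: log the micro-batch size and persist it via data_save()."""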
        # self.judge_spider_asin_detail_is_finished()
        logging.info(f"当前批次传输的数据量为df.count():{df.count()}")
        # 确保schema非空以避免NoneType错误
        if not self.schema:
            raise ValueError("Schema is not defined")
        # df.show(5, truncate=False)
        logging.info(f"df.columns:{df.columns}")
        self.data_save(df)
        print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def run(self):
        if self.consumer_type == 'latest':
            self.start_stream(processing_time=10)
        else:
            self.handle_history()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    consumer_type = sys.argv[2]  # arg 2: 'latest' for real-time consumption, anything else runs the historical backfill
    # e.g. us day date_info 2023-11-07
    handle_obj = SpiderAsinImg(site_name=site_name, consumer_type=consumer_type, batch_size=100000)
    handle_obj.run()

# /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 2g --executor-memory 2g --executor-cores 1 --num-executors 1 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_image.py us latest

# for i in `ps -ef|grep "spider_asin_image" |awk '{print $2}' `; do kill -9 $i ; done;