import json
import os
import re
import ast
import sys
import time
import logging
import traceback
import threading
import zlib
import pandas as pd
import numpy as np
import redis
from datetime import datetime

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

from sqlalchemy import create_engine
from sqlalchemy.exc import PendingRollbackError
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.mysql_db import sql_connect, sql_update_many, sql_delete, get_country_engine
from pyspark.sql import SparkSession


class SpiderMerchantwordSearch(Templates):

    def __init__(self, site_name='us', date_type='day', date_info='2023-11-16', consumer_type='lastest', batch_size=100000):
        super(SpiderMerchantwordSearch, self).__init__()
        self.site_name = site_name
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume realtime or history data
        self.date_type = date_type
        # derive the topic name from date_type
        self.get_topic_name()
        # derive the schema from date_type
        self.init_schema()
        # self.topic_name = topic_name  # topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        self.db_save = self.topic_name
        # self.spark = self.create_spark_object(
        #     app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}, {self.consumer_type}")
        self.app_name = self.get_app_name()
        self.spark = self.create_spark_object(app_name=f"{self.app_name}")
        # self.schema = self.init_schema()
        # connect to MySQL
        self.engine = get_country_engine(self.site_name)
        self.pg16_engine = self.get_16pg_country_engine(self.site_name)
        sql_connect(self.site_name)
        logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)
        # spider type for the workflow table ("Merchantwords search term")
        self.spider_type = "Merchantword搜索词"

    def init_schema(self):
        self.schema = StructType([
            StructField("cate_type", StringType(), True),
            StructField("data_list", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("spider_time", StringType(), True),
        ])
        # self.col = ['search_term', 'asin', 'page', 'buy_data', 'label']

    def get_topic_name(self):
        if self.date_type == "day":
            # self.topic_name = f"merchantwords_search_term"
            self.topic_name = f"{self.site_name}_merchantwords_{self.date_info.replace('-', '_')}"
        else:
            logging.info("self.date_type error -----")
            quit()

    def get_16pg_country_engine(self, site_name="us"):
        h16_pg_us = {
            "user": "postgres",
            "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
            "host": "192.168.10.225",
            "port": "5432",
            "database": "selection",
        }
        if site_name == 'us' or site_name == 'mx' or site_name == 'ca':
            h16_pg_us["database"] = f"selection"
            db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h16_pg_us.values())
        else:
            h16_pg_us["database"] = f"selection_{site_name}"
            db_ = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(*h16_pg_us.values())
        engine = create_engine(db_, encoding='utf-8')  # , pool_recycle=3600
        return engine

    def cate_type(self, name, data_list):
        cate_type = name[0]
        # df_1['date_info'] = name[1]
        columns = ['search_term', 'asin', 'page', 'page_row', 'data_type', 'title', 'img', 'price', 'rating',
                   'reviews', 'created_time']
        if cate_type in ['buy']:
            df = pd.DataFrame(data=data_list, columns=['search_term', 'asin', 'page', 'buy_data', 'label', 'created_time'])
            df.label = df.label.apply(lambda x: str(x)[:200] if x is not None else None)  # truncate to 200 chars
            df.buy_data = df.buy_data.apply(lambda x: str(x)[:200] if x is not None else None)  # truncate to 200 chars
        else:
            df = pd.DataFrame(data=data_list, columns=columns)
        df['asin'] = df['asin'].str.replace('/', '')
        df['date_info'] = self.date_info
        df_asin_detail = pd.DataFrame([])
        if cate_type in ['zr', 'sp']:
            df_asin_detail = df.loc[:, ['asin', 'title', 'img', 'price', 'rating', 'reviews', 'date_info', 'created_time']]
        if cate_type in ['zr', 'sp']:
            df = df.loc[:, ['search_term', 'asin', 'page', 'page_row', 'date_info', 'created_time']]
            df.drop_duplicates(['search_term', 'asin', 'page', 'page_row'], inplace=True)
        elif cate_type in ['buy']:
            df = df.loc[:, ['search_term', 'asin', 'page', 'buy_data', 'date_info', 'label', 'created_time']]
            df.drop_duplicates(['search_term', 'asin', 'page', 'buy_data', 'label'], inplace=True)
        else:
            if cate_type in ['sb', 'tr']:
                df = df.loc[:, ['search_term', 'asin', 'page', 'data_type', 'date_info', 'created_time']]
                df.drop_duplicates(['search_term', 'asin', 'page', 'data_type'], inplace=True)
            elif cate_type in ['buy']:  # unreachable: 'buy' is already handled above
                df = df.loc[:, ['search_term', 'asin', 'page', 'buy_data', 'date_info', 'label', 'created_time']]
                df.drop_duplicates(['search_term', 'asin', 'page', 'buy_data', 'label'], inplace=True)
            else:
                df = df.loc[:, ['search_term', 'asin', 'page', 'date_info', 'created_time']]
                df.drop_duplicates(['search_term', 'asin', 'page'], inplace=True)
        return df, df_asin_detail

    def add_column_to_list(self, row):
        l = []
        for sub_list in json.loads(row['data_list']):
            l.append(sub_list + [row["spider_time"]])
        return l

    def save_data_common(self, name, group):
        logging.info(f"name: {name}")
        search_exploded_list = group['data_list'].explode()
        # explode, then collect into one flat list
        search_list = [i for i in search_exploded_list.tolist() if not isinstance(i, float)]
        if search_list:
            logging.info(f"processing search terms {search_list[0:5]}")
            # split the list into equal chunks
            # self.list_svg(search_list, chunk_size=100)
            # convert to DataFrame objects and clean the fields for this category
            df_search_term, df_asin_detail = self.cate_type(name, search_list)
            logging.info(f"{name} {df_search_term.shape} \n {df_search_term.keys()} {df_search_term.head()}")
            # build the table name from site, category and date_info
            if name[0] == "buy":
                table_name = f"{self.site_name}_merchantwords_other_search_term_{name[1].replace('-', '_')}"
            else:
                table_name = f"{self.site_name}_merchantwords_search_term_rank_{name[0]}_{name[1].replace('-', '_')}"
            while True:
                try:
                    start_time = time.time()
                    df_search_term.to_sql(name=f'{table_name}', con=self.pg16_engine, if_exists='append', index=False)
                    end_time = time.time()
                    logging.info(f"saved {df_search_term.shape} {df_search_term.head(10)} to table {table_name}, took {end_time - start_time}s")
                    break
                except PendingRollbackError as e:
                    time.sleep(3)
                    logging.info(f"error {e} sleep 3")
                    continue
            logging.info(f"detail --> save name: {name}")
            if df_asin_detail.shape[0]:
                detail_table = f"{self.site_name}_merchantwords_search_term_asin_detail_{name[1].replace('-', '_')}"
                while True:
                    try:
                        start_time = time.time()
                        df_asin_detail.to_sql(name=detail_table, con=self.pg16_engine, if_exists='append', index=False)
                        end_time = time.time()
                        logging.info(f"saved {df_asin_detail.shape} {df_asin_detail.head(10)} to table {detail_table}, took {end_time - start_time}s")
                        break
                    except PendingRollbackError as e:
                        time.sleep(3)
                        logging.info(f"error {e} sleep 3")
                        continue

    def save_data(self, df):
        threads = []
        # convert the JSON string into a Python list (appending spider_time to each sub-list)
        # df['data_list'] = df['data_list'].apply(json.loads)
        df["data_list"] = df.apply(self.add_column_to_list, axis=1)
        for name, group in df.groupby(['cate_type', 'date_info']):
            thread = threading.Thread(target=self.save_data_common, args=(name, group))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        logging.info("all threads finished")

    def handle_kafka_df(self, kafka_df):
        kafka_df.show(20)
        # convert the Spark DataFrame from Kafka into a pandas DataFrame
        pdf = kafka_df.toPandas()
        # pdf['asin'] = pdf['asin'].apply(lambda x: str(x).replace('/', ''))
        # filter out rows that do not belong to the current period
        pdf = pdf.loc[(~pdf.date_info.isna()) & (pdf.date_info == self.date_info)]
        if pdf.shape[0]:
            logging.info(f"{pdf.keys()}")
            logging.info(f"----------------------------")
            self.save_data(pdf)
        else:
            logging.info(f"{pdf.shape}")

    def handle_kafka_history(self, kafka_df):
        # kafka_df = kafka_df.withColumn("asin", F.regexp_replace("asin", "/", ""))
        self.handle_kafka_df(kafka_df)

    def handle_kafka_stream(self, kafka_df, epoch_id):
        # kafka_df = kafka_df.withColumn("asin", F.translate("asin", "/", ""))
        self.handle_kafka_df(kafka_df)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    consumer_type = sys.argv[4]  # arg 4: realtime 'lastest' or history 'history'
    handle_obj = SpiderMerchantwordSearch(site_name=site_name, date_type=date_type, date_info=date_info,
                                          consumer_type=consumer_type, batch_size=10000)
    handle_obj.run_kafka()

# for i in `ps -ef|grep "spider_asin_search_day" |awk '{print $2}' `; do kill -9 $i ; done;
# history
# /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 10g --executor-memory 20g --executor-cores 4 --num-executors 2 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_search_day.py us day 2024-04-10 history > amazon_history_search_day_us.log 2>&1 &
# realtime
# /opt/module/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --master yarn --driver-memory 10g --executor-memory 20g --executor-cores 4 --num-executors 2 --queue spark /opt/module/spark/demo/py_demo/my_kafka/spider_asin_search_day.py us day 2024-04-13 history > amazon_history_search_day13_us.log 2>&1 &
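
# Illustrative sketch (an assumption, not from the original source) of the message shape this job
# expects on the Kafka topic, inferred from init_schema() and add_column_to_list(): "data_list" is a
# JSON string of row lists whose columns match what cate_type() selects for the given "cate_type",
# minus the trailing created_time (that value is appended from "spider_time"). A hypothetical 'zr'
# message might look like:
#
# {
#     "cate_type": "zr",
#     "data_list": "[[\"water bottle\", \"B0XXXXXXXX\", 1, 3, null, \"Example Title\", \"https://img...\", 19.99, 4.5, 1234]]",
#     "date_info": "2024-04-10",
#     "spider_time": "2024-04-10 12:00:00"
# }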