""" author: 方星钧(ffman) description: pyspark程序继承模板 table_read_name: 无 table_save_name: 无 table_save_level: 无 version: 1.0 created_date: 2022-05-07 updated_date: 2022-05-07 """ import calendar import json import os import sys import time import traceback import uuid import pandas as pd import redis sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.spark_util import SparkUtil from pyspark.sql import functions as F from kafka import KafkaConsumer, TopicPartition from pyspark.sql.types import * from utils.db_util import DbTypes, DBUtil from utils.hdfs_utils import HdfsUtils from pyspark.sql import Window from utils.common_util import CommonUtil import subprocess import requests class Templates(object): def __init__(self): # 站点 self.site_name = str() # 日期相关 self.year_quarter = str() self.year_month = str() self.year_week = str() self.year = int() self.quarter = int() self.month = int() self.week = int() self.day = int() self.year_month_days_dict = dict() self.year_quarter_tuple = tuple() self.year_month_tuple = tuple() self.year_week_tuple = tuple() self.date_info_tuple = tuple() self.last_30_date_tuple = tuple() self.date_type = str() # week/4_week/month/quarter self.date_info = str() # 2022-1/2022-1/2022-1/2022-1 self.app_name = str() # spark相关 self.db_save = str() # 需要存储的hive表名 self.db_name = "big_data_selection" # 需要存储的hive表名 # self.db_name = "selection_off_line" # 需要存储的hive表名 self.df_save = object() # 需要存储的df数据对象 self.df_week = object() # 需要存储的df数据对象 self.df_date = object() # 需要存储的df数据对象 self.partitions_num = int() # df数据对象的分区数重置 self.partitions_by = list() # hive分区表对应的分区 # self.reset_partitions() self.reset_partitions_by() self.spark = None # spark程序的执行入口对象 self.topic_name = str() # my_kafka self.consumer = object() # 在此处定义 Kafka 认证和安全参数 # self.kafka_servers = "192.168.10.221:9092,192.168.10.220:9092,192.168.10.210:9092" # self.kafka_servers_producer = "61.145.136.61:19092,61.145.136.61:29092,61.145.136.61:39092" self.kafka_servers = "192.168.10.218:9092,192.168.10.219:9092,192.168.10.220:9092" self.kafka_servers_producer = "'61.145.136.61:19092,61.145.136.61:29092,61.145.136.61:49092,61.145.136.61:59092'" self.kafka_security_protocol = "SASL_PLAINTEXT" self.kafka_sasl_mechanism = "PLAIN" self.kafka_username = "consumer" self.kafka_password = "J2#aLmPq7zX" self.consumer_type = 'lastest' self.processing_time = 300 self.check_path = str() # 实时计算的query对象 self.query = None # 数据库连接 self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us") # 爬虫类型 self.spider_type = "asin详情" # 指定历史消费起始偏移量 self.beginning_offsets = 0 # 测试标识 self.test_flag = 'normal' self.beginning_offsets_dict = {} # history消费时, 初始的偏移量 # redis连接对象--用来锁定--解决并发 self.client = redis.Redis(host='192.168.10.224', port=6379, db=9, password='yswg2023') def create_spark_object(self, app_name=None): if self.topic_name != '': print("创建实时相关SparkSession对象") spark = SparkUtil.get_stream_spark(app_name, self.db_name) else: print("创建非实时相关SparkSession对象") spark = SparkUtil.get_spark_session(app_name, self.db_name) return spark # 针对消费kafka得到的dataframe去重 def deduplication_kafka_data(self, kafka_df, deduplicaiton_key_field, deduplication_time_field): print(f"数据去重清洗,清洗依据字段: {deduplicaiton_key_field}, 排序依据字段: {deduplication_time_field}") window = Window.partitionBy(deduplicaiton_key_field).orderBy( F.col(deduplication_time_field).desc_nulls_last() ) kafka_df = kafka_df.withColumn("k_rank", F.row_number().over(window=window)) kafka_df = kafka_df.filter("k_rank=1").drop("k_rank") return kafka_df def 
    def create_kafka_df_object(self, consumer_type=str(), topic_name=str(), starting_offsets_json=str(),
                               ending_offsets_json=str(), schema=StructType()):
        if consumer_type == "latest":
            # streaming read
            kafka_df = self.spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", topic_name) \
                .option("kafka.security.protocol", self.kafka_security_protocol) \
                .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
                .option("kafka.sasl.jaas.config",
                        f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";') \
                .option("startingOffsets", consumer_type) \
                .option("failOnDataLoss", "false") \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=schema).alias("data")) \
                .select("data.*")
            return kafka_df
        elif consumer_type == "history":
            # batch read over a fixed offset range
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", topic_name) \
                .option("kafka.security.protocol", self.kafka_security_protocol) \
                .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
                .option("kafka.sasl.jaas.config",
                        f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";') \
                .option("failOnDataLoss", "false") \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=schema).alias("data")) \
                .select("data.*")
            if self.spider_type == 'asin详情' and kafka_df.count() > 0:
                kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
            return kafka_df

    def get_kafka_object_by_python(self, topic_name="us_asin_detail"):
        consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers=self.kafka_servers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            security_protocol=self.kafka_security_protocol,  # or 'SASL_SSL' if SSL is used
            sasl_mechanism=self.kafka_sasl_mechanism,
            sasl_plain_username=self.kafka_username,
            sasl_plain_password=self.kafka_password
        )
        return consumer

    # @staticmethod
    def get_kafka_partitions_data(self, consumer=None, topic_name="us_asin_detail"):
        partitions = consumer.partitions_for_topic(topic_name)
        partition_data_count = {}
        for pid in partitions:
            # build a TopicPartition object for this partition
            tp = TopicPartition(topic_name, pid)
            # fetch the earliest and latest offsets of the partition
            beginning_offsets = consumer.beginning_offsets([tp])[tp] if self.beginning_offsets == 0 else self.beginning_offsets
            end_offsets = consumer.end_offsets([tp])[tp]
            # the message count is the difference between these two offsets
            data_count = end_offsets - beginning_offsets
            offset_dict = {
                "beginning_offsets": beginning_offsets,
                "end_offsets": end_offsets,
                "data_count": data_count,
            }
            # partition_data_count[pid] = data_count
            partition_data_count[pid] = offset_dict
        print("partition_data_count:", partition_data_count)
        return partition_data_count
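    # Illustrative note (not in the original source): for the "history" branch above, the Spark Kafka
    # source expects startingOffsets/endingOffsets as JSON keyed by topic and partition id, which is
    # exactly what kafka_history() builds further below, e.g.
    #
    #   starting_offsets_json = json.dumps({"us_asin_detail": {"0": 69355, "1": 69761}})
    #   ending_offsets_json   = json.dumps({"us_asin_detail": {"0": 79355, "1": 79761}})
    #
    # Partition ids are strings; the offset numbers here are made up.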
password="{self.kafka_password}";') \ .option("startingOffsets", consumption_type) \ .load() \ .select(F.from_json(F.col("value").cast("string"), schema=schema).alias("data")) \ .select("data.*") return kafka_df def get_year_month_days_dict(self, year=2022): self.year_month_days_dict = {month: calendar.monthrange(year, month)[-1] for month in range(1, 13)} def get_date_info_tuple(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;") df = self.df_date.toPandas() if self.date_type == 'day': df_loc = df.loc[df.date == f'{self.date_info}'] self.date_info_tuple = f"('{tuple(df_loc.date)[0]}')" self.year, self.month, self.day = self.date_info.split("-") self.week = list(df_loc.year_week)[0].split("-")[-1] if self.date_type in ['week', 'week_old', 'month', 'month_old']: # df_loc = df.loc[df[f'year_{self.date_type}'] == f"{self.date_info}"] # self.date_info_tuple = tuple(df_loc.date) if self.date_type in ['week', 'week_old']: df_loc = df.loc[df[f'year_week'] == f"{self.date_info}"] self.date_info_tuple = tuple(df_loc.date) self.year, self.week = self.date_info.split("-") self.month = list(df_loc.year_month)[0].split("-")[-1] if self.date_type in ['month', 'month_old']: df_loc = df.loc[df[f'year_month'] == f"{self.date_info}"] self.date_info_tuple = tuple(df_loc.date) self.year, self.month = self.date_info.split("-") if self.date_type == '4_week': df_loc = df.loc[(df[f'year_week'] == f"{self.date_info}") & (df.week_day == 1)] current_id = tuple(df_loc.id)[0] id_tuple = (current_id, current_id - 7 * 1, current_id - 7 * 2, current_id - 7 * 3) df_4_week = df.loc[df.id.isin(id_tuple)] # 4 df_4_week = df.loc[df.year_week.isin(df_4_week.year_week)] # 4*7 self.date_info_tuple = tuple(sorted(list(df_4_week.date))) self.year, self.week = self.date_info.split("-") self.year = tuple(df_loc.year)[0] self.month = list(df_loc.year_month)[0].split("-")[-1] if self.date_type == 'last30day': df_loc = df.loc[df.date == f'{self.date_info}'] day_end_id = list(df_loc.id)[0] # 减去29天,获取到30天前的id day_begin_id = (int(day_end_id) - 29) df_loc = df.loc[(df.id >= day_begin_id) & (df.id <= day_end_id)] self.date_info_tuple = tuple(df_loc.date) self.year, self.month, self.day = self.date_info.split("-") print("self.date_info_tuple:", self.date_info_tuple) def get_year_week_tuple(self): self.df_week = self.spark.sql(f"select * from dim_date_20_to_30 where week_day=1;") # self.df_week = self.spark.sql(f"select * from dim_week_20_to_30;") df = self.df_week.toPandas() df.year_month = df.year_month.apply(lambda x: x.replace("_", "-")) df.year_quarter = df.year_quarter.apply(lambda x: x.replace("_quarter_", "-")) if self.date_type in ['week']: self.year_week = self.date_info # self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1]) self.year, self.week = self.year_week.split("-")[0], self.year_week.split("-")[1] self.year_week_tuple = f"('{self.year_week}')" if self.date_type in ['4_week']: self.year_week = self.date_info self.year, self.week = self.year_week.split("-")[0], self.year_week.split("-")[1] df_week = df.loc[df.year_week == self.year_week] current_id = list(df_week.id)[0] if list(df_week.id) else None id_tuple = (current_id, current_id - 7*1, current_id - 7*2, current_id - 7*3) df_4_week = df.loc[df.id.isin(id_tuple)] self.year_week_tuple = tuple(df_4_week.year_week) if tuple(df_4_week.year_week) else () df_week = df.loc[(df.year_week == self.date_info) & (df.week_day == 1)] self.year = tuple(df_week.year)[0] self.month = tuple(df_week.month)[0] 
print(f"self.year:{self.year}, self.month:{self.month}") if self.date_type in ['month', 'month_old', 'month_week']: self.year_month = self.date_info self.year, self.month = self.year_month.split("-")[0], self.year_month.split("-")[1] df_month = df.loc[df.year_month == self.year_month] self.year_week_tuple = tuple(df_month.year_week) if tuple(df_month.year_week) else () if self.date_type in ['quarter']: self.year_quarter = self.date_info self.year, self.quarter = self.year_quarter.split("-")[0], self.year_quarter.split("-")[1] df_quarter = df.loc[df.year_quarter == self.year_quarter] self.year_week_tuple = tuple(df_quarter.year_week) if tuple(df_quarter.year_week) else () print("self.year_week_tuple:", self.year_week_tuple) return df def reset_partitions(self, partitions_num=10): print("重置分区数") if self.site_name in ['us']: self.partitions_num = partitions_num elif self.site_name in ['uk', 'de']: self.partitions_num = partitions_num // 2 if partitions_num // 2 > 0 else 1 elif self.site_name in ['es', 'fr', 'it']: self.partitions_num = partitions_num // 4 if partitions_num // 4 > 0 else 1 def reset_partitions_by(self): if self.date_type in ['week', '4_week']: self.partitions_by = ['site_name', 'dt'] if self.date_type in ['month']: self.partitions_by = ['site_name', 'dm'] if self.date_type in ['quarter']: self.partitions_by = ['site_name', 'dq'] def read_data(self): pass def handle_data(self): pass @staticmethod def save_data_common(df_save=None, db_save=None, partitions_num=None, partitions_by=None): print("当前存储的表名为:", db_save) df_save = df_save.repartition(partitions_num) df_save.write.saveAsTable(name=db_save, format='hive', mode='append', partitionBy=partitions_by) def save_data(self): self.save_data_common(df_save=self.df_save, db_save=self.db_save, partitions_num=self.partitions_num, partitions_by=self.partitions_by) # 采用insert overwrite模式覆写数据,覆写模式一定要保证dataFrame的字段顺序与表字段顺序一致 @staticmethod def insert_data_overwrite(df_save=None, db_save=None, partitions_num=None): print("当前覆写得表名为:", db_save) df_save = df_save.repartition(partitions_num) df_save.write.insertInto(tableName=db_save, overwrite=True) def insert_data(self): self.insert_data_overwrite(df_save=self.df_save, db_save=self.db_save, partitions_num=self.partitions_num) def run(self): # while True: # try: self.read_data() self.handle_data() self.save_data() # break # except Exception as e: # print("error_info:", e, traceback.format_exc()) # continue def start_process_instance(self): pass def kafka_stream_stop(self): try: self.start_process_instance() # 开启海豚调度 if self.query is not None: self.query.awaitTermination() self.query.stop() # 退出实时消费 if self.spark is not None: self.spark.stop() exit(0) # 退出程序 except Exception as e: print(e, traceback.format_exc()) def kafka_consumption_is_finished(self): while True: try: # if self.site_name == 'us': # # sql = f"SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;" # sql = f""" # SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}' # and date_info in # -- (SELECT MAX(year_week) as date_info from date_20_to_30 WHERE `year_month` = '2024-02' and week_day =1 # (SELECT year_week as date_info from date_20_to_30 WHERE `year_month` = '{self.date_info}' and week_day =1 # ) # ORDER BY created_at desc LIMIT 1; # # """ # else: # sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_info='{self.date_info}' and page='{self.spider_type}' ORDER BY 
created_at desc LIMIT 1;" sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='{self.spider_type}' and spider_state=3;" print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}") df = pd.read_sql(sql, con=self.engine_mysql) if df.shape[0]: status_val = list(df.status_val)[0] if int(status_val) == 3: print(f"spider_type:{self.spider_type}已经爬取完毕, 退出kafka消费和停止程序") if self.consumer_type == "latest": if HdfsUtils.delete_hdfs_file_with_checkpoint(self.check_path): print("实时消费正常完成,删除对应的检查点文件") self.kafka_stream_stop() else: print(f"spider_type:{self.spider_type}还在爬取中, 继续下一个批次数据消费") break except Exception as e: print(f"判断判断爬虫'{self.spider_type}'是否结束---出现异常, 等待20s", e, traceback.format_exc()) time.sleep(20) self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us") continue def kafka_stream(self, processing_time): kafka_df = self.create_kafka_df_object(consumer_type="latest", topic_name=self.topic_name, schema=self.schema) if self.test_flag == 'test': self.query = kafka_df.writeStream \ .outputMode("append") \ .format("console") \ .foreachBatch(self.handle_kafka_stream_templates) \ .trigger(processingTime=f'{processing_time} seconds') \ .start() self.query.awaitTermination() else: self.check_path = f"/tmp/kafka/{self.topic_name}" if self.check_path == "" else self.check_path print("检查点目录为:", self.check_path) HdfsUtils.is_checkpoint_exist(self.check_path) self.query = kafka_df.writeStream \ .outputMode("append") \ .format("console") \ .foreachBatch(self.handle_kafka_stream_templates) \ .trigger(processingTime=f'{processing_time} seconds') \ .option("checkpointLocation", self.check_path) \ .start() self.query.awaitTermination() def handle_kafka_stream_templates(self, kafka_df, epoch_id): if self.spider_type == 'asin详情' and kafka_df.count() > 0: kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime") self.handle_kafka_stream(kafka_df, epoch_id) if self.test_flag == 'normal': self.kafka_consumption_is_finished() def handle_kafka_stream(self, kafka_df, epoch_id): pass def get_offsets_by_history(self): if self.db_save in ['spider_asin_detail', 'spider_asin_search']: sql = f"select * from selection.kafka_offset_history_detail " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \ f"and date_info='{self.date_info}' and topic='{self.topic_name}';" print(f"sql: {sql}") df = pd.read_sql(sql, con=self.engine_mysql) if df.shape[0] == 1: end_offsets_json = list(df.end_offsets_json)[0] print(f"end_offsets_json: {end_offsets_json}") # self.beginning_offsets_dict = json.loads(end_offsets_json) # history消费时, 初始的偏移量 self.beginning_offsets_dict = eval(end_offsets_json) # history消费时, 初始的偏移量 def record_offsets_by_history(self, end_offsets_dict): if self.db_save in ['spider_asin_detail', 'spider_asin_search']: # 将字典转换为 JSON 字符串 end_offsets_json = json.dumps(end_offsets_dict) sql = f""" INSERT INTO selection.kafka_offset_history_detail (site_name, date_type, date_info, topic, end_offsets_json) VALUES ('{self.site_name}', '{self.date_type}', '{self.date_info}', '{self.topic_name}', '{end_offsets_json}') ON DUPLICATE KEY UPDATE site_name = VALUES(site_name), date_type = VALUES(date_type), date_info = VALUES(date_info), topic = VALUES(topic); -- end_offsets_json = VALUES(end_offsets_json); """ print(f"记录爬虫历史消费的偏移量: {self.db_save}--sql: {sql}") with self.engine_mysql.begin() as conn: conn.execute(sql) else: print(f"只有爬虫才需要记录历史消费的偏移量: {self.db_save}") pass def 
    def kafka_history(self, topic_name, batch_size_history, schema):
        consumer = self.get_kafka_object_by_python(topic_name=topic_name)
        partition_offsets_dict = self.get_kafka_partitions_data(consumer=consumer, topic_name=topic_name)
        partition_num = len(partition_offsets_dict)
        beginning_offsets_dict = {}
        end_offsets_dict = {}
        # self.beginning_offsets_dict = {"0": 69355, "1": 69761, "2": 70827, "3": 69609, "4": 71099, "5": 69922, "6": 70054, "7": 70798}
        self.get_offsets_by_history()  # load the offsets recorded by previous history consumption
        if self.beginning_offsets_dict != {}:
            for key, value in self.beginning_offsets_dict.items():
                beginning_offsets = int(partition_offsets_dict[int(key)]['beginning_offsets'])
                beginning_offsets = max(beginning_offsets, int(value))
                partition_offsets_dict[int(key)]['beginning_offsets'] = beginning_offsets
        while True:
            try:
                # refresh the offsets (needed when the topic is still being produced to or old data is auto-deleted)
                partition_offsets_dict_check = self.get_kafka_partitions_data(consumer=consumer, topic_name=topic_name)
                print("partition_offsets_dict:", partition_offsets_dict)
                print("partition_offsets_dict_check:", partition_offsets_dict_check)
                # still producing -- refresh end_offsets
                for key, value in partition_offsets_dict_check.items():
                    partition_offsets_dict[key]['end_offsets'] = value['end_offsets']
                    # data deleted -- refresh beginning_offsets
                    if value['beginning_offsets'] > partition_offsets_dict[key]['beginning_offsets']:
                        partition_offsets_dict[key]['beginning_offsets'] = value['beginning_offsets']
                num = 0
                for key, value in partition_offsets_dict.items():
                    # starting offset of this batch
                    beginning_offsets = value['beginning_offsets']
                    beginning_offsets_dict[str(key)] = beginning_offsets
                    # ending offset of this batch
                    end_offsets = value['beginning_offsets'] + batch_size_history
                    end_offsets_partition = value['end_offsets']
                    end_offsets_dict[str(key)] = min(end_offsets, end_offsets_partition)
                    if end_offsets >= end_offsets_partition:
                        num += 1
                    else:
                        partition_offsets_dict[key]['beginning_offsets'] = end_offsets
                    # when the topic is being produced to / data is auto-deleted, the starting offsets must be refreshed in time
                    # data deleted:
                    # if beginning_offsets_dict[str(key)] < partition_offsets_dict_check[key]['beginning_offsets']:
                    #     beginning_offsets_dict[str(key)] = partition_offsets_dict_check[key]['beginning_offsets']
                    #     partition_offsets_dict[key]['beginning_offsets'] = partition_offsets_dict_check[key]['beginning_offsets']
                    # still producing:
                    # if partition_offsets_dict[key]['end_offsets'] < partition_offsets_dict_check[key]['end_offsets']:
                    #     partition_offsets_dict[key]['end_offsets'] = partition_offsets_dict_check[key]['end_offsets']
                starting_offsets_json = json.dumps({topic_name: beginning_offsets_dict})
                ending_offsets_json = json.dumps({topic_name: end_offsets_dict})
                print(f"starting_offsets_json: {starting_offsets_json}, ending_offsets_json:{ending_offsets_json}")
                while True:
                    try:
                        kafka_df = self.create_kafka_df_object(
                            consumer_type="history",
                            topic_name=topic_name,
                            schema=schema,
                            starting_offsets_json=starting_offsets_json,
                            ending_offsets_json=ending_offsets_json,
                        )
                        break
                    except Exception as e:
                        print(f"Error in the current history batch -- retrying the consumption, error: {e}")
                        continue
                print(f"kafka_df.count():{kafka_df.count()}")
                if num >= partition_num:
                    self.handle_kafka_history_templates(kafka_df=kafka_df)  # last batch of the history consumption
                    self.record_offsets_by_history(end_offsets_dict=end_offsets_dict)
                    self.start_process_instance()  # start the scheduler workflow before exiting
                    break
                else:
                    self.handle_kafka_history_templates(kafka_df=kafka_df)
                    self.record_offsets_by_history(end_offsets_dict=end_offsets_dict)
                    time.sleep(10)
                    continue
                # break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                continue
        # kafka_df = self.spark.read \
        #     .format("kafka") \
        #     .option("kafka.bootstrap.servers", self.kafka_servers) \
        #     .option("subscribe", topic_name) \
        #     .option("kafka.security.protocol", self.kafka_security_protocol) \
        #     .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
        #     .option("kafka.sasl.jaas.config",
        #             f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";') \
        #     .option("failOnDataLoss", "true") \
        #     .option("startingOffsets", starting_offsets_json) \
        #     .option("endingOffsets", ending_offsets_json) \
        #     .load() \
        #     .select(F.from_json(F.col("value").cast("string"), schema=schema).alias("data")) \
        #     .select("data.*")
        # print(f"kafka_df.count():{kafka_df.count()}")
        # print(f"starting_offsets_json: {starting_offsets_json}, ending_offsets_json:{ending_offsets_json}")
        #
        # if num >= partition_num:
        #     self.start_process_instance()  # start the scheduler workflow before exiting
        #     break
        # else:
        #     self.handle_kafka_history_templates(kafka_df=kafka_df)
        #     continue
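    # Illustrative note (not in the original source): kafka_history walks each partition forward in
    # windows of batch_size_history offsets.  With made-up numbers and batch_size_history=10000, a
    # partition whose state is
    #     {'beginning_offsets': 69355, 'end_offsets': 100000}
    # is read as offsets [69355, 79355) in the first batch, after which its beginning_offsets advances
    # to 79355, and so on until beginning_offsets + batch_size_history reaches end_offsets; at that
    # point the partition counts toward `num` and the loop can finish.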
    def handle_kafka_history_templates(self, kafka_df):
        self.handle_kafka_history(kafka_df)
        self.kafka_consumption_is_finished()

    def handle_kafka_history(self, kafka_df):
        pass

    # compose the application name used when submitting the job to YARN
    def get_app_name(self):
        script_name = sys.argv[0].split("/")[-1].split(".")[0]
        if self.test_flag != 'normal':
            return f"{script_name}: {self.site_name}, {self.date_type}, {self.date_info}, {self.consumer_type}, {self.test_flag}"
        else:
            return f"{script_name}: {self.site_name}, {self.date_type}, {self.date_info}, {self.consumer_type}"

    # fetch the applicationIds of the running YARN application instances with this app name
    def get_application_ids(self):
        application_ids = []
        try:
            response = requests.get(
                f"http://hadoop15:8088/ws/v1/cluster/apps?state=RUNNING&name={self.app_name}")
            apps = response.json().get('apps')
            if apps and len(apps) > 0:
                for app in apps['app']:
                    application_ids.append(app['id'])
            return application_ids
        except Exception as e:
            print("Error querying the YARN ResourceManager:", e)
            return application_ids  # an error occurred
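    # Illustrative note (not in the original source): the ResourceManager response consumed above is
    # expected to look roughly like
    #   {"apps": {"app": [{"id": "application_1700000000000_0042", "name": "...", ...}]}}
    # and "apps" may be null when no application matches, which is why the value is checked before
    # iterating.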
else: pass try: sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'" DBUtil.exec_sql('mysql', 'us', sql) CommonUtil.send_wx_msg(wx_users, f"asin详情kafka消费", wx_msg) except Exception as e: print(e, traceback.format_exc()) CommonUtil.send_wx_msg(wx_users, f"\u26A0asin详情kafka实时消费\u26A0", f"站点: {self.site_name} asin详情实时消费准备失败,请等待处理!") else: pass def run_kafka(self): application_ids = self.get_application_ids() print("当前任务id列表为: ", application_ids) if len(application_ids) == 1: print("实时消费正常开启!") if self.test_flag == 'normal' and self.consumer_type == 'latest': self.kafka_consumption_is_finished() self.read_data() self.modify_kafka_state() self.handle_data() if self.consumer_type == 'latest': self.kafka_stream(processing_time=self.processing_time) else: self.kafka_history(topic_name=self.topic_name, batch_size_history=self.batch_size_history, schema=self.schema) elif len(application_ids) > 1: print("任务进程已启动,请不要重复开启!") earliest_applicaiton_id = min(application_ids) for application_id in application_ids: if application_id > earliest_applicaiton_id: cmd = f"yarn application -kill {application_id}" subprocess.run(cmd, shell=True, check=False) exit(0) else: print("任务未成功开启!") exit(0) def run_insert(self): self.read_data() self.handle_data() self.insert_data() def acquire_lock(self, lock_name, timeout=60): """ 尝试获取分布式锁, 能正常设置锁的话返回True, 不能设置锁的话返回None lock_name: 锁的key, 建议和任务名称保持一致 """ lock_value = str(uuid.uuid4()) lock_acquired = self.client.set(lock_name, lock_value, nx=True, ex=timeout) # 可以不设置超时时间 # lock_acquired = self.client.set(lock_name, lock_value, nx=True) return lock_acquired, lock_value def release_lock(self, lock_name, lock_value): """释放分布式锁""" script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ result = self.client.eval(script, 1, lock_name, lock_value) return result