Commit 34ac3c02 by fangxingjun

Merge branch 'master' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

# Conflicts:
#	Pyspark_job/sqoop_import/ods_self_asin_detail.py
parents c4dfe3ab 6ae08443
"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 单周的zr,sp,sb,ac,bs,er,tr等7大类型数据表(计算zr,sp类型表的page_rank+合并7张表)
table_read_name: ods_st_rank_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""
import json
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType
class DwtSelfAsinDetailRating(Templates):
    """Build a per-ASIN JSON history of ratings / comment counts over 180 days.

    Reads ods_self_asin_detail for the inclusive window
    [date_info - 179 days, date_info], computes each day's comment-count
    increment with a lag window, and collapses every ASIN's history into one
    JSON string of the form
    {"YYYY-MM-DD": [rating, total_comments, daily_increment], ...}.
    Output goes to dwt_self_asin_detail_rating, partitioned by
    site_name / date_type / date_info.
    """

    def __init__(self, site_name='us', date_type="day", date_info='2026-02-27'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # 179 days back so the inclusive range [start, date_info] spans 180 days.
        self.date_info_last_180day = (
            datetime.strptime(date_info, "%Y-%m-%d") - timedelta(days=179)
        ).strftime("%Y-%m-%d")
        self.db_save = 'dwt_self_asin_detail_rating'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # Placeholder DataFrames; replaced in read_data() / handle_data().
        self.df_self_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=2)

    def read_data(self):
        """Load 180 days of (asin, rating, total_comments, date_info) rows.

        BUG FIX: the upper date bound previously interpolated the bare name
        `date_info` — a module-level global that only exists when the file is
        run as a script — instead of `self.date_info`. Importing the class
        elsewhere raised NameError; now it always uses the instance value.
        """
        sql = f"""
        select asin, rating, total_comments, date_info from ods_self_asin_detail WHERE
        site_name ='{self.site_name}'
        and date_type ='{self.date_type}'
        and date_info >='{self.date_info_last_180day}'
        and date_info <= '{self.date_info}'
        and asin_type not like '%4%'
        """
        self.df_self_asin_detail = self.spark.sql(sql).cache()
        print("df_self_asin_detail数量", self.df_self_asin_detail.count())

    def handle_data_asin_comments(self):
        """Compute daily comment increments and pack each ASIN's history as JSON."""
        # Normalize types: cast comment counts through float so decimal-formatted
        # strings still parse, then truncate to int; ratings become floats.
        self.df_self_asin_detail = self.df_self_asin_detail.withColumn(
            "total_comments", F.col("total_comments").cast("float").cast("int")
        ).withColumn(
            "rating", F.col("rating").cast("float"))
        # One row per (asin, day): keep that day's max comment count and max
        # rating (rating must be aggregated or it would be dropped by groupBy);
        # missing comment counts become 0.
        self.df_self_asin_detail = self.df_self_asin_detail.groupBy("asin", "date_info").agg(
            F.max("total_comments").alias("total_comments"),
            F.max("rating").alias("rating"),
        ).na.fill({"total_comments": 0})
        # 1. Daily increment = today's count minus the previous *recorded* day's
        #    count (lag naturally skips missing calendar dates); the first
        #    record of each ASIN has no predecessor and gets increment 0.
        window_spec = Window.partitionBy("asin").orderBy("date_info")
        self.df_self_asin_detail = self.df_self_asin_detail.withColumn(
            "prev_comments", F.lag("total_comments").over(window_spec)
        ).withColumn(
            "daily_increment",
            F.when(F.col("prev_comments").isNull(), 0)
            .otherwise(F.col("total_comments") - F.col("prev_comments"))
        )
        # 2. Collect each ASIN's per-day structs into a single list, e.g.
        #    [{"date_info":"10-19","rating":4.0,...}, {"date_info":"10-22",...}]
        df_grouped = self.df_self_asin_detail.groupBy("asin").agg(
            F.collect_list(
                F.struct("date_info", "rating", "total_comments", "daily_increment")
            ).alias("data_list")
        )

        # 3. UDF: struct list -> JSON string keyed by date, e.g.
        #    '{"2025-10-19": [4.0, 100, 0], "2025-10-22": [4.1, 108, 8]}'
        @F.udf(StringType())
        def to_json(data_list):
            if not data_list:
                return None
            result = {}
            for item in sorted(data_list, key=lambda x: x["date_info"]):
                rating = round(float(item["rating"]) if item["rating"] is not None else 0.0, 1)
                total_comments = int(item["total_comments"]) if item["total_comments"] is not None else 0
                daily_increment = int(item["daily_increment"]) if item["daily_increment"] is not None else 0
                result[item["date_info"]] = [rating, total_comments, daily_increment]
            return json.dumps(result, ensure_ascii=False)

        # 4. Apply the UDF so each ASIN yields exactly one output row.
        self.df_self_asin_detail = df_grouped.withColumn(
            "rating_and_comments_info", to_json("data_list")
        ).select("asin", "rating_and_comments_info")

    def handle_data(self):
        """Assemble df_save: JSON history column plus the partition columns."""
        self.handle_data_asin_comments()
        self.df_save = (self.df_self_asin_detail
                        .withColumn("site_name", F.lit(self.site_name))
                        .withColumn("date_type", F.lit(self.date_type))
                        .withColumn("date_info", F.lit(self.date_info)))
        print("df_save数量为", self.df_save.count())
        # vertical=True prints each row as field/value pairs for readability.
        self.df_save.show(truncate=False, vertical=True)
# def save_data(self):
# pass
if __name__ == '__main__':
    # CLI: <site> <date_type: week/4_week/month/quarter/day> <date_info, e.g. 2022-1>
    argv = sys.argv
    site_name, date_type, date_info = argv[1], argv[2], argv[3]
    handle_obj = DwtSelfAsinDetailRating(site_name=site_name,
                                         date_type=date_type,
                                         date_info=date_info)
    handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
def export_data(site_name, date_type, date_info):
    """Export one Hive partition of dwt_self_asin_detail_rating to MySQL.

    Loads the partition into a freshly rebuilt staging table
    `{site_name}_self_rating_180day_copy`, then swaps it in place of the
    live table `{site_name}_self_rating_180day`.
    """
    engine = get_remote_engine(
        site_name="us",   # -> database "selection"
        db_type="mysql",  # -> server-side alias "mysql"
    )
    target_table = f'{site_name}_self_rating_180day'
    import_table = f'{site_name}_self_rating_180day_copy'
    # Rebuild the staging table from the live table's schema.
    sql_drop = f"drop table if exists {import_table};"
    sql_create = f"create table {import_table} like {target_table};"
    engine.execute(sql_drop)
    engine.execute(sql_create)
    print(f"sql_drop: {sql_drop}")
    print(f"sql_create: {sql_create}")
    # Single-mapper sqoop export of just this partition's columns.
    engine.sqoop_raw_export(
        hive_table='dwt_self_asin_detail_rating',
        import_table=import_table,
        partitions={
            'site_name': site_name,
            'date_type': date_type,
            'date_info': date_info,
        },
        m=1,
        cols=','.join(['asin', 'rating_and_comments_info', 'date_info'])
    )
    # Atomically promote the staging table to the live name.
    engine.swap_tables(
        source_table=import_table,
        target_table=target_table
    )
if __name__ == '__main__':
    # CLI: <site> <date_type: week/4_week/month/quarter/day> <date_info, e.g. 2022-1>
    cli = sys.argv
    export_data(cli[1], cli[2], cli[3])
......@@ -28,6 +28,7 @@ site_name_db_dict = {
"it": "selection_it",
}
db_type_alias_map = {
"mysql": "mysql", # 阿里云mysql
"postgresql_14": "postgresql_14", # pg14爬虫库-内网
......@@ -41,7 +42,8 @@ db_type_alias_map = {
DEFAULT_SERVERS = [
# "http://192.168.200.210:7777", # 内网-h5
"http://192.168.200.210:7778", # 内网-h5
"http://61.145.136.61:7777", # 外网
# "http://192.168.200.210:7778", # 内网-h5
# "http://192.168.200.210:7778", # 内网-测试大数据-h5
# "http://192.168.10.216:7777", # 内网-测试大数据-h6
# "http://192.168.10.216:7777", # 内网-h6
......@@ -273,6 +275,25 @@ class RemoteEngine:
print(f"更新 {rows_updated} 行")
return df
    def swap_tables(self,
                    source_table: str,
                    target_table: str,
                    dry_run: bool = False):
        """Ask the remote DB service to swap `source_table` into `target_table`.

        Sends a "swap_tables" request carrying this engine's db type and site
        name plus both table names. Returns the server's response dict when
        it reports ok; otherwise raises RuntimeError with the full response.
        dry_run: presumably the server validates without performing the swap
        — TODO confirm against server implementation.
        """
        payload = {
            "db": self.db_type,
            "site_name": self.site_name,
            "source_table": source_table,
            "target_table": target_table,
            "dry_run": dry_run
        }
        resp = self._request("swap_tables", payload)
        if not resp.get("ok"):
            raise RuntimeError(resp)
        return resp
def sqoop_raw_import(self,
# site_name: str,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment