import hashlib
import random
import string
import struct
from collections import defaultdict

import cloudpickle
from datasketch import MinHash
from pyspark import SparkConf, SparkContext


def bigrams(text):
    """Return the list of overlapping character bigrams of `text`."""
    return [text[i:i + 2] for i in range(len(text) - 1)]


# Generate a synthetic trademark dataset: 1,000,000 random 10-character strings.
num_trademarks = 1000000
trademarks = [''.join(random.choices(string.ascii_letters + string.digits, k=10))
              for _ in range(num_trademarks)]

# Build an inverted index mapping each bigram to the set of trademark indices
# containing it, so candidate pairs can later be restricted to trademarks that
# share at least one bigram.
inverse_index = defaultdict(set)
for idx, trademark in enumerate(trademarks):
    for bigram in bigrams(trademark):
        inverse_index[bigram].add(idx)

# Configure PySpark.
conf = SparkConf().setAppName("MinHash")
sc = SparkContext(conf=conf)


def sha1_hashfunc(x):
    # datasketch expects the hash function to return an integer that fits in
    # 64-bit arithmetic, so keep only the first 4 bytes of the SHA-1 digest
    # (this mirrors the custom-hashfunc example in the datasketch docs);
    # the full 160-bit digest would overflow the library's uint64 permutations.
    return struct.unpack('<I', hashlib.sha1(x).digest()[:4])[0]


def update_minhash(trademark, num_perm, hashfunc):
    """Build a MinHash signature from the bigrams of a single trademark."""
    minhash = MinHash(num_perm=num_perm, hashfunc=hashfunc, seed=42)
    for bigram in bigrams(trademark):
        minhash.update(bigram.encode('utf8'))
    return minhash


# Build the MinHash signature matrix with Spark. The hash function is
# serialized with cloudpickle and shipped to the executors through a
# broadcast variable, where each task deserializes it before hashing.
num_perm = 128
rdd = sc.parallelize(trademarks)
serialized_hashfunc = cloudpickle.dumps(sha1_hashfunc)
broadcast_serialized_hashfunc = sc.broadcast(serialized_hashfunc)


def update_minhash_with_serialized_hashfunc(trademark, num_perm, serialized_hashfunc):
    hashfunc = cloudpickle.loads(serialized_hashfunc.value)
    return update_minhash(trademark, num_perm, hashfunc)


# Note: collectAsMap() pulls all one million MinHash objects back to the
# driver; at this scale, keeping the signatures distributed in an RDD is the
# safer choice.
minhash_signatures = rdd.map(
    lambda trademark: (trademark,
                       update_minhash_with_serialized_hashfunc(
                           trademark, num_perm, broadcast_serialized_hashfunc))
).collectAsMap()

# Stop the SparkContext.
sc.stop()
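
# A minimal sketch of how the collected signatures could be queried for
# near-duplicate trademarks, assuming datasketch's MinHashLSH API; the 0.5
# similarity threshold is an illustrative choice, not a value taken from the
# pipeline above.
from datasketch import MinHashLSH

lsh = MinHashLSH(threshold=0.5, num_perm=num_perm)
for trademark, minhash in minhash_signatures.items():
    lsh.insert(trademark, minhash)

# Query for trademarks whose estimated Jaccard similarity (over bigram sets)
# with the probe exceeds the threshold.
probe = trademarks[0]
candidates = lsh.query(minhash_signatures[probe])

# MinHash.jaccard gives a direct pairwise estimate for each candidate.
for candidate in candidates:
    if candidate != probe:
        est = minhash_signatures[probe].jaccard(minhash_signatures[candidate])
        print(probe, candidate, est)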