import random
import string
import hashlib
from collections import defaultdict
from datasketch import MinHash
from pyspark import SparkContext, SparkConf
import cloudpickle

# Extract overlapping character bigrams from a string,
# e.g. bigrams("abc") -> ["ab", "bc"]
def bigrams(text):
    return [text[i:i+2] for i in range(len(text) - 1)]

# Generate a synthetic trademark dataset of random 10-character strings
num_trademarks = 1000000
trademarks = [''.join(random.choices(string.ascii_letters + string.digits, k=10)) for _ in range(num_trademarks)]

# Build an inverted index mapping each bigram to the indices of the
# trademarks that contain it
inverse_index = defaultdict(set)
for idx, trademark in enumerate(trademarks):
    for bigram in bigrams(trademark):
        inverse_index[bigram].add(idx)
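
# The inverted index is not used further below; a minimal sketch of how it
# could serve candidate retrieval (candidate_ids is an illustrative name,
# not part of the original script):
# def candidate_ids(query):
#     ids = set()
#     for bg in bigrams(query):
#         ids |= inverse_index.get(bg, set())
#     return ids  # indices of trademarks sharing at least one bigram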

# Configure PySpark
conf = SparkConf().setAppName("MinHash")
sc = SparkContext(conf=conf)
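# Assumption: the master is left to spark-submit; for a quick local run one
# could instead build the config as
# conf = SparkConf().setAppName("MinHash").setMaster("local[*]")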

def sha1_hashfunc(x):
    # Use the first 4 bytes of the SHA-1 digest as a 32-bit integer hash
    # (mirroring datasketch's default sha1_hash32); the original full
    # 160-bit digest would overflow MinHash's 64-bit permutation arithmetic
    return int.from_bytes(hashlib.sha1(x).digest()[:4], 'little')

def update_minhash(trademark, num_perm, hashfunc):
    # Build a MinHash signature over the trademark's bigram set; a fixed
    # seed keeps signatures comparable across records
    minhash = MinHash(num_perm, hashfunc=hashfunc, seed=42)
    for bigram in bigrams(trademark):
        minhash.update(bigram.encode('utf8'))
    return minhash

# Compute the MinHash signature matrix with Spark
num_perm = 128
rdd = sc.parallelize(trademarks)

# Serialize the hash function once and broadcast it, so every executor
# deserializes the same callable instead of re-pickling it per task
serialized_hashfunc = cloudpickle.dumps(sha1_hashfunc)
broadcast_serialized_hashfunc = sc.broadcast(serialized_hashfunc)

def update_minhash_with_serialized_hashfunc(trademark, num_perm, broadcast_hashfunc):
    hashfunc = cloudpickle.loads(broadcast_hashfunc.value)
    return update_minhash(trademark, num_perm, hashfunc)

minhash_signatures = rdd.map(
    lambda trademark: (trademark,
                       update_minhash_with_serialized_hashfunc(
                           trademark, num_perm, broadcast_serialized_hashfunc))
).collectAsMap()
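
# A minimal usage sketch (an assumption, not part of the original pipeline):
# estimate the bigram-set Jaccard similarity of two collected signatures.
# m1 = minhash_signatures[trademarks[0]]
# m2 = minhash_signatures[trademarks[1]]
# print(m1.jaccard(m2))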

# Stop the SparkContext
sc.stop()