# minhash.py — MinHash signatures over character bigrams of trademark strings,
# computed in parallel with PySpark. (Author: chenyuanjie)
import random
import string
import hashlib
from collections import defaultdict
from datasketch import MinHash
from pyspark import SparkContext, SparkConf
import cloudpickle

def bigrams(text):
    """Return the list of consecutive 2-character substrings of *text*.

    An input shorter than 2 characters yields an empty list.
    """
    pairs = []
    for start in range(len(text) - 1):
        pairs.append(text[start:start + 2])
    return pairs

# Build a synthetic trademark dataset: 1,000,000 random 10-character
# alphanumeric strings (letters + digits).
num_trademarks = 1000000
_alphabet = string.ascii_letters + string.digits
trademarks = [''.join(random.choices(_alphabet, k=10)) for _ in range(num_trademarks)]

# Inverted index: bigram -> set of indices of trademarks containing it.
inverse_index = defaultdict(set)
for idx, trademark in enumerate(trademarks):
    for bg in bigrams(trademark):
        inverse_index[bg].add(idx)

# Configure PySpark and start the driver context.
conf = SparkConf().setAppName("MinHash")
sc = SparkContext(conf=conf)

def sha1_hashfunc(x):
    """Hash the bytes *x* to an integer: SHA-1 hex digest parsed base-16."""
    digest = hashlib.sha1(x).hexdigest()
    return int(digest, 16)

def update_minhash(trademark, num_perm, hashfunc=sha1_hashfunc):
    """Build a MinHash signature for *trademark* over its character bigrams.

    Args:
        trademark: string to fingerprint.
        num_perm: number of permutation functions for the MinHash.
        hashfunc: hash function used by MinHash. Defaults to sha1_hashfunc,
            preserving the original behavior for two-argument callers.
            Added because this function was being called elsewhere in this
            file with a third hashfunc argument, which raised a TypeError
            against the old two-parameter signature.

    Returns:
        A datasketch.MinHash (seed fixed at 42 for reproducible signatures).
    """
    minhash = MinHash(num_perm, hashfunc=hashfunc, seed=42)
    for bigram in bigrams(trademark):
        # MinHash.update expects bytes, not str.
        minhash.update(bigram.encode('utf8'))
    return minhash

# Prepare the MinHash signature computation: number of permutations and
# the distributed view of the trademark list.
num_perm = 128
rdd = sc.parallelize(trademarks)

# Ship the hash function to the executors as a broadcast of its
# cloudpickle-serialized bytes (plain functions may not pickle across nodes).
broadcast_serialized_hashfunc = sc.broadcast(cloudpickle.dumps(sha1_hashfunc))

def update_minhash_with_serialized_hashfunc(trademark, num_perm, serialized_hashfunc):
    """Executor-side wrapper: deserialize the broadcast hash function and
    build the MinHash signature for *trademark* with it.

    Bug fix: the original forwarded three arguments to update_minhash(),
    which only accepts (trademark, num_perm) — a TypeError on every call.
    The signature is now built here directly, so the deserialized hash
    function is actually used.

    Args:
        trademark: string to fingerprint.
        num_perm: number of permutation functions for the MinHash.
        serialized_hashfunc: Spark Broadcast whose .value is a
            cloudpickle-serialized hash function.

    Returns:
        A datasketch.MinHash over the trademark's character bigrams
        (seed fixed at 42, matching update_minhash).
    """
    hashfunc = cloudpickle.loads(serialized_hashfunc.value)
    minhash = MinHash(num_perm, hashfunc=hashfunc, seed=42)
    for bigram in bigrams(trademark):
        minhash.update(bigram.encode('utf8'))
    return minhash

# Compute the signatures on the cluster and pull them back to the driver
# as a {trademark: MinHash} mapping.
minhash_signatures = rdd.map(
    lambda tm: (tm, update_minhash_with_serialized_hashfunc(tm, num_perm, broadcast_serialized_hashfunc))
).collectAsMap()

# Release cluster resources.
sc.stop()