#!/usr/bin/env python
"""This is a command line script to score documents."""
import sys
import os
import getopt
import string
import re
import time
import math

import Numeric

from Bio.Tools import stringfns
from Bio.Tools import listfns
from Bio.Tools.MultiProc import Task, Scheduler

from Extracto import ir
from Extracto import stem

TF_FN, IDF_FN = ir.log_tf, ir.log_idf   # Functions to calculate weights
STEM_FN = None                # Stemming function
PROCESSES = 1                 # Number of processes to run
PRECISION = Numeric.Float64   # for vector cosine calculations
STOPWORDS = {}                # a dictionary of the stopwords
USE_THREADS = 0               # whether to use threads
CACHE_SIZE = 100              # number of documents to cache
CLEARED_CACHE_SIZE = None     # number of documents left after the cache is cleared

USAGE = """docscore [-f docfile] [-h] [-a algorithm] [-s algorithm]
    [-w stopwordfile] [-c cache size] [-j num procs] [-t]
    docfreqfile path

DESCRIPTION

This program takes a directory and scores the "relatedness" between
each text file in that directory.  The scores are between 0 and 1,
with 0 being no relation and 1 meaning that the files are identical.
The scores are symmetrical.

You can optionally specify a list of stopwords.  This must be a text
file with one word per line, e.g.:
and
an
by
[...]

OPTIONS

docfreqfile
    A file containing the document frequencies.  This should be a
    2-column text file where the first column contains words and the
    second contains the document frequencies of the words.  These
    words should not have been stemmed.

path
    The path of the documents in the corpus.

-f docfile
    Specify a subset of the files to score.  This file should contain
    a list of the filenames to score.  If not provided, I will do a
    full NxN comparison.  (Actually, N(N+1)/2, since comparisons are
    symmetric.)

-h
    Print out this message.

-a algorithm
    Specify a scoring algorithm to use.  This must be either 'wilbur'
    or 'hersh'.  I use the wilbur algorithm by default.

-s algorithm
    Specify a stemming algorithm.  This can only be 'porter'.  No
    stemming by default.

-w stopwordfile
    Use the stopwords from stopwordfile.  By default, I use no
    stopwords.  These stopwords should not be stemmed.

-c cache size
    Number of files to cache at a time.  100 by default.

-j num procs
    The number of processes to run concurrently.  1 by default.

-t
    Use threads.  By default, multiple processes will do fork/exec's.

"""

time_format = "%H:%M:%S"
date_format = "%m/%d/%Y"
full_format = "%s %s" % (date_format, time_format)

def now(format=full_format):
    """now(format=full_format) -> string

    Return a timestamp, where format describes the time formatting.

    """
    time_tup = time.localtime(time.time())
    return time.strftime(format, time_tup)

def load_stopwords(filename):
    """load_stopwords(filename) -> list of stopwords"""
    lines = open(filename).readlines()
    good_stopword_re = re.compile(r"^\w+$")
    stopwords = []
    for line in lines:
        word = line.strip()
        i = word.find('#')       # Everything after '#' is ignored
        if i >= 0:
            word = word[:i]
        i = word.find('//')      # Everything after '//' is ignored
        if i >= 0:
            word = word[:i]
        word = word.strip()
        if not word:             # ignore blank lines
            continue
        if not good_stopword_re.match(word):
            print >>sys.stderr, "Discarding invalid stopword: %s" % word
            continue
        stopwords.append(word)
    return stopwords

def load_document(filename):
    """load_document(filename) -> string"""
    return open(filename).read()

def tokenize(text):
    """tokenize(text) -> list of words"""
    # Just do very simple tokenization on whitespace and punctuation.
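    # For example (assuming stringfns.split breaks the text on every
    # occurrence of any of the given characters), "It's a test." would
    # come out as ['It', 's', 'a', 'test'] once the empty strings are
    # filtered out below.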
    words = stringfns.split(text, string.whitespace + string.punctuation)
    if needs_processing():
        words = map(process_word, words)
    return filter(len, words)

def count_term_freq(words):
    """count_term_freq(words) -> counts"""
    return listfns.count(words)

def load_doc_freq(filename):
    """load_doc_freq(filename) -> dict of document frequency"""
    # The doc freq file should contain lines of the form:
    # word doc_freq
    df = {}
    lines = open(filename).readlines()
    lines = [x.rstrip() for x in lines]
    lines = [x.split() for x in lines]
    for word, count in lines:
        df[word] = int(count)
    return df

_llw_cache = {}         # cache of filename -> local weights
_llw_access = {}        # filename -> (access number, filename); higher is more recent
_llw_insert = 0L        # the next access number to assign
_llw_cache_lock = None  # lock to make this threadsafe

def load_local_weight(path, filename):
    """load_local_weight(path, filename) -> dict of local weights"""
    global _llw_cache, _llw_access, _llw_insert, _llw_cache_lock
    # If I'm using threading, then create a lock object.  Otherwise,
    # _llw_cache_lock will be None, and I can safely ignore it.
    if PROCESSES > 1 and USE_THREADS and not _llw_cache_lock:
        _llw_cache_lock = threading.Lock()
    if _llw_cache_lock:
        _llw_cache_lock.acquire()
    # If this file isn't in the cache, load it and add it.
    if not _llw_cache.has_key(filename):
        text = load_document(os.path.join(path, filename))
        words = tokenize(text.lower())
        tf = count_term_freq(words)
        _llw_cache[filename] = calc_local_weights(tf)
    # Set a recent access number for this file.
    _llw_access[filename] = (_llw_insert, filename)
    _llw_insert = _llw_insert + 1
    # If the cache is full, drop the least recently used files until
    # only CLEARED_CACHE_SIZE documents remain.
    if len(_llw_cache) > CACHE_SIZE:
        priorities = _llw_access.values()
        priorities.sort()
        for p, file in priorities[:len(_llw_cache)-CLEARED_CACHE_SIZE]:
            del _llw_cache[file]
            del _llw_access[file]
    if _llw_cache_lock:
        _llw_cache_lock.release()
    return _llw_cache[filename]

def needs_processing():
    """needs_processing() -> whether words need to go through process_word"""
    return STOPWORDS or STEM_FN

def process_word(word):
    """process_word(word) -> word or ''

    Process the word for stopwords or stemming.  The word should be
    ignored if this returns the empty string.

    """
    if STOPWORDS.has_key(word):
        return ''
    if STEM_FN:
        return STEM_FN(word)
    return word

def load_docfreqs(filename):
    """load_docfreqs(filename) -> dict of document frequencies"""
    lines = open(filename).readlines()
    lines = [x.rstrip() for x in lines]
    lines = [x.split() for x in lines]
    df_dict = {}
    for word, df in lines:
        word = process_word(word)
        if word:
            df_dict[word] = int(df)
    return df_dict

def calc_global_weights(N, df_dict):
    """calc_global_weights(N, df_dict) -> dict of global weights"""
    gw = {}
    for word in df_dict.keys():
        # Make sure the document frequency isn't greater than the size
        # of the corpus.
        if df_dict[word] > N:
            raise ValueError, \
                  'word "%s" appears in %d documents, but the corpus only has %d' \
                  % (word, df_dict[word], N)
        gw[word] = IDF_FN(N, df_dict[word])
    return gw

_calc_local_weights_cache = {}   # cache of term frequency -> local weight
def calc_local_weights(tf_dict):
    """calc_local_weights(tf_dict) -> dict of local weights"""
    lw = {}
    for word in tf_dict.keys():
        tf = tf_dict[word]
        if not _calc_local_weights_cache.has_key(tf):
            _calc_local_weights_cache[tf] = TF_FN(tf)
        lw[word] = _calc_local_weights_cache[tf]
    return lw

def score_docs(path, docname1, docname2, global_weights):
    """score_docs(path, docname1, docname2, global_weights) -> score

    Calculate the similarity score between two documents.  docname1
    and docname2 are the names of the documents.  global_weights is a
    dictionary of the global weights.
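
    The score itself is a vector cosine: each document becomes a term
    vector whose components are (local weight * global weight) for
    each term, and the returned value is the cosine between the two
    vectors (via ir.veccos_score), ranging from 0 (no terms in common)
    to 1 (identical weighted term vectors).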
""" lw1, lw2 = load_local_weight(path, docname1), \ load_local_weight(path, docname2) words = listfns.items(lw1.keys() + lw2.keys()) v1 = Numeric.zeros(len(words), PRECISION) v2 = Numeric.zeros(len(words), PRECISION) i = 0 for word in words: gw = global_weights[word] # XXX would this be faster if gw was multiplied afterwards by numpy? if lw1.has_key(word): v1[i] = gw * lw1[word] if lw2.has_key(word): v2[i] = gw * lw2[word] i = i + 1 return ir.veccos_score(v1, v2) def _score_corpus_h(path, docnums, corpus, global_weights, update_fn, ncomps, ncomps_lock): # helper function for score_corpus. Do not call! if ncomps is None: ncomps = [0] for i in docnums: for j in range(i, len(corpus)): score = score_docs(path, corpus[i], corpus[j], global_weights) if ncomps_lock: ncomps_lock.acquire() ncomps[0] += 1 if ncomps_lock: ncomps_lock.release() if update_fn: update_fn(corpus[i], corpus[j], score) return ncomps[0] def score_corpus(path, docs, corpus, global_weights, update_fn=None): """score_corpus(path, docs, corpus, global_weights[, update_fn]) -> comparisons Do pairwise comparisons of all the documents in the corpus. path is the path to the files. docs is a list of the names of the documents to compare. corpus is a list of all the files in the corpus. global_weights is a dictionary of the global weights for the terms. update_fn is an optional callback that takes as arguments the names of the documents and their similarity score. """ if PROCESSES > 1: # do multitask ncomps = [0] def add_ncomps(task_or_thread, ncomps=ncomps): # This is only necessary when using a task. Tasks pass # arguments back through the return value. Threads manipulate # the variables directly. Thus, I should check to make sure # it's a task. If it's not, don't do anything. nc = getattr(task_or_thread, 'retval', None) if not nc: return ncomps[0] += nc nprocs = PROCESSES if nprocs > len(docs): nprocs = len(docs) scheduler = Scheduler.Scheduler(nprocs, finish_fn=add_ncomps) if USE_THREADS: lock = threading.Lock() for i in range(nprocs): # figure out which ones each process should score docnums = filter(lambda x,p=nprocs,i=i: x%p==i, range(len(docs))) if USE_THREADS: t = threading.Thread(target=_score_corpus_h, args=(path, docnums, corpus, global_weights, update_fn, ncomps, lock)) else: t = Task.Task(target=_score_corpus_h, args=(path, docnums, corpus, global_weights, update_fn, None, None)) scheduler.add(t) while scheduler.run(): time.sleep(0.01) ncomps = ncomps[0] else: ncomps = _score_corpus_h(path, docs, corpus, global_weights, update_fn, None, None) return ncomps def sort_corpus(docnames, corpus): """sort_corpus(docnames, corpus) -> corpus Sort the corpus so that the documents I want to compare are in front. """ doc_dict = listfns.asdict(docnames) # Sort the corpus with a Schwartzian transform based on whether a # document is to be scored. corpus_to_sort = [] for c in corpus: if doc_dict.has_key(c): corpus_to_sort.append((0, c)) del doc_dict[c] else: corpus_to_sort.append((1, c)) # I should have found every document in docnames. If not, complain. if doc_dict: raise ValueError, "Documents %s not found in corpus." % \ ','.join(doc_dict.keys()) corpus_to_sort.sort() return map(lambda x: x[1], corpus_to_sort) def printf(s): print s sys.stdout.flush() if __name__ == '__main__': if len(sys.argv) == 1: # If they didn't specify any arguments, print >>sys.stderr, USAGE # print the instructions and quit. 
        sys.exit(0)

    try:
        optlist, args = getopt.getopt(sys.argv[1:], "f:ha:s:w:c:j:t")
    except getopt.error, x:
        print >>sys.stderr, x
        sys.exit(0)

    if len(args) != 2:             # If they gave the wrong number of arguments,
        print >>sys.stderr, USAGE  # print the instructions and quit.
        sys.exit(0)
    docfreqfile, path = args

    stopwordfile = None
    docfile = None
    for opt, arg in optlist:
        if opt == '-h':
            print USAGE
            sys.exit(0)
        elif opt == '-f':
            docfile = arg
        elif opt == '-a':
            if arg == 'wilbur':
                TF_FN, IDF_FN = ir.log_tf, ir.log_idf
            elif arg == 'hersh':
                TF_FN, IDF_FN = ir.hersh_tf, ir.hersh_idf
            else:
                print >>sys.stderr, 'Invalid algorithm "%s".' % arg
                sys.exit(0)
        elif opt == '-s':
            if arg == "porter":
                STEM_FN = stem.porter
            else:
                print >>sys.stderr, 'Invalid stemmer "%s".' % arg
                sys.exit(0)
        elif opt == '-w':
            stopwordfile = arg
        elif opt == '-c':
            CACHE_SIZE = int(arg)
            if CACHE_SIZE < 1:
                print >>sys.stderr, 'Invalid cache size "%d".' % CACHE_SIZE
                sys.exit(0)
        elif opt == '-j':
            PROCESSES = int(arg)
            if PROCESSES < 1 or PROCESSES > 500:
                print >>sys.stderr, \
                      'Invalid argument "%d" for processes.' % PROCESSES
                sys.exit(0)
        elif opt == '-t':
            USE_THREADS = 1
            import threading

    # Do some checking to make sure the input is reasonable.
    if not os.path.isdir(path):
        print >>sys.stderr, 'I can\'t find the path "%s".' % path
        sys.exit(0)
    files = [stopwordfile, docfile, docfreqfile]
    for file in files:
        if file and not os.path.isfile(file):
            print >>sys.stderr, 'I could not find the file "%s".' % file
            sys.exit(0)

    # Initialize the TF and IDF functions.
    printf("Using %s, %s for TF, IDF functions." % (
        TF_FN.__name__, IDF_FN.__name__))

    # Initialize the stemmer.
    if STEM_FN:
        printf("Using %s stemming." % STEM_FN.__name__)
    else:
        printf("No stemming.")

    # Initialize the concurrent processes.
    printf("Running %d processes concurrently." % PROCESSES)
    if PROCESSES > 1:
        if USE_THREADS:
            printf("Using threads.")
        else:
            printf("Using fork/exec's.")

    # Load the stopwords.
    if stopwordfile:
        STOPWORDS = load_stopwords(stopwordfile)
        STOPWORDS = listfns.asdict(STOPWORDS)
        printf('Read %d stopwords from "%s".' % (len(STOPWORDS), stopwordfile))
    else:
        printf("Not using stopwords.")

    # Load a list of the filenames in the corpus.
    printf("Looking for files in the corpus...")
    corpus = os.listdir(path)
    corpus = filter(lambda x,p=path: os.path.isfile(os.path.join(p, x)),
                    corpus)
    printf("I found %d files in the corpus." % len(corpus))

    printf("Using a cache size of %d documents." % CACHE_SIZE)
    CLEARED_CACHE_SIZE = int(math.ceil(CACHE_SIZE*0.25))

    # Make a list of the documents to compare.
    if not docfile:
        # Do an NxN comparison of everything in the corpus.
        corpus.sort()
        docs = range(len(corpus))
    else:
        docnames = open(docfile).readlines()
        docnames = [x.rstrip() for x in docnames]
        corpus = sort_corpus(docnames, corpus)
        docs = range(len(docnames))
    printf("There are %d files in the comparison set." % len(docs))

    # Calculate the global weights.
    printf("Calculating global weights...")
    df_dict = load_docfreqs(docfreqfile)
    global_weights = calc_global_weights(len(corpus), df_dict)
    printf("Calculated global weights for %d words." % len(global_weights))

    # Figure out how many comparisons I'm going to do.  Document i is
    # compared against itself and every later document in the corpus,
    # so the total is sum(c-i for i in range(d)) = d*(2*c - d + 1)/2.
    d, c = long(len(docs)), long(len(corpus))
    total = d * (2*c - d + 1) / 2
    printf("I'm going to do %d comparisons." % total)

    # Create a function that will print the output.
    maxlen = max(map(len, corpus))
    def update_fn(d1, d2, score, m=maxlen):
        ts = now(format=time_format)
        printf("%s %-*s %-*s %g" % (ts, m, d1, m, d2, score))

    # Finally do the comparisons.
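    # Each comparison prints one line through update_fn, something
    # like (hypothetical data):
    #   14:02:33 doc_a.txt       doc_b.txt       0.1375
    # i.e. a timestamp, the two document names padded to the width of
    # the longest filename in the corpus, and the similarity score.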
    start = time.time()
    ncomps = score_corpus(path, docs, corpus,
                          global_weights, update_fn=update_fn)
    # For a stress test, wrap the score_corpus call above in a
    # "while 1:" loop.  To profile instead, use:
    #import profile
    #profile.run("ncomps = score_corpus(path, docs, corpus, global_weights, update_fn=update_fn)")
    diff = time.time() - start
    printf("I did %d comparisons in %g seconds." % (ncomps, diff))
    printf("That's %g comparisons/s!" % (ncomps/diff))
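
# Example invocation (hypothetical file names; see USAGE above):
#   docscore -a wilbur -s porter -w stopwords.txt -c 200 -j 4 \
#       docfreq.txt /path/to/corpus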