Source code for wayward.parsimonious

#!/usr/bin/env python3

# Copyright 2011-2019 University of Amsterdam
# Author: Lars Buitinck

from __future__ import annotations
# TODO: remove redundant typing imports once PEP 585 is finalized

from collections import defaultdict
from heapq import nlargest
import logging
from operator import itemgetter
from typing import Iterable, Optional, Dict, List, Tuple

import numpy as np

from wayward.logsum import logsum


logger = logging.getLogger(__name__)


class ParsimoniousLM:
    """
    Language model for a set of documents.

    Constructing an object of this class fits a background model. The top
    method can then be used to fit document-specific models, also for unseen
    documents (with the same vocabulary as the background corpus).

    References
    ----------
    D. Hiemstra, S. Robertson, and H. Zaragoza (2004). `Parsimonious Language
    Models for Information Retrieval
    <http://citeseer.ist.psu.edu/viewdoc/summary?doi=10.1.1.4.5806>`_.
    Proc. SIGIR'04.

    Parameters
    ----------
    documents : iterable over iterable of str terms
        All documents that should be included in the corpus model.
    w : float
        Weight of document model (1 - weight of corpus model).
    thresh : int
        Don't include words that occur fewer than `thresh` times.

    Attributes
    ----------
    vocab : dict of term -> int
        Mapping of terms to numeric indices
    p_corpus : array of float
        Log probability of terms in background model (indexed by `vocab`)
    p_document : array of float
        Log probability of terms in the last processed document model
        (indexed by `vocab`)
    """

    def __init__(
            self,
            documents: Iterable[Iterable[str]],
            w: np.floating,
            thresh: int = 0
    ):
        """Collect the vocabulary and fit the background model."""
        logger.info('Building corpus model')

        self.w = w
        self.p_document: Optional[np.ndarray] = None
        # Vocabulary: maps terms to numeric indices
        vocab: Dict[str, int]
        self.vocab = vocab = {}
        # Corpus frequency
        count: Dict[int, int] = defaultdict(int)

        for d in documents:
            for tok in d:
                i = vocab.setdefault(tok, len(vocab))
                count[i] += 1

        cf = np.empty(len(count), dtype=np.double)
        for i, f in count.items():
            cf[i] = f

        rare = (cf < thresh)
        cf -= rare * cf

        try:
            old_error_settings = np.seterr(divide='ignore')
            # log P(t|C)
            self.p_corpus: np.ndarray = np.log(cf) - np.log(np.sum(cf))
        finally:
            np.seterr(**old_error_settings)
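    # Note (editorial gloss, not part of the original module): the background
    # estimate above is a maximum-likelihood unigram model kept in log space,
    #     p_corpus[vocab[t]] = log(cf[t]) - log(sum(cf))
    # e.g. a term seen 3 times in a 12-token corpus gets log(3/12) ~ -1.39.
    # Terms zeroed out by `thresh` get log(0) = -inf, which is why the
    # divide warning is silenced around the computation.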
    def top(
            self,
            k: int,
            d: Iterable[str],
            max_iter: int = 50,
            eps: float = 1e-5,
            w: Optional[np.floating] = None
    ) -> List[Tuple[str, float]]:
        """
        Get the top `k` terms of a document `d` and their probabilities.

        Uses the Expectation Maximization (EM) algorithm to estimate term
        probabilities.

        Parameters
        ----------
        k : int
            Number of top terms to return.
        d : iterable of str terms
            Terms that make up the document.
        max_iter : int, optional
            Maximum number of iterations of EM algorithm to run.
        eps : float, optional
            Epsilon: convergence threshold for EM algorithm.
        w : float, optional
            Weight of document model; overrides value given to
            :py:class:`~.ParsimoniousLM`

        Returns
        -------
        t_p : list of (str, float)
            Terms and their probabilities in the parsimonious model.
        """
        tf, p_document = self._document_model(d)
        self.p_document = self._EM(tf, p_document, w, max_iter, eps)
        term_probabilities = self.get_term_probabilities(self.p_document)
        return nlargest(k, term_probabilities.items(), itemgetter(1))
    def get_term_probabilities(
            self,
            log_prob_distribution: np.ndarray
    ) -> Dict[str, float]:
        """
        Align a term distribution with the vocabulary, and transform the
        term log probabilities to linear probabilities.

        Parameters
        ----------
        log_prob_distribution : array of float
            Log probability of terms which is indexed by the vocabulary.

        Returns
        -------
        t_p_map : dict of term -> float
            Dictionary of terms and their probabilities in the (sub-)model.
        """
        probabilities = np.exp(log_prob_distribution)
        probabilities[np.isnan(probabilities)] = 0.
        return {t: probabilities[i] for t, i in self.vocab.items()}
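    # Note (editorial gloss, not part of the original module): `top` combines
    # the two private methods below. With document model P(t|D) and fixed
    # corpus model P(t|C), the parsimonious mixture is
    #     P(t|d) = w * P(t|D) + (1 - w) * P(t|C)
    # and each iteration of `_EM` computes, in log space,
    #     E-step:  E_t = tf_t * w * P(t|D) / (w * P(t|D) + (1 - w) * P(t|C))
    #     M-step:  P(t|D) <- E_t / sum_s E_s
    # cf. the update described in Hiemstra et al. (2004), cited above.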
    def _document_model(self, d: Iterable[str]) -> Tuple[np.ndarray, np.ndarray]:
        """
        Build document model.

        Parameters
        ----------
        d : iterable of str terms

        Returns
        -------
        tf : array of float
            Term frequencies
        p_term : array of float
            Term log probabilities

        Initial p_term is 1/n_distinct for terms with non-zero tf,
        0 for terms with 0 tf.
        """
        logger.info('Gathering term probabilities')

        tf = np.zeros(len(self.vocab), dtype=np.double)  # Term frequency

        for tok in d:
            term_id = self.vocab.get(tok)
            # Check against None explicitly: term index 0 is a valid id
            if term_id is not None:
                tf[term_id] += 1

        # ignore counts of terms with zero corpus probability
        tf *= np.isfinite(self.p_corpus)

        n_distinct = (tf > 0).sum()

        try:
            old_error_settings = np.seterr(divide='ignore')
            p_term = np.log(tf > 0) - np.log(n_distinct)
        finally:
            np.seterr(**old_error_settings)

        return tf, p_term

    def _EM(
            self,
            tf: np.ndarray,
            p_term: np.ndarray,
            w: Optional[np.floating],
            max_iter: int,
            eps: float
    ) -> np.ndarray:
        """
        Expectation maximization.

        Parameters
        ----------
        tf : array of float
            Term frequencies, as returned by `_document_model()`
        p_term : array of float
            Term probabilities, as returned by `_document_model()`
        w : float, optional
            Weight of document model; defaults to `self.w` when None.
        max_iter : int
            Maximum number of iterations to run.
        eps : float
            Epsilon: convergence threshold for EM algorithm.

        Returns
        -------
        p_term : array of float
            A posteriori term probabilities.
        """
        logger.info(f'EM with max_iter={max_iter}, eps={eps}')

        if w is None:
            w = self.w
        w_ = np.log(1 - w)
        w = np.log(w)

        p_corpus = self.p_corpus + w_
        tf = np.log(tf)

        try:
            old_error_settings = np.seterr(divide='ignore')
            p_term = np.asarray(p_term)
            for i in range(1, max_iter + 1):
                # E-step
                p_term += w
                E = tf + p_term - np.logaddexp(p_corpus, p_term)

                # M-step
                new_p_term = E - logsum(E)

                diff = new_p_term - p_term
                p_term = new_p_term
                if (diff[np.isfinite(diff)] < eps).all():
                    logger.info(f'EM: convergence reached after {i} iterations')
                    break
        finally:
            np.seterr(**old_error_settings)

        return p_term
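# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; not part of the wayward source).
# It assumes pre-tokenised documents and fits a parsimonious model for one
# of them via the public API shown above.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    documents = [
        'the quick brown fox jumps over the lazy dog'.split(),
        'the cat sat on the mat'.split(),
        'dogs and cats are common household pets'.split(),
    ]
    # w = 0.1: the document model carries 10% of the mixture weight,
    # the background corpus model the remaining 90%.
    plm = ParsimoniousLM(documents, w=.1)

    # Terms frequent in this document but rare in the corpus at large
    # ("fox", "jumps", ...) should float to the top.
    for term, prob in plm.top(5, documents[0]):
        print(f'{term}\t{prob:.4f}')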