
Chapter 9. Advanced Text Processing with Spark

In Chapter 3, Obtaining, Processing, and Preparing Data with Spark, we covered various topics related to feature extraction and data processing, including the basics of extracting features from text data. In this chapter, we will introduce more advanced text processing techniques available in MLlib to work with large-scale text datasets.

In this chapter, we will:

· Work through detailed examples that illustrate data processing, feature extraction, and the modeling pipeline, as they relate to text data

· Evaluate the similarity between two documents based on the words in the documents

· Use the extracted text features as inputs for a classification model

· Cover a recent development in natural language processing that models words themselves as vectors, and illustrate the use of Spark's Word2Vec model to evaluate the similarity between two words based on their meaning

What's so special about text data?

Text data can be complex to work with for two main reasons. First, text and language have an inherent structure that is not easily captured using the raw words as is (for example, meaning, context, different types of words, sentence structure, and different languages, to highlight a few). Therefore, naïve feature extraction is usually relatively ineffective.

Second, the effective dimensionality of text data is extremely large and potentially limitless. Think about the number of words in the English language alone and add all kinds of special words, characters, slang, and so on to this. Then, throw in other languages and all the types of text one might find across the Internet. The dimension of text data can easily exceed tens or even hundreds of millions of words, even in relatively small datasets. For example, the Common Crawl dataset of billions of websites contains over 840 billion individual words.

To deal with these issues, we need ways of extracting more structured features and methods to handle the huge dimensionality of text data.

Extracting the right features from your data

The field of natural language processing (NLP) covers a wide range of techniques to work with text, from text processing and feature extraction through to modeling and machine learning. In this chapter, we will focus on two feature extraction techniques available within MLlib: the TF-IDF term weighting scheme and feature hashing.

Working through an example of TF-IDF, we will also explore the ways in which processing, tokenization, and filtering during feature extraction can help reduce the dimensionality of our input data as well as improve the information content and usefulness of the features we extract.

Term weighting schemes

In Chapter 3, Obtaining, Processing, and Preparing Data with Spark, we looked at vector representation where text features are mapped to a simple binary vector called the bag-of-words model. Another representation used commonly in practice is called term frequency-inverse document frequency (TF-IDF).

TF-IDF weights each term in a piece of text (referred to as a document) based on its frequency in the document (the term frequency). A global normalization, called the inverse document frequency, is then applied based on the frequency of this term among all documents (the set of documents in a dataset is commonly referred to as a corpus). The standard definition of TF-IDF is shown here:

tf-idf(t,d) = tf(t,d) x idf(t)

Here, tf(t,d) is the frequency (number of occurrences) of term t in document d and idf(t) is the inverse document frequency of term t in the corpus; this is defined as follows:

idf(t) = log(N / df(t))

Here, N is the total number of documents, and df(t) is the number of documents in which the term t occurs.

The TF-IDF formulation means that terms occurring many times in a document receive a higher weighting in the vector representation relative to those that occur few times in the document. However, the IDF normalization has the effect of reducing the weight of terms that are very common across all documents. The end result is that truly rare or important terms should be assigned higher weighting, while more common terms (which are assumed to have less importance) should have less impact in terms of weighting.
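To make this concrete, here is a tiny worked example of the formula above in plain Scala, using a hypothetical corpus of four documents (note that MLlib's IDF implementation applies a smoothed variant of this formula, so the exact values it produces will differ slightly):

// Toy illustration of tf-idf(t,d) = tf(t,d) x idf(t), assuming N = 4 documents
val numDocs = 4.0
def tfIdf(tf: Double, docFreq: Double): Double = tf * math.log(numDocs / docFreq)
// A rare, topic-specific term: 3 occurrences in this document, present in only 1 document
println(tfIdf(3.0, 1.0)) // ~4.16
// A very common term: 10 occurrences in this document, present in all 4 documents
println(tfIdf(10.0, 4.0)) // 0.0; the IDF factor removes its influence entirely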

Note

A good resource to learn more about the bag-of-words model (or vector space model) is the book Introduction to Information Retrieval, Christopher D. Manning, Prabhakar Raghavan, and Hinrich Schütze, Cambridge University Press (available in HTML form at http://nlp.stanford.edu/IR-book/html/htmledition/irbook.html).

It contains sections on text processing techniques, including tokenization, stop word removal, stemming, and the vector space model, as well as weighting schemes such as TF-IDF.

An overview can also be found at http://en.wikipedia.org/wiki/Tf%E2%80%93idf.

Feature hashing

Feature hashing is a technique to deal with high-dimensional data and is often used with text and categorical datasets where the features can take on many unique values (often many millions of values). In the previous chapters, we often used the 1-of-K encoding approach for categorical features, including text. While this approach is simple and effective, it can break down in the face of extremely high-dimensional data.

Building and using 1-of-K feature encoding requires us to keep a mapping of each possible feature value to an index in a vector. Furthermore, the process of creating the mapping itself requires at least one additional pass through the dataset and can be tricky to do in parallel scenarios. Up until now, we have often used a simple approach of collecting the distinct feature values and zipping this collection with a set of indices to create a map of feature value to index. This mapping is then broadcast (either explicitly in our code or implicitly by Spark) to each worker.
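As a reminder, a minimal sketch of this collect-and-zip approach on a toy categorical RDD (illustrative values only) looks like the following:

// Build a 1-of-K index mapping by collecting the distinct values (requires an extra pass over the data)
val categories = sc.parallelize(Seq("united states", "france", "japan", "france"))
val categoryMap = categories.distinct.collect.zipWithIndex.toMap
// categoryMap is captured in the closure below, so Spark ships it to each worker
val categoryIndices = categories.map(c => categoryMap(c))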

However, when dealing with huge feature dimensions in the tens of millions or more that are common when working with text, this approach can be slow and can require significant memory and network resources, both on the Spark master (to collect the unique values) and workers (to broadcast the resulting mapping to each worker, which keeps it in memory to allow it to apply the feature encoding to its local piece of the input data).

Feature hashing works by assigning the vector index for a feature based on the value obtained by hashing this feature to a number (usually an integer value) using a hash function. For example, let's say the hash value of a categorical feature for the geolocation of United States is 342. We will use the hashed value as the vector index, and the value at this index will be 1.0 to indicate the presence of the United States feature. The hash function used must be consistent (that is, for a given input, it returns the same output each time).

This encoding works the same way as mapping-based encoding, except that we choose a size for our feature vector upfront. As the most common hash functions return values in the entire range of integers, we will use a modulo operation to restrict the index values to the size of our vector, which is typically much smaller (a few tens of thousands to a few million, depending on our requirements).
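A minimal sketch of this idea in plain Scala is shown here; it uses String.hashCode purely for illustration, whereas MLlib's HashingTF applies its own hash function internally:

// Choose the feature vector size upfront (1,024 buckets here, just for illustration)
val numFeatures = math.pow(2, 10).toInt
def hashIndex(feature: String): Int = {
  val raw = feature.hashCode % numFeatures
  if (raw < 0) raw + numFeatures else raw // the modulo of a negative hash is negative in Scala, so shift it
}
println(hashIndex("united states")) // consistent: the same input always maps to the same index
println(hashIndex("united states"))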

Feature hashing has the advantage that we do not need to build a mapping and keep it in memory. It is also easy to implement, very fast, and can be done online and in real time, thus not requiring a pass through our dataset first. Finally, because we select a feature vector dimension that is significantly smaller than the raw dimensionality of our dataset, we bound the memory usage of our model both in training and production; hence, memory usage does not scale with the size and dimensionality of our data.

However, there are two important drawbacks, which are as follows:

· As we don't create a mapping of features to index values, we also cannot do the reverse mapping of feature index to value. This makes it harder to, for example, determine which features are most informative in our models.

· As we are restricting the size of our feature vectors, we might experience hash collisions. This happens when two different features are hashed into the same index in our feature vector. Surprisingly, this doesn't seem to have a severe impact on model performance as long as we choose a reasonable feature vector dimension relative to the dimension of the input data.

Note

Further information on hashing can be found at http://en.wikipedia.org/wiki/Hash_function.

A key paper that introduced the use of hashing for feature extraction and machine learning is:

Kilian Weinberger, Anirban Dasgupta, John Langford, Alex Smola, and Josh Attenberg. Feature Hashing for Large Scale Multitask Learning. Proc. ICML 2009, which is available at http://alex.smola.org/papers/2009/Weinbergeretal09.pdf.

Extracting the TF-IDF features from the 20 Newsgroups dataset

To illustrate the concepts in this chapter, we will use a well-known text dataset called 20 Newsgroups; this dataset is commonly used for text-classification tasks. This is a collection of newsgroup messages posted across 20 different topics. There are various forms of data available. For our purposes, we will use the bydate version of the dataset, which is available at http://qwone.com/~jason/20Newsgroups.

This dataset splits up the available data into training and test sets that comprise 60 percent and 40 percent of the original data, respectively. Here, the messages in the test set occur after those in the training set. This dataset also excludes some of the message headers that identify the actual newsgroup; hence, it is an appropriate dataset to test the real-world performance of classification models.

Note

Further information on the original dataset can be found in the UCI Machine Learning Repository page at http://kdd.ics.uci.edu/databases/20newsgroups/20newsgroups.data.html.

To get started, download the data and extract it using the following command:

>tar xfvz 20news-bydate.tar.gz

This will create two folders: one called 20news-bydate-train and another one called 20news-bydate-test. Let's take a look at the directory structure under the training dataset folder:

>cd 20news-bydate-train/

>ls

You will see that it contains a number of subfolders, one for each newsgroup:

alt.atheism comp.windows.x rec.sport.hockey soc.religion.christian

comp.graphics misc.forsale sci.crypt talk.politics.guns

comp.os.ms-windows.misc rec.autos sci.electronics talk.politics.mideast

comp.sys.ibm.pc.hardware rec.motorcycles sci.med talk.politics.misc

comp.sys.mac.hardware rec.sport.baseball sci.space talk.religion.misc

There are a number of files under each newsgroup folder; each file contains an individual message posting:

> ls rec.sport.hockey

52550 52580 52610 52640 53468 53550 53580 53610 53640 53670 53700 53731 53761 53791

...

We can take a look at a part of one of these messages to see the format:

> head -20 rec.sport.hockey/52550

From: dchhabra@stpl.ists.ca (Deepak Chhabra)

Subject: Superstars and attendance (was Teemu Selanne, was +/- leaders)

Nntp-Posting-Host: stpl.ists.ca

Organization: Solar Terresterial Physics Laboratory, ISTS

Distribution: na

Lines: 115

Dean J. Falcione (posting from jrmst+8@pitt.edu) writes:

[I wrote:]

>>When the Pens got Mario, granted there was big publicity, etc, etc,

>>and interest was immediately generated. Gretzky did the same thing for LA.

>>However, imnsho, neither team would have seen a marked improvement in

>>attendance if the team record did not improve. In the year before Lemieux

>>came, Pittsburgh finished with 38 points. Following his arrival, the Pens

>>finished with 53, 76, 72, 81, 87, 72, 88, and 87 points, with a couple of

^^

>>Stanley Cups thrown in.

...

As we can see, each message contains some header fields that contain the sender, subject, and other metadata, followed by the raw content of the message.

Exploring the 20 Newsgroups data

Now, we will start up our Spark Scala console, ensuring that we make enough memory available:

>$SPARK_HOME/bin/spark-shell --driver-memory 4g

Looking at the directory structure, you might recognize that once again, we have data contained in individual text files (one text file per message). Therefore, we will again use Spark's wholeTextFiles method to read the content of each file into a record in our RDD.

In the code that follows, PATH refers to the directory in which you extracted the 20news-bydate archive:

val path = "/PATH/20news-bydate-train/*"

val rdd = sc.wholeTextFiles(path)

val text = rdd.map { case (file, text) => text }

println(text.count)

The first time you run this command, it might take quite a bit of time, as Spark needs to scan the directory structure. You will also see quite a lot of console output, as Spark logs all the file paths that are being processed. During the processing, you will see the following line displayed, indicating the total number of files that Spark has detected:

...

14/10/12 14:27:54 INFO FileInputFormat: Total input paths to process : 11314

...

After the command has finished running, you will see the total record count, which should be the same as the preceding Total input paths to process screen output:

11314

Next, we will take a look at the newsgroup topics available:

val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }

val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")

println(countByGroup)

This will display the following result:

(rec.sport.hockey,600)

(soc.religion.christian,599)

(rec.motorcycles,598)

(rec.sport.baseball,597)

(sci.crypt,595)

(rec.autos,594)

(sci.med,594)

(comp.windows.x,593)

(sci.space,593)

(sci.electronics,591)

(comp.os.ms-windows.misc,591)

(comp.sys.ibm.pc.hardware,590)

(misc.forsale,585)

(comp.graphics,584)

(comp.sys.mac.hardware,578)

(talk.politics.mideast,564)

(talk.politics.guns,546)

(alt.atheism,480)

(talk.politics.misc,465)

(talk.religion.misc,377)

We can see that the number of messages is roughly even between the topics.

Applying basic tokenization

The first step in our text processing pipeline is to split up the raw text content in each document into a collection of terms (also referred to as tokens). This is known as tokenization. We will start by applying a simple whitespace tokenization, together with converting each token to lowercase for each document:

val text = rdd.map { case (file, text) => text }

val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))

println(whiteSpaceSplit.distinct.count)

Tip

In the preceding code, we used the flatMap function instead of map, as for now, we want to inspect all the tokens together for exploratory analysis. Later in this chapter, we will apply our tokenization scheme on a per-document basis, so we will use the map function.

After running this code snippet, you will see the total number of unique tokens after applying our tokenization:

402978

As you can see, for even a relatively small set of text, the number of raw tokens (and, therefore, the dimensionality of our feature vectors) can be very high.

Let's take a look at a randomly selected document:

println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))

Tip

Note that the third parameter to the sample function is the random seed. We set it to 42 so that each call to sample returns the same result, ensuring that your results match those in this chapter.

This will display the following result:

atheist,resources

summary:,addresses,,to,atheism

keywords:,music,,thu,,11:57:19,11:57:19,gmt

distribution:,cambridge.,290

archive-name:,atheism/resources

alt-atheism-archive-name:,december,,,,,,,,,,,,,,,,,,,,,,addresses,addresses,,,,,,,religion,to:,to:,,p.o.,53701.

telephone:,sell,the,,fish,on,their,cars,,with,and,written

inside.,3d,plastic,plastic,,evolution,evolution,7119,,,,,san,san,san,mailing,net,who,to,atheist,press

aap,various,bible,,and,on.,,,one,book,is:

"the,w.p.,american,pp.,,1986.,bible,contains,ball,,based,based,james,of

Improving our tokenization

The preceding simple approach results in a lot of tokens and does not filter out many nonword characters (such as punctuation). Most tokenization schemes will remove these characters. We can do this by splitting each raw document on nonword characters using a regular expression pattern:

val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))

println(nonWordSplit.distinct.count)

This reduces the number of unique tokens significantly:

130126

If we inspect the first few tokens, we will see that we have eliminated most of the less useful characters in the text:

println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))

You will see the following result displayed:

bone,k29p,w1w3s1,odwyer,dnj33n,bruns,_congressional,mmejv5,mmejv5,artur,125215,entitlements,beleive,1pqd9hinnbmi,

jxicaijp,b0vp,underscored,believiing,qsins,1472,urtfi,nauseam,tohc4,kielbasa,ao,wargame,seetex,museum,typeset,pgva4,

dcbq,ja_jp,ww4ewa4g,animating,animating,10011100b,10011100b,413,wp3d,wp3d,cannibal,searflame,ets,1qjfnv,6jx,6jx,

detergent,yan,aanp,unaskable,9mf,bowdoin,chov,16mb,createwindow,kjznkh,df,classifieds,hour,cfsmo,santiago,santiago,

1r1d62,almanac_,almanac_,chq,nowadays,formac,formac,bacteriophage,barking,barking,barking,ipmgocj7b,monger,projector,

hama,65e90h8y,homewriter,cl5,1496,zysec,homerific,00ecgillespie,00ecgillespie,mqh0,suspects,steve_mullins,io21087,

funded,liberated,canonical,throng,0hnz,exxon,xtappcontext,mcdcup,mcdcup,5seg,biscuits

While our nonword pattern to split text works fairly well, we are still left with numbers and tokens that contain numeric characters. In some cases, numbers can be an important part of a corpus. For our purposes, the next step in our pipeline will be to filter out numbers and tokens that are words mixed with numbers.

We can do this by applying another regular expression pattern and using this to filter out tokens that do not match the pattern:

val regex = """[^0-9]*""".r

val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)

println(filterNumbers.distinct.count)

This further reduces the size of the token set:

84912

Let's take a look at another random sample of the filtered tokens:

println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))

You will see output like the following one:

reunion,wuair,schwabam,eer,silikian,fuller,sloppiness,crying,crying,beckmans,leymarie,fowl,husky,rlhzrlhz,ignore,

loyalists,goofed,arius,isgal,dfuller,neurologists,robin,jxicaijp,majorly,nondiscriminatory,akl,sively,adultery,

urtfi,kielbasa,ao,instantaneous,subscriptions,collins,collins,za_,za_,jmckinney,nonmeasurable,nonmeasurable,

seetex,kjvar,dcbq,randall_clark,theoreticians,theoreticians,congresswoman,sparcstaton,diccon,nonnemacher,

arresed,ets,sganet,internship,bombay,keysym,newsserver,connecters,igpp,aichi,impute,impute,raffle,nixdorf,

nixdorf,amazement,butterfield,geosync,geosync,scoliosis,eng,eng,eng,kjznkh,explorers,antisemites,bombardments,

abba,caramate,tully,mishandles,wgtn,springer,nkm,nkm,alchoholic,chq,shutdown,bruncati,nowadays,mtearle,eastre,

discernible,bacteriophage,paradijs,systematically,rluap,rluap,blown,moderates

We can see that we have removed all the numeric characters. This still leaves us with a few strange words, but we will not worry about these too much here.

Removing stop words

Stop words refer to common words that occur many times across almost all documents in a corpus (and across most corpuses). Examples of typical English stop words include and, but, the, of, and so on. It is a standard practice in text feature extraction to exclude stop words from the extracted tokens.

When using TF-IDF weighting, the weighting scheme actually takes care of this for us. As stop words have a very low IDF score, they will tend to have very low TF-IDF weightings and thus less importance. In some cases, for information retrieval and search tasks, it might be desirable to include stop words. However, it can still be beneficial to exclude stop words during feature extraction, as it reduces the dimensionality of the final feature vectors as well as the size of the training data.

We can take a look at some of the tokens in our corpus that have the highest occurrence across all documents to get an idea about some other stop words to exclude:

val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)

val oreringDesc = Ordering.by[(String, Int), Int](_._2)

println(tokenCounts.top(20)(oreringDesc).mkString("\n"))

In the preceding code, we took the tokens after filtering out numeric characters and generated a count of the occurrence of each token across the corpus. We can now use Spark's top function to retrieve the top 20 tokens by count. Notice that we need to provide the top function with an ordering that tells Spark how to order the elements of our RDD. In this case, we want to order by the count, so we will specify the second element of our key-value pair.

Running the preceding code snippet will result in the following top tokens:

(the,146532)

(to,75064)

(of,69034)

(a,64195)

(ax,62406)

(and,57957)

(i,53036)

(in,49402)

(is,43480)

(that,39264)

(it,33638)

(for,28600)

(you,26682)

(from,22670)

(s,22337)

(edu,21321)

(on,20493)

(this,20121)

(be,19285)

(t,18728)

As we might expect, there are a lot of common words in this list that we could potentially label as stop words. Let's create a set of stop words with some of these as well as other common words. We will then look at the tokens after filtering out these stop words:

val stopwords = Set(

"the","a","an","of","or","in","for","by","on","but", "is", "not", "with", "as", "was", "if",

"they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to"

)

val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }

println(tokenCountsFilteredStopwords.top(20)(oreringDesc).mkString("\n"))

You will see the following output:

(ax,62406)

(i,53036)

(you,26682)

(s,22337)

(edu,21321)

(t,18728)

(m,12756)

(subject,12264)

(com,12133)

(lines,11835)

(can,11355)

(organization,11233)

(re,10534)

(what,9861)

(there,9689)

(x,9332)

(all,9310)

(will,9279)

(we,9227)

(one,9008)

You might notice that there are still quite a few common words in this top list. In practice, we might have a much larger set of stop words. However, we will keep a few (partly to illustrate the impact of common words when using TF-IDF weighting a little later).

One other filtering step that we will use is removing any tokens that are only one character in length. The reasoning behind this is similar to removing stop words: these single-character tokens are unlikely to be informative in our text model, and removing them can further reduce the feature dimension and model size. We will do this with another filtering step:

val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }

println(tokenCountsFilteredSize.top(20)(oreringDesc).mkString("\n"))

Again, we will examine the tokens remaining after this filtering step:

(ax,62406)

(you,26682)

(edu,21321)

(subject,12264)

(com,12133)

(lines,11835)

(can,11355)

(organization,11233)

(re,10534)

(what,9861)

(there,9689)

(all,9310)

(will,9279)

(we,9227)

(one,9008)

(would,8905)

(do,8674)

(he,8441)

(about,8336)

(writes,7844)

Apart from some of the common words that we have not excluded, we see that a few potentially more informative words are starting to appear.

Excluding terms based on frequency

It is also a common practice to exclude terms during tokenization when their overall occurrence in the corpus is very low. For example, let's examine the least occurring terms in the corpus (notice the different ordering we use here to return the results sorted in ascending order):

val oreringAsc = Ordering.by[(String, Int), Int](-_._2)

println(tokenCountsFilteredSize.top(20)(oreringAsc).mkString("\n"))

You will get the following results:

(lennips,1)

(bluffing,1)

(preload,1)

(altina,1)

(dan_jacobson,1)

(vno,1)

(actu,1)

(donnalyn,1)

(ydag,1)

(mirosoft,1)

(xiconfiywindow,1)

(harger,1)

(feh,1)

(bankruptcies,1)

(uncompression,1)

(d_nibby,1)

(bunuel,1)

(odf,1)

(swith,1)

(lantastic,1)

As we can see, there are many terms that only occur once in the entire corpus. Since typically we want to use our extracted features for other tasks such as document similarity or machine learning models, tokens that only occur once are not useful to learn from, as we will not have enough training data relative to these tokens. We can apply another filter to exclude these rare tokens:

val rareTokens = tokenCounts.filter{ case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet

val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }

println(tokenCountsFilteredAll.top(20)(oreringAsc).mkString("\n"))

We can see that we are left with tokens that occur at least twice in the corpus:

(sina,2)

(akachhy,2)

(mvd,2)

(hizbolah,2)

(wendel_clark,2)

(sarkis,2)

(purposeful,2)

(feagans,2)

(wout,2)

(uneven,2)

(senna,2)

(multimeters,2)

(bushy,2)

(subdivided,2)

(coretest,2)

(oww,2)

(historicity,2)

(mmg,2)

(margitan,2)

(defiance,2)

Now, let's count the number of unique tokens:

println(tokenCountsFilteredAll.count)

You will see the following output:

51801

As we can see, by applying all the filtering steps in our tokenization pipeline, we have reduced the feature dimension from 402,978 to 51,801.

We can now combine all our filtering logic into one function, which we can apply to each document in our RDD:

def tokenize(line: String): Seq[String] = {

line.split("""\W+""")

.map(_.toLowerCase)

.filter(token => regex.pattern.matcher(token).matches)

.filterNot(token => stopwords.contains(token))

.filterNot(token => rareTokens.contains(token))

.filter(token => token.size >= 2)

.toSeq

}

We can check whether this function gives us the same result with the following code snippet:

println(text.flatMap(doc => tokenize(doc)).distinct.count)

This will output 51801, giving us the same unique token count as our step-by-step pipeline.

We can tokenize each document in our RDD as follows:

val tokens = text.map(doc => tokenize(doc))

println(tokens.first.take(20))

You will see output similar to the following, showing the first part of the tokenized version of our first document:

WrappedArray(mathew, mathew, mantis, co, uk, subject, alt, atheism, faq, atheist, resources, summary, books, addresses, music, anything, related, atheism, keywords, faq)

A note about stemming

A common step in text processing and tokenization is stemming. This is the conversion of whole words to a base form (called a word stem). For example, plurals might be converted to singular (dogs becomes dog), and forms such as walking and walker might become walk. Stemming can become quite complex and is typically handled with specialized NLP or search engine software (such as NLTK, OpenNLP, and Lucene, for example). We will ignore stemming for the purpose of our example here.
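Purely to illustrate the idea (we will still skip stemming in the pipeline that follows), a deliberately naive suffix-stripping rule might look like the following sketch; a real stemmer such as the Porter algorithm handles the many cases a rule like this misses:

// Naive suffix stripping, for illustration only; not a real stemmer
def naiveStem(token: String): String = {
  if (token.endsWith("ing") && token.length > 5) token.dropRight(3)
  else if (token.endsWith("s") && !token.endsWith("ss") && token.length > 3) token.dropRight(1)
  else token
}
println(Seq("dogs", "walking", "walker").map(naiveStem)) // List(dog, walk, walker); note that walker is not reduced to walk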

Note

A full treatment of stemming is beyond the scope of this book. You can find more details at http://en.wikipedia.org/wiki/Stemming.

Training a TF-IDF model

We will now use MLlib to transform each document, in the form of processed tokens, into a vector representation. The first step will be to use the HashingTF implementation, which makes use of feature hashing to map each token in the input text to an index in the vector of term frequencies. Then, we will compute the global IDF and use it to transform the term frequency vectors into TF-IDF vectors.

For each token, the index will thus be the hash of the token (mapped in turn onto the dimension of the feature vector). The value for each token will be the TF-IDF weighting for that token (that is, the term frequency multiplied by the inverse document frequency).

First, we will import the classes we need and create our HashingTF instance, passing in a dimension parameter, dim. While the default feature dimension is 2^20 (or around 1 million), we will choose 2^18 (or around 260,000), since with about 50,000 tokens, we should not experience a significant number of hash collisions, and a smaller dimension will be more memory and processing friendly for illustrative purposes:

import org.apache.spark.mllib.linalg.{ SparseVector => SV }

import org.apache.spark.mllib.feature.HashingTF

import org.apache.spark.mllib.feature.IDF

val dim = math.pow(2, 18).toInt

val hashingTF = new HashingTF(dim)

val tf = hashingTF.transform(tokens)

tf.cache

Tip

Note that we imported MLlib's SparseVector using an alias of SV. This is because later, we will use Breeze's linalg module, which itself also imports SparseVector. This way, we will avoid namespace collisions.

The transform function of HashingTF maps each input document (that is, a sequence of tokens) to an MLlib Vector. We will also call cache to pin the data in memory to speed up subsequent operations.

Let's inspect the first element of our transformed dataset:

Tip

Note that HashingTF.transform returns an RDD[Vector], so we will cast the result returned to an instance of an MLlib SparseVector.

The transform method can also work on an individual document by taking an Iterable argument (for example, a document as a Seq[String]). This returns a single vector.

val v = tf.first.asInstanceOf[SV]

println(v.size)

println(v.values.size)

println(v.values.take(10).toSeq)

println(v.indices.take(10).toSeq)

You will see the following output displayed:

262144

706

WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)

WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

We can see that the dimension of each sparse vector of term frequencies is 262,144 (or 2^18, as we specified). However, the number of non-zero entries in the vector is only 706. The last two lines of the output show the frequency counts and vector indices for the first few entries in the vector.

We will now compute the inverse document frequency for each term in the corpus by creating a new IDF instance and calling fit with our RDD of term frequency vectors as the input. We will then transform our term frequency vectors to TF-IDF vectors through the transform function of IDF:

val idf = new IDF().fit(tf)

val tfidf = idf.transform(tf)

val v2 = tfidf.first.asInstanceOf[SV]

println(v2.values.size)

println(v2.values.take(10).toSeq)

println(v2.indices.take(10).toSeq)

When you examine the first element in the RDD of TF-IDF transformed vectors, you will see output similar to the one shown here:

706

WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, ...

WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

We can see that the number of non-zero entries hasn't changed (at 706), nor have the vector indices for the terms. What has changed are the values for each term. Earlier, these represented the frequency of each term in the document, but now, the new values represent the frequencies weighted by the IDF.

Analyzing the TF-IDF weightings

Next, let's investigate the TF-IDF weighting for a few terms to illustrate the impact of the commonality or rarity of a term.

First, we can compute the minimum and maximum TF-IDF weights across the entire corpus:

val minMaxVals = tfidf.map { v =>

val sv = v.asInstanceOf[SV]

(sv.values.min, sv.values.max)

}

val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>

(math.min(min1, min2), math.max(max1, max2))

}

println(globalMinMax)

As we can see, the minimum TF-IDF is zero, while the maximum is significantly larger:

(0.0,66155.39470409753)

We will now explore the TF-IDF weight attached to various terms. In the previous section on stop words, we filtered out many common terms that occur frequently. Recall that we did not remove all such potential stop words. Instead, we kept a few in the corpus so that we could illustrate the impact of applying the TF-IDF weighting scheme on these terms.

TF-IDF weighting will tend to assign a lower weighting to common terms. To see this, we can compute the TF-IDF representation for a few of the terms that appear in the list of top occurrences that we previously computed, such as you, do, and we:

val common = sc.parallelize(Seq(Seq("you", "do", "we")))

val tfCommon = hashingTF.transform(common)

val tfidfCommon = idf.transform(tfCommon)

val commonVector = tfidfCommon.first.asInstanceOf[SV]

println(commonVector.values.toSeq)

If we form a TF-IDF vector representation of this document, we see the following values assigned to each term. Note that, because of feature hashing, we cannot be sure exactly which value corresponds to which term. However, the values illustrate that the weighting applied to these terms is relatively low:

WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)

Now, let's apply the same transformation to a few less common terms that we might intuitively associate with being more linked to specific topics or concepts:

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))

val tfUncommon = hashingTF.transform(uncommon)

val tfidfUncommon = idf.transform(tfUncommon)

val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]

println(uncommonVector.values.toSeq)

We can see from the following results that the TF-IDF weightings are indeed significantly higher than for the more common terms:

WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)

Using a TF-IDF model

While we often refer to training a TF-IDF model, it is actually a feature extraction process or transformation rather than a machine learning model. TF-IDF weighting is often used as a preprocessing step for other models, such as dimensionality reduction, classification, or regression.

To illustrate the potential uses of TF-IDF weighting, we will explore two examples. The first uses the TF-IDF vectors to compute document similarity, while the second involves training a multiclass classification model with the TF-IDF vectors as input features.

Document similarity with the 20 Newsgroups dataset and TF-IDF features

You might recall from Chapter 4, Building a Recommendation Engine with Spark, that the similarity between two vectors can be computed using a distance metric. The closer two vectors are (that is, the lower the distance metric), the more similar they are. One such metric that we used to compute similarity between movies is cosine similarity.

Just like we did for movies, we can also compute the similarity between two documents. Using TF-IDF, we have transformed each document into a vector representation. Hence, we can use the same techniques as we used for movie vectors to compare two documents.

Intuitively, we might expect two documents to be more similar to each other if they share many terms. Conversely, we might expect two documents to be less similar if they each contain many terms that are different from each other. As we compute cosine similarity by computing a dot product of the two vectors and each vector is made up of the terms in each document, we can see that documents with a high overlap of terms will tend to have a higher cosine similarity.
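For reference, this computation can be wrapped in a small helper; this is simply the cosine similarity formula made explicit using Breeze's SparseVector and norm (the same class and function we import and use in the code that follows), not a different method:

import breeze.linalg.{SparseVector, norm}
// Cosine similarity: the dot product of the two vectors divided by the product of their norms
def cosineSimilarity(a: SparseVector[Double], b: SparseVector[Double]): Double =
  a.dot(b) / (norm(a) * norm(b))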

Now, we can see TF-IDF at work. We might reasonably expect that even very different documents might contain many overlapping terms that are relatively common (for example, our stop words). However, due to a low TF-IDF weighting, these terms will not have a significant impact on the dot product and, therefore, will not have much impact on the similarity computed.

For example, we might expect two randomly chosen messages from the hockey newsgroup to be relatively similar to each other. Let's see if this is the case:

val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }

val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))

val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))

In the preceding code, we first filtered our raw input RDD to keep only the messages within the hockey topic. We then applied our tokenization and term frequency transformation functions. Note that the transform method used is the version that works on a single document (in the form of a Seq[String]) rather than the version that works on an RDD of documents.

Finally, we applied the IDF transform (note that we use the same IDF that we have already computed on the whole corpus).

Once we have our hockey document vectors, we can select two of these vectors at random and compute the cosine similarity between them (as we did earlier, we will use Breeze for the linear algebra functionality, in particular converting our MLlib vectors to Breeze SparseVector instances first):

import breeze.linalg._

val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]

val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)

val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]

val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)

val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))

println(cosineSim)

We can see that the cosine similarity between the documents is around 0.06:

0.060250114361164626

While this might seem quite low, recall that the effective dimensionality of our features is high due to the large number of unique terms that is typical when dealing with text data. Hence, we can expect that any two documents might have a relatively low overlap of terms even if they are about the same topic, and therefore would have a lower absolute similarity score.

By contrast, we can compare this similarity score to the one computed between one of our hockey documents and another document chosen randomly from the comp.graphics newsgroup, using the same methodology:

val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }

val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))

val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))

val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]

val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)

val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))

println(cosineSim2)

The cosine similarity is significantly lower at 0.0047:

0.004664850323792852

Finally, it is likely that a document from another sports-related topic will be more similar to our hockey document than one from a computer-related topic, though probably not as similar as another hockey document. Let's see whether this is the case by computing the similarity between a random message from the baseball newsgroup and our hockey document:

val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }

val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))

val baseballTfIdf = idf.transform(baseballTF.map(_._2))

val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]

val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)

val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))

println(cosineSim3)

Indeed, as we expected, we found that the baseball and hockey documents have a cosine similarity of 0.05, which is significantly higher than the similarity with the comp.graphics document, but somewhat lower than the similarity between the two hockey documents:

0.05047395039466008

Training a text classifier on the 20 Newsgroups dataset using TF-IDF

When using TF-IDF vectors, we expected that the cosine similarity measure would capture the similarity between documents, based on the overlap of terms between them. In a similar way, we would expect that a machine learning model, such as a classifier, would be able to learn weightings for individual terms; this would allow it to distinguish between documents from different classes. That is, it should be possible to learn a mapping between the presence (and weighting) of certain terms and a specific topic.

In the 20 Newsgroups example, each newsgroup topic is a class, and we can train a classifier using our TF-IDF transformed vectors as input.

Since we are dealing with a multiclass classification problem, we will use the naïve Bayes model in MLlib, which supports multiple classes. As the first step, we will import the Spark classes that we will be using:

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.classification.NaiveBayes

import org.apache.spark.mllib.evaluation.MulticlassMetrics

Next, we will need to extract the 20 topics and convert them to class mappings. We can do this in exactly the same way as we might for 1-of-K feature encoding, by assigning a numeric index to each class:

val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap

val zipped = newsgroups.zip(tfidf)

val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }

train.cache

In the preceding code snippet, we took the newsgroups RDD, where each element is the topic, and used the zip function to combine it with each element in our tfidf RDD of TF-IDF vectors. We then mapped over each key-value element in our new zipped RDD and created a LabeledPoint instance, where label is the class index and features is the TF-IDF vector.

Tip

Note that the zip operator assumes that each RDD has the same number of partitions as well as the same number of elements in each partition. It will fail if this is not the case. We can make this assumption here because we have effectively created both our tfidf RDD and our newsgroups RDD from a series of map transformations on the same original RDD that preserved the partitioning structure.

Now that we have an input RDD in the correct form, we can simply pass it to the naïve Bayes train function:

val model = NaiveBayes.train(train, lambda = 0.1)

Let's evaluate the performance of the model on the test dataset. We will load the raw test data from the 20news-bydate-test directory, again using wholeTextFiles to read each message into an RDD element. We will then extract the class labels from the file paths in the same way as we did for the newsgroups RDD:

val testPath = "/PATH/20news-bydate-test/*"

val testRDD = sc.wholeTextFiles(testPath)

val testLabels = testRDD.map { case (file, text) =>

val topic = file.split("/").takeRight(2).head

newsgroupsMap(topic)

}

Transforming the text in the test dataset follows the same procedure as for the training data—we will apply our tokenize function followed by the term frequency transformation, and we will again use the same IDF computed from the training data to transform the TF vectors into TF-IDF vectors. Finally, we will zip the test class labels with the TF-IDF vectors and create our test RDD[LabeledPoint]:

val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }

val testTfIdf = idf.transform(testTf)

val zippedTest = testLabels.zip(testTfIdf)

val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }

Tip

Note that it is important that we use the training set IDF to transform the test data, as this creates a more realistic estimation of model performance on new data, which might potentially contain terms that the model has not yet been trained on. It would be "cheating" to recompute the IDF vector based on the test dataset and, more importantly, would potentially lead to incorrect estimates of optimal model parameters selected through cross-validation.

Now, we're ready to compute the predictions and true class labels for our model. We will use the resulting RDD of predictions and labels to compute accuracy and the multiclass weighted F-measure for our model:

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))

val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()

val metrics = new MulticlassMetrics(predictionAndLabel)

println(accuracy)

println(metrics.weightedFMeasure)

Tip

The weighted F-measure is an overall measure of precision and recall performance (where, like the area under an ROC curve, values closer to 1.0 indicate better performance), which is then combined through a weighted average across the classes.
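For reference, the per-class F-measure is the harmonic mean of precision and recall:

F1 = 2 x (precision x recall) / (precision + recall)

The weighted variant reported here averages these per-class scores, weighting each class by its share of the true instances in the test set.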

We can see that our simple multiclass naïve Bayes model has achieved close to 80 percent for both accuracy and F-measure:

0.7915560276155071

0.7810675969031116

Evaluating the impact of text processing

Text processing and TF-IDF weighting are examples of feature extraction techniques designed to both reduce the dimensionality of and extract some structure from raw text data. We can see the impact of applying these processing techniques by comparing the performance of a model trained on raw text data with one trained on processed and TF-IDF weighted text data.

Comparing raw features with processed TF-IDF features on the 20 Newsgroups dataset

In this example, we will simply apply the hashing term frequency transformation to the raw text tokens obtained using a simple whitespace splitting of the document text. We will train a model on this data and evaluate the performance on the test set as we did for the model trained with TF-IDF features:

val rawTokens = rdd.map { case (file, text) => text.split(" ") }

val rawTF = rawTokens.map(doc => hashingTF.transform(doc))

val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }

val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)

val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }

val rawZippedTest = testLabels.zip(rawTestTF)

val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }

val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))

val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()

println(rawAccuracy)

val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)

println(rawMetrics.weightedFMeasure)

Perhaps surprisingly, the raw model does quite well, although both accuracy and F-measure are a few percentage points lower than those of the TF-IDF model. This is also partly a reflection of the fact that the naïve Bayes model is well suited to data in the form of raw frequency counts:

0.7661975570897503

0.7628947184990661

Word2Vec models

Until now, we have used a bag-of-words vector, optionally with some weighting scheme such as TF-IDF, to represent the text in a document. Another class of models that has recently become popular represents individual words themselves as vectors.

These are generally based in some way on the co-occurrence statistics between the words in a corpus. Once the vector representation is computed, we can use these vectors in ways similar to how we might use TF-IDF vectors (such as using them as features for other machine learning models). One such common use case is computing the similarity between two words with respect to their meanings, based on their vector representations.

Word2Vec is a specific implementation of one of these models, whose outputs are often referred to as distributed vector representations. The MLlib implementation uses a skip-gram model, which seeks to learn vector representations that take into account the contexts in which words occur.

Note

While a detailed treatment of Word2Vec is beyond the scope of this book, Spark's documentation at http://spark.apache.org/docs/latest/mllib-feature-extraction.html#word2vec contains some further details on the algorithm as well as links to the reference implementation.

One of the main academic papers underlying Word2Vec is Tomas Mikolov, Kai Chen, Greg Corrado, and Jeffrey Dean. Efficient Estimation of Word Representations in Vector Space. In Proceedings of Workshop at ICLR, 2013.

It is available at http://arxiv.org/pdf/1301.3781.pdf.

Another recent model in the area of word vector representations is GloVe at http://www-nlp.stanford.edu/projects/glove/.

Word2Vec on the 20 Newsgroups dataset

Training a Word2Vec model in Spark is relatively simple. We will pass in an RDD where each element is a sequence of terms. We can use the RDD of tokenized documents we have already created as input to the model:

import org.apache.spark.mllib.feature.Word2Vec

val word2vec = new Word2Vec()

word2vec.setSeed(42)

val word2vecModel = word2vec.fit(tokens)

Tip

Note that we used setSeed to set the random seed for model training so that you can see the same results each time the model is trained.

You will see some output similar to the following while the model is being trained:

...

14/10/25 14:21:59 INFO Word2Vec: wordCount = 2133172, alpha = 0.0011868763094487506

14/10/25 14:21:59 INFO Word2Vec: wordCount = 2144172, alpha = 0.0010640806039941193

14/10/25 14:21:59 INFO Word2Vec: wordCount = 2155172, alpha = 9.412848985394907E-4

14/10/25 14:21:59 INFO Word2Vec: wordCount = 2166172, alpha = 8.184891930848592E-4

14/10/25 14:22:00 INFO Word2Vec: wordCount = 2177172, alpha = 6.956934876302307E-4

14/10/25 14:22:00 INFO Word2Vec: wordCount = 2188172, alpha = 5.728977821755993E-4

14/10/25 14:22:00 INFO Word2Vec: wordCount = 2199172, alpha = 4.501020767209707E-4

14/10/25 14:22:00 INFO Word2Vec: wordCount = 2210172, alpha = 3.2730637126634213E-4

14/10/25 14:22:01 INFO Word2Vec: wordCount = 2221172, alpha = 2.0451066581171076E-4

14/10/25 14:22:01 INFO Word2Vec: wordCount = 2232172, alpha = 8.171496035708214E-5

...

14/10/25 14:22:02 INFO SparkContext: Job finished: collect at Word2Vec.scala:368, took 56.585983 s

14/10/25 14:22:02 INFO MappedRDD: Removing RDD 200 from persistence list

14/10/25 14:22:02 INFO BlockManager: Removing RDD 200

14/10/25 14:22:02 INFO BlockManager: Removing block rdd_200_0

14/10/25 14:22:02 INFO MemoryStore: Block rdd_200_0 of size 9008840 dropped from memory (free 1755596828)

word2vecModel: org.apache.spark.mllib.feature.Word2VecModel = org.apache.spark.mllib.feature.Word2VecModel@2b94e480

Once trained, we can easily find the top 20 synonyms for a given term (that is, the terms most similar to the input term, computed by cosine similarity between the word vectors). For example, to find the 20 terms most similar to hockey, use the following lines of code:

word2vecModel.findSynonyms("hockey", 20).foreach(println)

As we can see from the following output, most of the terms relate to hockey or other sports topics:

(sport,0.6828256249427795)

(ecac,0.6718048453330994)

(hispanic,0.6519884467124939)

(glens,0.6447514891624451)

(woofers,0.6351765394210815)

(boxscores,0.6009076237678528)

(tournament,0.6006366014480591)

(champs,0.5957855582237244)

(aargh,0.584071934223175)

(playoff,0.5834275484085083)

(ahl,0.5784651637077332)

(ncaa,0.5680188536643982)

(pool,0.5612311959266663)

(olympic,0.5552600026130676)

(champion,0.5549421310424805)

(filinuk,0.5528956651687622)

(yankees,0.5502706170082092)

(motorcycles,0.5484763979911804)

(calder,0.5481109023094177)

(rec,0.5432182550430298)

As another example, we can find 20 synonyms for the term legislation as follows:

word2vecModel.findSynonyms("legislation", 20).foreach(println)

In this case, we observe that terms related to regulation, politics, and business feature prominently:

(accommodates,0.8149217963218689)

(briefed,0.7582570314407349)

(amended,0.7310371994972229)

(telephony,0.7139414548873901)

(aclu,0.7080780863761902)

(pitted,0.7062571048736572)

(licensee,0.6981208324432373)

(agency,0.6880651712417603)

(policies,0.6828961372375488)

(senate,0.6821110844612122)

(businesses,0.6814320087432861)

(permit,0.6797110438346863)

(cpsr,0.6764014959335327)

(cooperation,0.6733141541481018)

(surveillance,0.6670728325843811)

(restricted,0.6666574478149414)

(congress,0.6661365628242493)

(procure,0.6655452251434326)

(industry,0.6650314927101135)

(inquiry,0.6644254922866821)

Summary

In this chapter, we took a deeper look into more complex text processing and explored MLlib's text feature extraction capabilities, in particular the TF-IDF term weighting scheme. We covered examples of using the resulting TF-IDF feature vectors to compute document similarity and train a newsgroup topic classification model. Finally, you learned how to use MLlib's cutting-edge Word2Vec model to compute a vector representation of words in a corpus of text and use the trained model to find words whose contextual meaning is similar to that of a given word.

In the next chapter, we will take a look at online learning, and you will learn how Spark Streaming relates to online learning models.