pwd()
'/home/sergey/myvagrant'

Spark позволяет обрабатывать большие объемы рапределенных данных, которые хранятся в RAM на Hadoop фермах.

В данном примере я покажу, каким образом можно применить данную технологию для подсчета частоты употребления слов в англоязычной версии «Война и Мир» Толстого. Логическим продолжением этого упражнения может быть решение таких практических задач как:

  • recommender systems (collaborative filtering or clustering)
  • spam/ham filtering
  • анализ настроений (новости, Твиттер)

Общая логика решения задачи:

  • Шаг 1. Инициализация RDD и загрузка данных в Hadoop
  • Шаг 2. Токенизация
  • Шаг 3. Подсчет частоты

Шаг 1. Инициализация RDD.

В качестве интерфейса к Spark будем использовать библиотеку Python pySpark

from pyspark import SparkContext
sc = SparkContext()

Затем, создадим RDD c 2-мя workers и загрузим туда текстовый файл:

pwRDD = sc.textFile('peace_and_war.txt',2)
pwRDD.take(5)
['WAR AND PEACE', '', 'By Leo Tolstoy/Tolstoi', '', 'CONTENTS']

Шаг 2. Токенизация.

Как мы видим каждая строка файла представлена массивом текстовых переменных. Для того, чтобы слова можно было посчитать, тестовые переменные необходимо:

  • привести к нижнему регистру
  • избавиться от пунктуации
  • разбить текстовые переменные на отдельные слова

Для этого определим функцию tokenize():

import re
def tokenize(line):
    line = line.lower()
    line = re.sub('[^a-z0-9\s]', ' ', line)
    tokens = line.split()
    return tokens

Токенизируем pwRDD при помощи полученной функции и map(lambda x: (x,1)), которая превратит отдельные слова в tuples вида (слово, 1)

pwWordsRDD = pwRDD.flatMap(lambda x: tokenize(x)).map(lambda x: (x,1))
pwWordsRDD.take(5)
[('war', 1), ('and', 1), ('peace', 1), ('by', 1), ('leo', 1)]

Шаг 3. Вычисление частоты

Сначала рассчитаем общее число употреблений каждого слова. reduceByKey() является предпочтительным способом, т.к. меньше нагружает вычислительную сеть:

pairsCountedRDD = pwWordsRDD.reduceByKey(lambda a,b: a+b)
pairsCountedRDD.take(5)
[('reinstated', 2),
 ('reminiscence', 1),
 ('repelling', 1),
 ('incitement', 1),
 ('circle', 53)]

Затем, вычислим частоту:

total = pwWordsRDD.count()
pairsFrequencyRDD = pairsCountedRDD.map(lambda x: (x[0],x[1]/total))
pairsFrequencyRDD.take(5)
[('reinstated', 3.4878570257648e-06),
 ('reminiscence', 1.7439285128824e-06),
 ('repelling', 1.7439285128824e-06),
 ('incitement', 1.7439285128824e-06),
 ('circle', 9.24282111827672e-05)]

И покажем наиболее употребимые слова:

pairsFrequencyRDD.top(10, key=lambda x: x[1])
[('the', 0.06024401047752251),
 ('and', 0.038762299055837106),
 ('to', 0.029080007952314017),
 ('of', 0.025965351628306054),
 ('a', 0.018398445810909318),
 ('he', 0.017441029057336882),
 ('in', 0.01565873411717107),
 ('that', 0.014282774520506856),
 ('his', 0.01392352524685308),
 ('was', 0.012833569926301582)]

В качестве альтернативного варианта, который может зависеть от характера решаемой задачи, можно определить stoplist со словами, частоту которых мы не хотели бы считать, и исключить эти слова на этапе токенизации.

Write a comment:

*

Your email address will not be published.

© 2014 In R we trust.
Top
Follow us: