Список всех файлов в Hadoop:
! hdfs dfs -du -h
0 .Trash 5.1 K 201408_station_data.csv 622.8 M 201408_status_data.csv 19.7 M 201408_trip_data.csv 78.4 K 201408_weather_data.csv 160.0 M NASA_access_log_Aug95 2.2 K README.txt 5.1 K test.csv
Конвертируем список в Python лист:
import sh
hdfsDir = "/user/sergey/"
fileList = [ 'hdfs://'+line.split(" ")[-1] for line in sh.hdfs('dfs','-ls',hdfsDir).split('\n') if len(line.split(" ")[-1])][2:]
fileList
['hdfs:///user/sergey/201408_station_data.csv', 'hdfs:///user/sergey/201408_status_data.csv', 'hdfs:///user/sergey/201408_trip_data.csv', 'hdfs:///user/sergey/201408_weather_data.csv', 'hdfs:///user/sergey/NASA_access_log_Aug95', 'hdfs:///user/sergey/README.txt', 'hdfs:///user/sergey/test.csv']
Т.к. данный Jupyter notebook
запущен с Spark kernel
, sc (Spark context)
для нас уже подготовлен:
sc.version
'2.0.0'
Подготовим regex
выражение для поиска:
import re
string = "8/5/2013"
regex = re.compile(string)
Загрузим все файлы из интересующей нас директории в Spark
последовательно друг за другом и поищем в каждом их них интересующее выражение string
:
lst = []
for file in fileList:
rdd = sc.textFile(file)
rddFound = rdd.filter(lambda x: bool(re.search(regex, x)))
lst.append((file, rddFound.count()))
lst
[('hdfs:///user/sergey/201408_station_data.csv', 5), ('hdfs:///user/sergey/201408_status_data.csv', 0), ('hdfs:///user/sergey/201408_trip_data.csv', 0), ('hdfs:///user/sergey/201408_weather_data.csv', 0), ('hdfs:///user/sergey/NASA_access_log_Aug95', 0), ('hdfs:///user/sergey/README.txt', 0), ('hdfs:///user/sergey/test.csv', 5)]
Write a comment: