Файловая система HDFS (Hadoop Distributed File System) существует независимо от файловой системы хоста, работающего как правило, на базе *nix ОС. Тем не менее, набор команд для управления HDFS очень похож на стандартный набор командной строки *nix OS, регламентируемый POSIX стандартом ( ls, mkdir, rm, cp, du, df ) и может быть сгруппирован следующим образом:

  • Просмотр файловой структуры:
    • hadoop fs -ls , hadoop fs -du, hadoop fs -df
  • Копирование файлов:
    • hadoop fs -cp , hadoop fs -get, hadoop fs -put и их аналоги hadoop fs -copyFromLocal , hadoop fs -copyToLocal
  • Создание директорий:
    • hadoop fs -mkdir
  • Удаление файлов и директорий:
    • hadoop fs -rm, hadoop fs -rm -R, hadoop fs -rmdir
  • Просмотр содержимого файла:
    • hadoop fs -cat , hadoop fs -tail
  • Вспомогательные команды:
    • hadoop fs -expunge — команда для очистки Корзины Hadoop хранилища
    • hdfs fsck — команды для проверки целостности файловой системы
    • hadoop fs -count — количество файлов, директорий и объем хранимой информации
  • Помощь по командам:
    • hadoop fs -help

Загрузка данных в Hadoop

Запустим Hadoop кластер

! start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/sergey/hadoop/logs/hadoop-sergey-namenode-sergey-U50Vg.out
localhost: starting datanode, logging to /home/sergey/hadoop/logs/hadoop-sergey-datanode-sergey-U50Vg.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/sergey/hadoop/logs/hadoop-sergey-secondarynamenode-sergey-U50Vg.out
Какие диски монтированы в системе HDFS:
! hadoop fs -du -h /
897.5 K  /tmp
0        /user
Запишем данные из локальной файловой системы в Hadoop хранилище:
! hadoop fs -put ./data/201408_status_data.csv /user/sergey/
Проверим содержимое HDFS директории /user/sergey/:
! hadoop fs -ls -h /user/sergey/
Found 2 items
drwxr-xr-x   - sergey supergroup          0 2016-08-30 14:23 /user/sergey/.Trash
-rw-r--r--   1 sergey supergroup    622.8 M 2016-09-02 19:48 /user/sergey/201408_status_data.csv

Чтение данных из HDFS в Spark

При загрузке данного Jupyter notebook c ядром pySpark (Spark 2.0) spark context создается автоматически:
sc.version
'2.0.0'
Для чтения данных .csv формата создадим HiveContext:
from pyspark import HiveContext
sqlContext = HiveContext(sc)
и создадим схему:
from pyspark.sql.types import *
! hadoop fs -cat /user/sergey/201408_status_data.csv | head -n5
"station_id","bikes_available","docks_available","time"
"2","12","15","2014-03-01 00:00:02"
"2","12","15","2014-03-01 00:01:03"
"2","12","15","2014-03-01 00:02:03"
"2","12","15","2014-03-01 00:03:02"
cat: Unable to write to output stream.
customSchema = StructType([
        StructField('station_id', IntegerType(), True),
        StructField('bikes_available', IntegerType(), True),
        StructField('docks_available', IntegerType(), True),
        StructField('AT',TimestampType(), True),
    ])
Прочитаем данные:
df = sqlContext.read.csv("hdfs:/user/sergey/201408_status_data.csv",
                         header=True,
                         schema=customSchema)
df.printSchema()
root
 |-- station_id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- AT: timestamp (nullable = true)

Просмотрим первые 5 рядов данных, загруженных в DataFrame:
df.show(5, truncate = False)
+----------+---------------+---------------+---------------------+
|station_id|bikes_available|docks_available|AT                   |
+----------+---------------+---------------+---------------------+
|2         |12             |15             |2014-03-01 00:00:02.0|
|2         |12             |15             |2014-03-01 00:01:03.0|
|2         |12             |15             |2014-03-01 00:02:03.0|
|2         |12             |15             |2014-03-01 00:03:02.0|
|2         |12             |15             |2014-03-01 00:04:03.0|
+----------+---------------+---------------+---------------------+
only showing top 5 rows

Количество строк:
df.count()
18342211
! wc -l ./data/201408_status_data.csv
18342211 ./data/201408_status_data.csv
Все ли поля прочитаны верно?
badDF = df.filter( df['station_id'].isNull() |
                   df['bikes_available'].isNull() |
                   df['docks_available'].isNull()
                   
                 )
Write a comment:

*

Your email address will not be published.

© 2014 In R we trust.
Top
Follow us: