目录
这里使用的是 pip 安装,很方便:
pip install hdfs
from hdfs.client import Client client = Client("http://LocalHost:Port") client.makedirs('/ml/zmingmingmng')#建立文件夹 client.delete('/ml/zmming')#删除文件夹 client.upload("/ml/zmingmingmng/zm.txt","E:/ttt/testhdfs.txt")#上传文件 client.download("/ml/zmingmingmng/zm.txt","E:/ming.txt")#下载文件
# -*- encoding=utf-8 -*- from hdfs.client import Client client = Client("http://XXX.XXX.XX.XX:50070") # 创建目录 def mkdirs(client, hdfs_path): client.makedirs(hdfs_path) # 删除hdfs文件 def delete_hdfs_file(client, hdfs_path): client.delete(hdfs_path) # 上传文件到hdfs def put_to_hdfs(client, local_path, hdfs_path): client.upload(hdfs_path, local_path, cleanup=True) # 从hdfs获取文件到本地 def get_from_hdfs(client, hdfs_path, local_path): client.download(hdfs_path, local_path, overwrite=False) # 追加数据到hdfs文件 def append_to_hdfs(client, hdfs_path, data): client.write(hdfs_path, data, overwrite=False, append=True) # 覆盖数据写到hdfs文件 def write_to_hdfs(client, hdfs_path, data): client.write(hdfs_path, data, overwrite=True, append=False) # 移动或者修改文件 def move_or_rename(client, hdfs_src_path, hdfs_dst_path): client.rename(hdfs_src_path, hdfs_dst_path) # 返回目录下的文件 def list(client, hdfs_path): return client.list(hdfs_path, status=False) if __name__ == '__main__': # 调用 kk=list(client,"/user/admin/deploy/user_lable_dimension/") for each in kk: print(each)
1 往hdfs上传文件
from hdfs.client import Client """往hdfs上传文件""" # TODO 往hdfs上传文件 client = Client("http://XXX.XXX.XX.XX:50070") # 新建文件夹 hdfs_path ="【文件要存放的目录路径,eg:/a/b/c】" client.makedirs(hdfs_path) print("uploading data...") client.upload(hdfs_path, "intersection.xlsx", overwrite=True) # 资源中心上传的文件
2 处理并存储到hdfs
# TODO 先得到结果列表。eg:i_list # TODO 把结果列表存储成文件上传到hdfs print("===============================================") i_df = pd.DataFrame(i_list) client = Client("http://XXX.XXX.XX.XX:50070") fout = "【文件要存放的路径,eg:/a/b/c.csv】" # hdfs下的目录 with client.write(fout, encoding='utf-8') as writer: i_df.to_csv(writer) print("存储成功")
3 读取hdfs上的txt文件
from hdfs.client import Client import json from kafka import KafkaConsumer import time import pyhdfs def GetEncodingSheme(_filename): """ 查看文本编码方式 """ with open(_filename, 'rb') as file: buf = file.read() result = chardet.detect(buf) return result['encoding'] def read_hdfs_file(client, filename): """读取hdfs文件内容,将每行存入数组返回""" lines = [] print("开始读取txt数据") with client.open(filename, delimiter='\n') as reader: for line in reader: lines.append(line.decode("GB2312").strip()) return lines def deleteHDFSfile(client, hdfs_path): """删除hdfs文件,删除文件夹时该文件夹必须为空""" client.delete(hdfs_path) if __name__ == "__main__": print(GetEncodingSheme('intersection.xlsx')) # GB2312 # hdfs连接 client = pyhdfs.HdfsClient(hosts="http://xxxxxx:50070,http://xxxxxx:50070", user_name="xxxxxx") # TODO 读取hdfs文件内容,将每行存入数组返回 hdfs_path = "【文件路径,eg:/a/b/c.xlsx】" # hdfs存储目录 print("===============================================") print("开始读取hdfs上的txt文件") lines = read_hdfs_file(client, hdfs_path) print(lines) print("读取完成") print("===============================================") # TODO 删除hdfs存储目录下的文件 hdfs_path = "【文件路径】" deleteHDFSfile(client, hdfs_path)
热门文章
- Python- *args 的用法
- vue的axios两种导入方法
- 12月12日→18.9M/S|2024年每天更新免费节点Mihomo Node订阅链接地址
- 佛山产瓷砖的十大品牌分别是哪些(佛山瓷砖品牌一览表)
- 1月21日→21.9M/S|2025年每天更新免费节点Mihomo Node订阅链接地址
- R 数据可视化: PCA 主成分分析图
- 如果被野猫抓伤了只是流了一点点血会怎样?(被野猫抓出一点点血)
- 1月15日→19.3M/S|2025年每天更新免费节点Mihomo Node订阅链接地址
- 宠物粮食加工厂设备生产厂家有哪些呢视频(宠物粮食加工厂设备生产厂家有哪些呢视频介绍)
- 详解Dockerfile中的copy命令