Batch file processing in Python typically involves the following steps:
Import the necessary modules:
```python
import os
import glob
from pathlib import Path
```
Set the working directory:
```python
path = os.getcwd()  # get the current working directory
print(path)
```
Change the working directory (if needed):
```python
os.chdir('/ZC/Download/data')  # switch the working directory to the given path
```
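Traverse folders and files:
```python
# os.walk yields (directory path, subdirectory names, file names) for every directory under path
for parent, dirnames, filenames in os.walk(path):
    for filename in filenames:
        print(filename)
```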
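Process the files

Delete specific files or folders:
```python
import shutil

for dirpath, dirnames, filenames in os.walk(path):
    # Matches any directory whose full path contains 'my_result'
    if 'my_result' in dirpath:
        shutil.rmtree(dirpath)
        dirnames[:] = []  # the directory is gone, so do not descend into it
```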
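The substring test above also matches paths such as `my_result_backup`. A minimal sketch of a stricter variant follows, assuming the goal is to delete only directories named exactly `my_result`. The helper name `remove_dirs_named` and the temporary demo tree are hypothetical and used only for illustration.

```python
import os
import shutil
import tempfile

def remove_dirs_named(root, target_name):
    """Delete every directory under root whose name equals target_name."""
    removed = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Iterate over a copy because dirnames is modified inside the loop
        for name in list(dirnames):
            if name == target_name:
                full = os.path.join(dirpath, name)
                shutil.rmtree(full)
                # Drop it from dirnames so os.walk does not try to descend into it
                dirnames.remove(name)
                removed.append(full)
    return removed

# Demonstration on a throwaway directory tree (hypothetical layout)
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, 'a', 'my_result'))
os.makedirs(os.path.join(demo_root, 'a', 'my_result_backup'))
os.makedirs(os.path.join(demo_root, 'b', 'my_result'))

removed_dirs = remove_dirs_named(demo_root, 'my_result')
print(removed_dirs)
```

Because `shutil.rmtree` is irreversible, it may be worth printing the candidate paths first and deleting only after checking the list.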
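Move files into subfolders:
```python
for dirpath, dirnames, filenames in os.walk(path):
    for file in filenames:
        total_path = os.path.join(dirpath, file)
        # Path of the file relative to the starting directory
        file_path = os.path.relpath(total_path, path)
        if file_path.endswith('.png'):
            # Strip the last 9 characters (specific to this naming scheme)
            # and build the target subfolder path
            new_file_path = './' + file_path[:-9] + '/new_file_name/'
            print(new_file_path)  # only prints the target; nothing is moved yet
```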
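The loop above only computes and prints a target path. A minimal sketch of actually moving the files with `shutil.move` is shown here, assuming each `.png` should go into a subfolder next to it. The function name `move_pngs_to_subfolder` and the folder name `png_files` are hypothetical choices, not part of the original naming scheme.

```python
import os
import shutil
import tempfile

def move_pngs_to_subfolder(root, subfolder='png_files'):
    """Move every .png under root into <its directory>/<subfolder>/."""
    moved = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Skip existing target folders so already-moved files are not moved again
        if subfolder in dirnames:
            dirnames.remove(subfolder)
        for file in filenames:
            if file.lower().endswith('.png'):
                target_dir = os.path.join(dirpath, subfolder)
                os.makedirs(target_dir, exist_ok=True)
                target = os.path.join(target_dir, file)
                shutil.move(os.path.join(dirpath, file), target)
                moved.append(target)
    return moved

# Demonstration on a throwaway directory (hypothetical file names)
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, 'run1'))
for name in ('plot.png', 'notes.txt'):
    with open(os.path.join(demo_root, 'run1', name), 'w') as f:
        f.write('demo')

moved_files = move_pngs_to_subfolder(demo_root)
print(moved_files)
```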
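Use `pathlib` for file operations:
```python
from pathlib import Path

p = Path('/Users/liuhuanshuo/Desktop/热搜数据/')
# Path.glob only accepts relative patterns; '**/*.md' searches all subfolders as well
file_list = list(p.glob('**/*.md'))  # collect all .md files
```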
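As a small illustration of what can be done with the resulting `Path` objects, the sketch below lists Markdown files recursively and prints each one's relative path and size. It runs against a temporary directory rather than the path above; the helper `list_markdown_files` and the demo file names are assumptions made for the example.

```python
import tempfile
from pathlib import Path

def list_markdown_files(folder):
    """Return all .md files under folder (recursively), sorted by path."""
    return sorted(Path(folder).rglob('*.md'))

# Build a throwaway directory with two Markdown files and one other file
demo_dir = Path(tempfile.mkdtemp())
(demo_dir / 'sub').mkdir()
(demo_dir / 'a.md').write_text('# A', encoding='utf-8')
(demo_dir / 'sub' / 'b.md').write_text('# B', encoding='utf-8')
(demo_dir / 'c.txt').write_text('not markdown', encoding='utf-8')

md_files = list_markdown_files(demo_dir)
for md in md_files:
    # relative_to gives the path below demo_dir; stat().st_size is the size in bytes
    print(md.relative_to(demo_dir), md.stat().st_size)
```

`rglob('*.md')` is equivalent to `glob('**/*.md')`; use plain `glob('*.md')` when only the top-level folder should be searched.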
Configure logging (optional):
```python
import logging
import os

current_file = os.path.abspath(__file__)
log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
# Log INFO and above to the console
logging.basicConfig(format=log_format, level=logging.INFO)
# Also write the same messages to app.log
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)
```
Asynchronous batch processing (optional):
```python
import asyncio

DEFAULT_MAX_CONCURRENT_TASKS = 2
MAX_RETRIES = 3

class AsyncBatchProcessor:
    def __init__(self, max_concurrent_tasks=DEFAULT_MAX_CONCURRENT_TASKS, max_retries=MAX_RETRIES):
        self.max_concurrent_tasks = max_concurrent_tasks
        self.max_retries = max_retries

    async def process_file(self, file_path):
        # Asynchronous file-processing logic goes here
        pass

    async def run(self, file_paths):
        # The semaphore caps how many files are processed at the same time
        semaphore = asyncio.Semaphore(self.max_concurrent_tasks)

        async def limited(file_path):
            async with semaphore:
                return await self.process_file(file_path)

        tasks = [limited(file_path) for file_path in file_paths]
        await asyncio.gather(*tasks)
```
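The class above stores `max_retries` but never uses it. One possible way to combine the concurrency limit with retries is sketched below, assuming a failed file should simply be attempted again until the attempt limit is reached. The names `process_with_retries`, `run_batch`, and `flaky_process` are hypothetical, and the simulated failure exists only to exercise the retry path.

```python
import asyncio

async def process_with_retries(process, file_path, max_retries=3, delay=0.0):
    """Await process(file_path), making at most max_retries attempts in total."""
    for attempt in range(1, max_retries + 1):
        try:
            return await process(file_path)
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts: propagate the last error
            await asyncio.sleep(delay)

async def run_batch(process, file_paths, max_concurrent_tasks=2, max_retries=3):
    # The semaphore caps how many files are in flight at once
    semaphore = asyncio.Semaphore(max_concurrent_tasks)

    async def limited(file_path):
        async with semaphore:
            return await process_with_retries(process, file_path, max_retries)

    # gather returns results in the same order as file_paths
    return await asyncio.gather(*(limited(p) for p in file_paths))

# Demo: a processor that fails on the first attempt for every file
attempts = {}

async def flaky_process(file_path):
    attempts[file_path] = attempts.get(file_path, 0) + 1
    if attempts[file_path] < 2:
        raise RuntimeError('simulated failure')
    return file_path.upper()

results = asyncio.run(run_batch(flaky_process, ['a.txt', 'b.txt', 'c.txt']))
print(results)
```

In real use a non-zero `delay`, or a delay that grows with each attempt, is usually preferable so that a struggling resource is not retried immediately.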
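Run the batch job:
```python
import asyncio
import glob

from batch import AsyncBatchProcessor  # assumes AsyncBatchProcessor is defined in batch.py

processor = AsyncBatchProcessor()
# '**' with recursive=True also matches .txt files in subfolders
file_paths = glob.glob('/ZC/Download/data/**/*.txt', recursive=True)
# run() is a coroutine, so it has to be driven by an event loop
asyncio.run(processor.run(file_paths))
```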
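These steps cover the basics of batch processing in Python: traversing folders, processing files, configuring logging, and running work asynchronously. Adjust them to fit your own requirements.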