我需要锁定一个文件写在Python。它将被多个Python进程同时访问。我在网上找到了一些解决方案,但大多数都无法达到我的目的,因为它们通常只是基于Unix或Windows的。
当前回答
如果你只需要Mac/POSIX,这应该在没有外部包的情况下工作。
import sys
import stat
import os
filePath = "<PATH TO FILE>"
if sys.platform == 'darwin':
flags = os.stat(filePath).st_flags
if flags & ~stat.UF_IMMUTABLE:
os.chflags(filePath, flags & stat.UF_IMMUTABLE)
如果你想解锁一个文件,只要修改一下,
if flags & stat.UF_IMMUTABLE:
os.chflags(filePath, flags & ~stat.UF_IMMUTABLE)
其他回答
这里有一个跨平台的文件锁定模块:Portalocker
尽管正如Kevin所说,从多个进程同时写入文件是您希望尽可能避免的事情。
如果可以将问题硬塞到数据库中,则可以使用SQLite。它支持并发访问并处理自己的锁定。
在操作系统级别协调对单个文件的访问充满了您可能不想解决的各种问题。
最好的办法是有一个单独的进程来协调对该文件的读写访问。
我曾经处理过这样的情况,我在同一个目录/文件夹中运行同一个程序的多个副本,并记录了错误。我的方法是在打开日志文件之前向磁盘写入一个“锁定文件”。程序在继续之前检查是否存在“锁文件”,如果“锁文件”存在,则等待轮到它。
代码如下:
def errlogger(error):
while True:
if not exists('errloglock'):
lock = open('errloglock', 'w')
if exists('errorlog'): log = open('errorlog', 'a')
else: log = open('errorlog', 'w')
log.write(str(datetime.utcnow())[0:-7] + ' ' + error + '\n')
log.close()
remove('errloglock')
return
else:
check = stat('errloglock')
if time() - check.st_ctime > 0.01: remove('errloglock')
print('waiting my turn')
编辑—— 在考虑了上面关于过期锁的一些评论之后,我编辑了代码,添加了一个检查“锁文件”是否过期的检查。在我的系统上计算这个函数的几千次迭代,得到的平均值为0.002066…几秒钟前:
lock = open('errloglock', 'w')
到之后:
remove('errloglock')
所以我想我将从5倍的量开始,以表示过时并监控问题的情况。
此外,当我在处理计时时,我意识到我有一些并不真正必要的代码:
lock.close()
这是我在公开声明之后立即得到的,所以我在这次编辑中删除了它。
我一直在寻找几种解决方案,我的选择是 oslo.concurrency
它功能强大,文档也相对完善。它是基于紧固件的。
其他的解决方案:
Portalocker:需要pywin32,这是一个exe安装,所以不可能通过pip 紧固件:缺乏记录 lockfile:弃用 flufl。lock:用于POSIX系统的nfs安全文件锁定。 simpleflock:上次更新2013-07 佐。lockfile:上一次更新2016-06(截至2017-03) lock_file: 2007-10年最后一次更新
这招对我很管用: 不占用大文件,分布在几个小文件中 创建文件Temp,删除文件A,然后将文件Temp重命名为A。
import os
import json
def Server():
i = 0
while i == 0:
try:
with open(File_Temp, "w") as file:
json.dump(DATA, file, indent=2)
if os.path.exists(File_A):
os.remove(File_A)
os.rename(File_Temp, File_A)
i = 1
except OSError as e:
print ("file locked: " ,str(e))
time.sleep(1)
def Clients():
i = 0
while i == 0:
try:
if os.path.exists(File_A):
with open(File_A,"r") as file:
DATA_Temp = file.read()
DATA = json.loads(DATA_Temp)
i = 1
except OSError as e:
print (str(e))
time.sleep(1)
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录