Python parallel thread that consume Watchdog queue events(消耗看门狗队列事件的 Python 并行线程)
问题描述
每当外部程序 (TCPdump) 在我的目录中创建 *.pcap 文件时,我有这段代码应该将事件放入队列中.我的问题是我总是得到一个空队列,尽管我从 process() 函数得到了打印.
I have this code that should put an event in a queue each time an external program (TCPdump) creates a *.pcap file in my directory. My problem is that I always get an empty queue, although I got the print from process() function.
我做错了什么?队列是否正确定义并在两个类之间共享?
What am I doing wrong? Is the queue correctly defined and shared between the two classes?
编辑-----------------
我可能明白为什么我有一个空队列,我认为这是因为我正在打印我在 Handler 类填充之前初始化的队列.我修改了我的代码并创建了两个应该使用同一个队列的进程,但现在执行卡在 queue.put() 并且线程 ReadPcapFiles() 停止运行.
EDIT-----------------
I maybe understood why I got an empty queue, I think it is because I'm printing the queue that I initialized before it gets filled by Handler class.
I modified my code and created two processes that should consume the same queue, but now the execution stuck on queue.put() and the thread ReadPcapFiles() stop running.
这里是更新的代码:
import time
import pyshark
import concurrent.futures
import threading
import logging
from queue import Queue
from multiprocessing import Process
from watchdog.observers import Observer, api
from watchdog.events import PatternMatchingEventHandler
class Handler(PatternMatchingEventHandler):
patterns = ["*.pcap", "*.pcapng"]
def __init__(self, queue):
PatternMatchingEventHandler.__init__(self)
self.queue = queue
def process(self, event):
#print(f'event type: {event.event_type} path : {event.src_path}')
self.queue.put(event.src_path)
logging.info(f"Storing message: {self.queue.qsize()}")
print("Producer queue: ", list(self.queue.queue))
#self.queue.get()
def on_created(self, event):
self.process(event)
def StartWatcher(watchdogq, event):
path = 'C:\...'
handler = Handler(watchdogq)
observer = Observer()
while not event.is_set():
observer.schedule(handler, path, recursive=False)
print("About to start observer")
observer.start()
try:
while True:
time.sleep(1)
except Exception as error:
observer.stop()
print("Error: " + str(error))
observer.join()
def ReadPcapFiles(consumerq, event):
while not event.is_set() or not consumerq.empty():
print("Consumer queue: ", consumerq.get())
#print("Consumer queue: ", list(consumerq.queue))
# pcapfile = pyshark.FileCapture(self.queue.get())
# for packet in pcapfile:
# countPacket +=1
if __name__ == '__main__':
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
logging.getLogger().setLevel(logging.DEBUG)
queue = Queue()
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(StartWatcher,queue, event)
executor.submit(ReadPcapFiles,queue, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
旧代码:
import time
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
class Handler(PatternMatchingEventHandler):
patterns = ["*.pcap", "*.pcapng"]
def __init__(self, queue):
PatternMatchingEventHandler.__init__(self)
self.queue = queue
def process(self, event):
print(f'event type: {event.event_type} path : {event.src_path}')
self.queue.put(event.src_path)
def on_created(self, event):
self.process(event)
class Watcher():
def __init__(self, path):
self.queue = Queue()
self.observer = Observer()
self.handler = Handler(self.queue)
self.path = path
def start(self):
self.observer.schedule(self.handler, self.path, recursive=True)
self.observer.start()
try:
while True:
time.sleep(1)
self.queue.get()
print(list(self.queue.queue))
except Exception as error:
self.observer.stop()
print("Error: " + str(error))
self.observer.join()
if __name__ == '__main__':
watcher = Watcher('C:\...')
watcher.start()
推荐答案
这对我有用(我从 this回答,谢谢!)但请注意,我认为这是一种解决方法,所以如果有人对此有更好的解决方案或者可以更好地解释 Python 中这种行为的原因,请不要犹豫回答!
This is working for me (I got the main idea from this answer, thanks!) but notice that I consider this a workaround, so if someone has a better solution to this or can better explain the reason of such behavior in Python, please do not hesitate to answer!
我的猜测是我有两个主要问题:
- 我正在另一个线程中启动看门狗进程(这以某种方式阻塞了我的队列消耗线程).
- Python 线程确实不能真正并行工作,因此需要启动一个独立的进程.
My guess is that I had two main problems:
- I was starting Watchdog process inside another thread (and that was blocking somehow my queue consuming thread).
- Python threading does not work really in parallel and therefore starting an independent process was necessary.
这是我的代码:
import time
import pyshark
import threading
import logging
import os
from queue import Queue
from multiprocessing import Process, Pool
from watchdog.observers import Observer, api
from watchdog.events import PatternMatchingEventHandler
from concurrent.futures import ThreadPoolExecutor
class Handler(PatternMatchingEventHandler):
patterns = ["*.pcap", "*.pcapng"]
def __init__(self, queue):
PatternMatchingEventHandler.__init__(self)
self.queue = queue
def process(self, event):
self.queue.put(event.src_path)
logging.info(f"Storing message: {self.queue.qsize()}")
print("Producer queue: ", list(self.queue.queue))
def on_created(self, event):
#wait that the transfer of the file is finished before processing it
file_size = -1
while file_size != os.path.getsize(event.src_path):
file_size = os.path.getsize(event.src_path)
time.sleep(1)
self.process(event)
def ConsumeQueue(consumerq):
while True:
if not consumerq.empty():
pool = Pool()
pool.apply_async(ReadPcapFiles, (consumerq.get(), ))
else:
time.sleep(1)
def ReadPcapFiles(get_event):
createdFile = get_event
print(f"This is my event in ReadPacapFile {createdFile}")
countPacket = 0
bandwidth = 0
pcapfile = pyshark.FileCapture(createdFile)
for packet in pcapfile:
countPacket +=1
bandwidth = bandwidth + int(packet.length)
print(f"Packet nr {countPacket}")
print(f"Byte per second {bandwidth}")
if __name__ == '__main__':
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
logging.getLogger().setLevel(logging.DEBUG)
queue = Queue()
path = 'C:\...'
worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
print("About to start worker")
worker.start()
event_handler = Handler(queue)
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
print("About to start observer")
observer.start()
try:
while True:
time.sleep(1)
except Exception as error:
observer.stop()
print("Error: " + str(error))
observer.join()
这篇关于消耗看门狗队列事件的 Python 并行线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:消耗看门狗队列事件的 Python 并行线程
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01