Closed comsyspro closed 7 months ago
@comsyspro :
Please find "my current version" of the filewatcher.py file here, working with watchdog version 2.2.1 (watchdog==2.2.1) - I will make an update to the documentation, but this should help for the time being:
`
import atexit
import logging
import os
import socket
import sys
import time
from cachetools import TTLCache
from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import close_old_connections
from django.utils.translation import gettext as _
from pyas2 import settings
from pyas2.models import Organization, Partner
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS
# Logger used by every component of the file watcher daemon.
logger = logging.getLogger("filewatcher")
# Local TCP port the daemon binds to; a failed bind is treated as "an instance is already running".
DAEMONPORT = 16388
# While True, watchdog events are ignored (set for the duration of the bulk clean-out).
PAUSED = False
# File paths that were recently picked up for sending; used to skip duplicate events for the same path.
CACHE = TTLCache(maxsize=2048, ttl=1200)
class FileWatchHandle(PatternMatchingEventHandler):
    """
    Watchdog event handler that queues files of one outbox directory for sending.

    Directories are ignored. No patterns are defined, so any file that is created
    or modified in the watched directory is queued.

    :param tasks: shared collection (a ``set`` in the daemon) that receives
        ``(organization, partner, path)`` tuples.
    :param dir_watch: dict describing the watched outbox; the keys
        ``"organization"`` and ``"partner"`` are read here.
    """

    def __init__(self, tasks, dir_watch):
        super().__init__(ignore_directories=True)
        self.tasks = tasks
        self.dir_watch = dir_watch

    def handle_event(self, event):
        """Queue the file behind ``event`` unless the watcher is currently paused."""
        # PAUSED is only read here, so no ``global`` declaration is needed.
        if PAUSED:
            return
        self.tasks.add(
            (
                self.dir_watch["organization"],
                self.dir_watch["partner"],
                event.src_path,
            )
        )
        logger.info(f' "{event.src_path}" created. Adding to Task Queue.')

    def on_modified(self, event):
        """Treat a modified file like a new one; duplicates collapse in the task set."""
        self.handle_event(event)

    def on_created(self, event):
        """Queue a newly created file."""
        self.handle_event(event)
class WatchdogObserversManager:
    """
    Creates and manages a list of watchdog observers, one per watched directory.

    All observers share the same settings. Subdirectories are not searched.

    :param is_daemon: value assigned to the ``daemon`` attribute of every observer.
    :param force_vfs: if the underlying filesystem is a network share, OS events
        cannot be used reliably. When True, a polling observer is used instead,
        which is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.observers = []
        self.is_daemon = is_daemon
        self.force_vfs = force_vfs

    def add_observer(self, tasks, dir_watch):
        """
        Create, start and register an observer for ``dir_watch["path"]``.

        :param tasks: task collection handed to the ``FileWatchHandle``.
        :param dir_watch: dict with at least the key ``"path"``; it is passed
            on unchanged to the ``FileWatchHandle``.
        """
        if self.force_vfs:
            new_observer = PollingObserverVFS(stat=os.stat, listdir=os.scandir)
        else:
            new_observer = Observer()
        new_observer.daemon = self.is_daemon
        new_observer.schedule(
            FileWatchHandle(tasks, dir_watch), dir_watch["path"], recursive=False
        )
        new_observer.start()
        self.observers.append(new_observer)

    def stop_all(self):
        """Call ``stop()`` on every registered observer."""
        for observer in self.observers:
            observer.stop()

    def join_all(self):
        """Call ``join()`` on every registered observer."""
        for observer in self.observers:
            observer.join()
class Command(BaseCommand):
    """Management command running the daemon that sends files dropped into the outboxes."""

    help = _(
        "Daemon process that watches the outbox of all as2 partners and "
        "triggers sendmessage when files become available"
    )

    @staticmethod
    def send_message(organization, partner, filepath):
        """
        Send ``filepath`` via the ``sendas2message`` command, at most once per cache period.

        The path is recorded in ``CACHE`` first so that repeated events for the same
        file are skipped. The file size is then probed until it exceeds 10 bytes,
        to give the writer time to finish.

        :param organization: AS2 name of the sending organization.
        :param partner: AS2 name of the receiving partner.
        :param filepath: full path of the file to send.
        """
        max_attempts = 1
        filesize_probe_max = 10

        if filepath in CACHE:
            logger.info(f' "{filepath}" already in cache, skipping.')
            return
        CACHE[filepath] = None

        for probe in range(1, filesize_probe_max + 1):
            try:
                filesize = os.path.getsize(filepath)
            except OSError as err:
                # The file vanished (or became unreadable) between the event and now.
                # Nothing was sent, so forget the path instead of stopping the daemon.
                logger.info(
                    f' "{filepath}" is no longer accessible, skipping. '
                    f"Exception detail: {err}"
                )
                CACHE.pop(filepath, None)
                return
            if filesize > 10:
                # give os time to finish writing if not done already
                time.sleep(1)
                break
            if probe >= filesize_probe_max:
                logger.info(
                    _(
                        f"Max attempts reached {filesize_probe_max}, giving up. "
                        f"Filesize stayed below 10 bytes for {filepath}. Leave it for bulk cleanup to handle."
                    )
                )
                # Drop the entry so the periodic clean-out can pick the file up again.
                CACHE.pop(filepath, None)
                return
            time.sleep(1)

        attempt = 1
        while attempt <= max_attempts:
            try:
                call_command(
                    "sendas2message", organization, partner, filepath, delete=True
                )
                if attempt > 1:
                    logger.info(_(f"Successfully retried on attempt {attempt}"))
                break
            # TODO: Retrying should only be considered when neither the retry of the AS2 server, nor the cleanup
            #  job would be picking up the file (as an AS2 message ID was already created and it might cause
            #  duplicate submission or wrong async responses). The cases where a retry should be done from here
            #  are currently not clear/known.
            except Exception as e:
                if attempt >= max_attempts:
                    logger.info(
                        _(
                            f"Max attempts reached {max_attempts}, giving up. "
                            f"Exception detail: {e}"
                        )
                    )
                    close_old_connections()
                else:
                    logger.info(
                        _(
                            f"Hit exception on attempt {attempt}/{max_attempts}. "
                            f"Retrying in 5 seconds. Exception detail: {e}"
                        )
                    )
                    # https://developpaper.com/django-database-connection-loss-problem/
                    close_old_connections()
                    time.sleep(5)
            attempt += 1

    def clean_out(self, dir_watch_data):
        """
        Send every file currently present in the watched outbox directories.

        Watchdog events are paused while this runs and are re-enabled afterwards,
        even if sending raises.

        :param dir_watch_data: list of dicts with the keys ``"path"``,
            ``"organization"`` and ``"partner"``.
        """
        global PAUSED
        PAUSED = True
        try:
            for dir_watch in dir_watch_data:
                files = [
                    f
                    for f in os.listdir(dir_watch["path"])
                    if os.path.isfile(os.path.join(dir_watch["path"], f))
                ]
                for file in files:
                    logger.info(
                        f"Send as2 message '{file}' "
                        f"from '{dir_watch['organization']}' "
                        f"to '{dir_watch['partner']}'"
                    )
                    self.send_message(
                        dir_watch["organization"],
                        dir_watch["partner"],
                        os.path.join(dir_watch["path"], file),
                    )
        finally:
            # Always resume event handling; otherwise new files would be ignored forever.
            PAUSED = False

    def handle(self, *args, **options):
        """
        Run the daemon: send leftover files, then watch all outboxes and send new files.

        The process exits after 24 hours or on any error/interrupt in the main loop.

        :raises CommandError: if the daemon port is already bound.
        """
        logger.info(_("Starting PYAS2 send Watchdog daemon."))
        # Binding a fixed local port serves as a single-instance lock.
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(("127.0.0.1", DAEMONPORT))
        except socket.error as err:
            engine_socket.close()
            raise CommandError(
                _("An instance of the send daemon is already running")
            ) from err
        else:
            atexit.register(engine_socket.close)

        tasks = set()

        # initialize the list containing the outbox directories
        dir_watch_data = []

        # build the paths for partners and organization and attach them to dir_watch_data
        for partner in Partner.objects.all():
            for org in Organization.objects.all():
                outbox_folder = os.path.join(
                    settings.DATA_DIR,
                    "messages",
                    partner.as2_name,
                    "outbox",
                    org.as2_name,
                )
                if not os.path.isdir(outbox_folder):
                    os.makedirs(outbox_folder)
                dir_watch_data.append(
                    {
                        "path": outbox_folder,
                        "organization": org.as2_name,
                        "partner": partner.as2_name,
                    }
                )

        if not dir_watch_data:
            logger.error(_("No partners have been configured!"))
            sys.exit(0)

        logger.info(_("Process existing files in the directory."))

        # process any leftover files in the directories
        self.clean_out(dir_watch_data)

        # Add WatchDog Thread Here
        logger.info(_("PYAS2 send Watchdog daemon started."))
        watchdog_file_observers = WatchdogObserversManager(
            is_daemon=True, force_vfs=True
        )
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)

        try:
            logger.info(_("Watchdog awaiting tasks..."))
            start_time = time.time()
            last_clean_time = time.time()
            while True:
                if tasks:
                    task = tasks.pop()
                    logger.info(
                        f"Send as2 message '{task[2]}' "
                        f"from '{task[0]}' "
                        f"to '{task[1]}'"
                    )
                    self.send_message(task[0], task[1], task[2])

                if (
                    time.time() - start_time > 86400
                ):  # 24 hours * 60 minutes * 60 seconds
                    logger.info("Time out - 24 hours are through")
                    raise KeyboardInterrupt

                time.sleep(2)

                if time.time() - last_clean_time > 600:  # every 10 minutes
                    logger.info("Clean up start.")
                    self.clean_out(dir_watch_data)
                    last_clean_time = time.time()
                    logger.info("Clean up done.")
        except (Exception, KeyboardInterrupt) as msg:
            logger.info(f'Error in running task: "{msg}".')
            logger.info("Stopping all running Watchdog threads...")
            watchdog_file_observers.stop_all()
            logger.info("All Watchdog threads stopped.")
            logger.info("Waiting for all Watchdog threads to finish...")
            watchdog_file_observers.join_all()
            logger.info("All Watchdog threads finished. Exiting...")
            sys.exit(0)
`
thank you. i'll try it, so this should automate the send direction. but could you also say something how to convert the received files back or parse them back to the original file by scripting it in a similar filewatcher? at the moment i can only download the files in the web gui but i want to develop some scripts which detects new received messages and convert them back to original files and move them to other folders or post process them for different use cases. for example if a image file was received i want to convert the image to a graycolor image and save it into a specific user folder. all done by raw scripting without gui.
You need to set a Command on Message Receipt in the partner setting. It can be any command callable from the command line. As per documentation you can simply pass the filename to this command and then it will do whatever your command will do with that file: https://django-pyas2.readthedocs.io/en/latest/detailed-guide/partners.html#advanced-settings
a sample would be:
echo "$receiver got a message from $sender"
taken from here: https://github.com/abhishek-ram/django-pyas2/issues/81
ok, but i investigated that, for example, when i send an image "test.jpg" with a filesize of 1000kb, it is received as "test.somerandom.jpg" or "somestring.msg" with a different filesize like 1070kb, and this jpg file can't be opened. there is also a header file, and i want to know how i can rebuild the original file. at the moment i don't understand how or in which format the files are saved on the receiver side and what has to be done to parse them back or write them back to a binary file like the original.
Indeed strange. Did you try with django-pyas2 on sender and receiver side? Can you share your settings for this behavior in particular to signature/encryption? Did you try without signature/encryption?
what i found out now is that when setting the content type of the partner to "binary", the received file is exactly the same as the original. so the question is: why should i use, for example, content type "application/edi-consent"?
is it secure enough/sufficient to use http:// or is it better to run on https:// or should you proxy pass http:// in production environment (for example with nginx proxy manager)?
i also switched to https with this tutorial. should this be the right way? https://timonweb.com/django/https-django-development-server-ssl-certificate/
how to put pyas2 into production mode (this warning appears after doing https tutorial)?
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
I cannot comment on your first question as of yet, but glad you found a working solution.
On production deployment, there are differing views. I use both in production, HTTP as well as HTTPS. Nginx or HAProxy as a reverse proxy applying HTTPS encryption. Then as WSGI server I use gunicorn. The internet is full of deployment guides for Django, depending on infrastructure, host providers etc - here the "official" one. https://docs.djangoproject.com/en/4.2/howto/deployment/
However, the tutorial you are pointing to is django's own development webserver, which you should definitely NOT use in production.
today i tried the django-pyas2 setup with 2 partners for the first time. everything works fine over the web gui. for testing i sent a jpg image file. in the message folders "inbox" and "store" you can then find the image file, but it is not identical to the original file and the filesize differs from the original. so how can you convert the message by scripting it (not using the web gui for downloading)?
and is there also a filewatcher which detects new messages to start scripts for auto processing, as mentioned here? https://django-pyas2.readthedocs.io/en/latest/detailed-guide/extending.html this script gets an error message pointing to:
is there a documentation with more examples how to create automated scripts for building custom post processing tasks with the messages?