Source code for hairgap.receiver

"""
Receive data from hairgap using a proprietary protocol.
The algorithm is simple:

.. code-block: python

    while True:
        receive_file()
        process_file()


`receive_file` launches the command hairgapr that waits for a transfer and exists when a file is received.
`process_file` read the first bytes of the file

- if they match HAIRGAP_MAGIC_NUMBER_INDEX, then this is an index file, with:
    - the transfer identifier
    - the previous transfer identifier
    - the list of following files and their sha256 (in the transfer order)
- otherwise, this is the next expected file, as read by the index file

Empty files cannot be sent by hairgap, so they are replaced by the HAIRGAP_MAGIC_NUMBER_EMPTY constant.
If a new index file is read before the last expected file of the previous index, then we start a new index:
we assume that the sender has been interrupted and has restarted the whole process.

A 5-second sleep (HAIRGAP_END_DELAY_S) is performed by the sender after each send.

Both functions can be serialized (only if we assume that the process_file function takes less than 5 seconds),
but can also be run in separate threads for handling large files.

"""
# ##############################################################################
#  This file is part of Interdiode                                             #
#                                                                              #
#  Copyright (C) 2020 Matthieu Gallet <matthieu.gallet@19pouces.net>           #
#  All Rights Reserved                                                         #
#                                                                              #
# ##############################################################################

import datetime
import hashlib
import io
import logging
import os
import re
import shlex
import shutil
import subprocess
import tarfile
import tempfile
import time
import uuid
from queue import Empty, Queue
from threading import Thread
from typing import Dict, Optional, Set

from hairgap.constants import (
    HAIRGAP_MAGIC_NUMBER_EMPTY,
    HAIRGAP_MAGIC_NUMBER_ESCAPE,
    HAIRGAP_MAGIC_NUMBER_INDEX,
)
from hairgap.utils import Config, FILENAME_PATTERN, ensure_dir, now

logger = logging.getLogger("hairgap")


[docs]class Receiver: """ define the reception process. Can be split into two threads or can be serialize operations when files are small enough. You just have to call the `loop` method to start the reception process. Basically, the algorithm is: .. code-block:: python while True: receive_file(temporary_filepath) if is_index_file(temporary_filepath): read_list_of_expected_filenames() else: filename = expected_filenames.pop() os.rename(temporary_filepath, filename) """ available_attributes = set() # type: Set[str] def __init__(self, config: Config, threading: bool = False, port: int = None): """wait for transfer files :param config: :param threading: run the two main functions in threads. :param port: override the configured port """ self.config = config self.threading = threading self.port = port # type: int self.process_queue = Queue() self.process_thread = None self.receive_thread = None self.continue_loop = True # type: bool self.hairgap_subprocess = None self.expected_files = Queue() self.transfer_start_time = None # type: Optional[datetime.datetime] # datetime of the last index read self.transfer_received_size = 0 # type: int # sum of the size of the received files since the last index read (excepting the index) self.transfer_success_count = 0 # type: int # number of successfully received files (since the last index read) self.transfer_error_count = 0 # type: int # number of unsuccessfully received files (since the last index read) self.transfer_received_count = 0 # type: int # number of received files # transfer_received_count <= transfer_error_count + transfer_success_count # == 0 if there are errors in hairgap (and no file has been created) self.current_attributes = {} # type: Dict[str, str] # attributes of the last index self.current_split_status = False # is the last transfer split into chunks?
[docs] def receive_file(self, tmp_path) -> Optional[bool]: """receive a single file and returns True if hairgap did not raise an error False if hairgap did raise an error but Ctrl-C None if hairgap was terminated by Ctrl-C """ logger.info("Receiving %s via hairgap…" % tmp_path) ensure_dir(tmp_path, parent=True) with open(tmp_path, "wb") as fd: cmd = [ self.config.hairgapr_path, "-p", str(self.port or self.config.destination_port), ] if self.config.timeout_s: cmd += ["-t", str(self.config.timeout_s)] if self.config.mem_limit_mb: cmd += ["-m", str(self.config.mem_limit_mb)] cmd.append(self.config.destination_ip) self.hairgap_subprocess = subprocess.Popen( cmd, stdout=fd, stderr=subprocess.PIPE ) logger.debug("hairgapr command: %s" % " ".join(cmd)) __, stderr = self.hairgap_subprocess.communicate() fd.flush() returncode = self.hairgap_subprocess.returncode if returncode == 0: self.hairgap_subprocess = None logger.info("%s received via hairgap." % tmp_path) return True if returncode == -2: logger.info("Exiting hairgap…") return None else: logger.warning( "An error %d was encountered by hairgap: \n%s" % (returncode, stderr.decode()) ) self.hairgap_subprocess = None return False
def receive_loop(self): logger.info("Entering receiving loop…") while self.continue_loop: tmp_abspath = self.get_reception_filepath() try: r = self.receive_file(tmp_abspath) except Exception as e: logger.exception(e) time.sleep(1) continue if r is None: # Ctrl-C if os.path.isfile(tmp_abspath): os.remove(tmp_abspath) continue elif not r: time.sleep(1) if self.threading: self.process_queue.put((bool(r), tmp_abspath)) else: self.process_received_file(tmp_abspath) logger.info("Receiving loop exited.") def get_reception_filepath(self): return os.path.join( self.config.destination_path, "receiving", str(uuid.uuid4()) ) def process_loop(self): logger.info("Entering processing loop…") while self.continue_loop: try: valid, tmp_abspath = self.process_queue.get(timeout=1) self.process_received_file(tmp_abspath, valid=valid) except Empty: # the timeout is required to quit the thread when self.continue_loop is False continue except Exception as e: logger.exception(e) time.sleep(1) logger.info("Processing loop exited.") @staticmethod def is_gz_file(tmp_abspath: str): if not os.path.isfile(tmp_abspath): return False with open(tmp_abspath, "rb") as fd: header = fd.read(4) return header[:4] == b"\x1f\x8b\x08\x00"
[docs] def process_received_file(self, tmp_abspath: str, valid: bool = True): """ process a received file the execution time of this method must be small when threading is False (5 seconds between two communications) => must be threaded when large files are processed since we compute their sha256. :param tmp_abspath: the temporary absolute path :param valid: the file has been correctly received by hairgap :return: """ if self.config.use_tar_archives or ( self.config.use_tar_archives is None # auto-detect mode and self.expected_files.empty() and self.is_gz_file(tmp_abspath) ): try: self.process_received_file_tar(tmp_abspath, valid=valid) except Exception as e: logger.exception(e) logger.error("Invalid tar.gz file: %s (removed)" % tmp_abspath) if os.path.isfile(tmp_abspath): os.remove(tmp_abspath) else: self.process_received_file_no_tar(tmp_abspath, valid=valid)
[docs] def process_received_file_tar(self, tmp_abspath: str, valid: bool = True): """ process a tar.gz archive. a single file and a single directory are expected at the root of the received archive :param tmp_abspath: :param valid: :return: """ if not valid: if os.path.isfile(tmp_abspath): os.remove(tmp_abspath) return with tarfile.open(name=tmp_abspath, mode="r:gz") as tar_fd: index_member = None for member in tar_fd.getmembers(): # type: tarfile.TarInfo if "/" not in member.name and member.isfile(): index_member = member break if index_member is None: logger.error("index file not found in %s") return # /!\ the index file must be read before extracting other files with tempfile.NamedTemporaryFile() as dst_fd: src_fd = tar_fd.extractfile(index_member) for data in iter(lambda: src_fd.read(8192), b""): dst_fd.write(data) src_fd.close() dst_fd.flush() self.read_index(dst_fd.name) self.transfer_start() count = 0 for member in tar_fd.getmembers(): # type: tarfile.TarInfo if not member.isfile() or member.issym(): continue root, sep, rel_path = member.name.partition("/") if sep != "/": # the index file => we ignore it continue self.transfer_file_received( tmp_abspath, rel_path, expected_sha256=None, actual_sha256=None, tmp_fd=tar_fd.extractfile(member), ) count += 1 if count == 0: ensure_dir(self.get_current_transfer_directory(), parent=False) self.transfer_complete() os.remove(tmp_abspath)
def process_received_file_no_tar(self, tmp_abspath: str, valid: bool = True): empty_prefix = HAIRGAP_MAGIC_NUMBER_EMPTY.encode() index_prefix = HAIRGAP_MAGIC_NUMBER_INDEX.encode() escape_prefix = HAIRGAP_MAGIC_NUMBER_ESCAPE.encode() if os.path.isfile(tmp_abspath): with open(tmp_abspath, "rb") as fd: prefix = fd.read(len(empty_prefix)) else: prefix = b"" if prefix == escape_prefix: # must be done before the sha256 escaped_tmp_abspath = tmp_abspath + ".b" with open(escaped_tmp_abspath, "wb") as fd_out: with open(tmp_abspath, "rb") as fd_in: fd_in.read(len(escape_prefix)) for data in iter(lambda: fd_in.read(65536), b""): fd_out.write(data) os.rename(escaped_tmp_abspath, tmp_abspath) # no need to use shutil.move if prefix == empty_prefix: open(tmp_abspath, "w").close() if prefix == index_prefix: self.read_index(tmp_abspath) os.remove(tmp_abspath) self.transfer_start() if self.expected_files.empty(): # empty transfer => we mark it as complete ensure_dir(self.get_current_transfer_directory(), parent=False) self.transfer_complete() elif self.expected_files.empty(): if valid: self.transfer_file_unexpected(tmp_abspath, prefix=prefix) elif os.path.isfile(tmp_abspath): os.remove(tmp_abspath) else: expected_sha256, file_relpath = self.expected_files.get() actual_sha256_obj = hashlib.sha256() if os.path.isfile(tmp_abspath): with open(tmp_abspath, "rb") as in_fd: for data in iter(lambda: in_fd.read(65536), b""): actual_sha256_obj.update(data) self.transfer_file_received( tmp_abspath, file_relpath, actual_sha256=actual_sha256_obj.hexdigest(), expected_sha256=expected_sha256, ) if self.expected_files.empty(): # all files of the transfer have been received if self.current_split_status: self.unsplit_received_files( self.config, self.get_current_transfer_directory() ) self.transfer_complete()
[docs] def transfer_start(self): """called before the first file of a transfer the execution time of this method must be small when threading is False (5 seconds between two communications) """ pass
[docs] def transfer_complete(self): """called when all files of a transfer are received. the execution time of this method must be small when threading is False (5 seconds between two communications) You can read :attr:`current_attributes` to retrieve the attributes defined by the sender (set to `None` by default). """ pass
[docs] def get_current_transfer_directory(self) -> Optional[str]: """return a folder name where all files of a transfer can be moved to. The index file has been read and the attributes are set. This folder will be automatically created. If None, all received files will be deleted. """ raise NotImplementedError
@staticmethod def unsplit_received_files(config: Config, dir_abspath): names = os.listdir(dir_abspath) if not names: return folder_1 = os.path.join(dir_abspath, str(uuid.uuid4())) folder_2 = os.path.join(dir_abspath, str(uuid.uuid4())) ensure_dir(folder_1, parent=False) ensure_dir(folder_2, parent=False) for name in names: os.rename(os.path.join(dir_abspath, name), os.path.join(folder_1, name)) names.sort() cat_cmd = [config.cat] + names tar_cmd = [config.tar, "xz", "-C", folder_2] esc_tar_cmd = [shlex.quote(x) for x in tar_cmd] esc_cat_cmd = [shlex.quote(x) for x in cat_cmd] cmd = "%s | %s" % (" ".join(esc_cat_cmd), " ".join(esc_tar_cmd)) p = subprocess.Popen( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, cwd=folder_1, ) stdout, stderr = p.communicate(b"") if p.returncode: logger.error("command = %s , return code = %s" % (cmd, p.returncode)) logger.error( "stdout = %s\nstderr = %s" % (stdout.decode(), stderr.decode()) ) names = os.listdir(folder_2) for name in names: os.rename(os.path.join(folder_2, name), os.path.join(dir_abspath, name)) shutil.rmtree(folder_1) shutil.rmtree(folder_2) # noinspection PyMethodMayBeStatic
[docs] def transfer_file_unexpected(self, tmp_abspath: str, prefix: bytes = None): """called when an unexpected file has been received. Probably an interrupted transfer… :param tmp_abspath: absolute path of the received file :param prefix: is the first bytes of the received file.""" if prefix is None: logger.error("Unexpected file received") else: logger.error("Unexpected file received, starting by %r." % prefix) if os.path.isfile(tmp_abspath): os.remove(tmp_abspath)
[docs] def transfer_file_received( self, tmp_abspath, file_relpath, actual_sha256: Optional[str] = None, expected_sha256: Optional[str] = None, tmp_fd: io.BytesIO = None, ): """called when a file is received the execution time of this method must be small if threading is False (5 seconds between two communications) :param tmp_abspath: the path of the received file :param file_relpath: the destination path of the received file :param actual_sha256: actual SHA256 (not provided in case of tar archives) :param expected_sha256: expected SHA256 (not provided in case of tar archives) :param tmp_fd: provided when tmp_abspath is not given :return: """ if tmp_fd: receive_path = self.get_current_transfer_directory() self.transfer_received_count += 1 size = 0 if receive_path: file_abspath = os.path.join(receive_path, file_relpath) ensure_dir(file_abspath, parent=True) with open(file_abspath, "wb") as dst_fd: for data in iter(lambda: tmp_fd.read(8192), b""): dst_fd.write(data) size += len(data) tmp_fd.close() else: logger.warning("No receive path defined: ignoring %s." % file_relpath) elif os.path.isfile(tmp_abspath): size = os.path.getsize(tmp_abspath) self.transfer_received_count += 1 receive_path = self.get_current_transfer_directory() if receive_path: file_abspath = os.path.join(receive_path, file_relpath) ensure_dir(file_abspath, parent=True) shutil.move(tmp_abspath, file_abspath) else: logger.warning("No receive path defined: removing %s." % tmp_abspath) os.remove(tmp_abspath) else: size = 0 self.transfer_received_size += size values = { "f": file_relpath, "as": actual_sha256, "es": expected_sha256, "s": size, } if actual_sha256 == expected_sha256: logger.info("Received file %(f)s [sha256=%(es)s, size=%(s)s]." % values) self.transfer_success_count += 1 else: logger.warning( "Received file %(f)s [sha256=%(as)s instead of sha256=%(es)s, size=%(s)s]." % values ) self.transfer_error_count += 1
def read_index(self, index_abspath): self.transfer_start_time = now() logger.info("Reading received index…") self.current_attributes = {x: None for x in self.available_attributes} self.expected_files = Queue() expected_count = 0 self.current_split_status = False with open(index_abspath) as fd: for line in fd: if line == "[splitted_content]\n": self.current_split_status = True matcher = re.match(FILENAME_PATTERN, line) if matcher: self.expected_files.put((matcher.group(1), matcher.group(2))) expected_count += 1 continue matcher = re.match(r"^(.+) = (.+)$", line) if matcher: key, value = matcher.groups() if key in self.available_attributes: self.current_attributes[key] = value continue self.transfer_received_size = os.path.getsize(index_abspath) self.transfer_received_count = 1 self.transfer_success_count = 1 self.transfer_error_count = 0 logger.info("Index read: expecting %s file(s)." % expected_count) def loop(self): if self.threading: self.process_thread = Thread(target=self.process_loop) self.process_thread.start() self.receive_thread = Thread(target=self.receive_loop) self.receive_thread.start() self.receive_thread.join() self.process_thread.join() else: self.receive_loop()