# ##############################################################################
# This file is part of Interdiode #
# #
# Copyright (C) 2020 Matthieu Gallet <matthieu.gallet@19pouces.net> #
# All Rights Reserved #
# #
# ##############################################################################
import hashlib
import logging
import os
import random
import re
import shlex
import shutil
import subprocess
import tempfile
import time
import uuid
from typing import Dict, Optional, Tuple
from hairgap.constants import (
HAIRGAP_MAGIC_NUMBER_EMPTY,
HAIRGAP_MAGIC_NUMBER_ESCAPE,
HAIRGAP_MAGIC_NUMBER_INDEX,
)
from hairgap.utils import Config, FILENAME_PATTERN, ensure_dir
logger = logging.getLogger("hairgap")
HAIRGAP_PREFIXES = {
HAIRGAP_MAGIC_NUMBER_INDEX.encode(),
HAIRGAP_MAGIC_NUMBER_EMPTY.encode(),
HAIRGAP_MAGIC_NUMBER_ESCAPE.encode(),
}
[docs]class DirectorySender:
"""
Send the content of a directory. Must be subclassed to implement `transfer_abspath` and `index_abspath`.
.. code-block:: python
sender = DirectorySender(Config())
sender.prepare_directory()
# modify in-place the data directory! generate the index file
sender.send_directory()
"""
def __init__(self, config: Config):
self.config = config
[docs] def get_attributes(self) -> Dict[str, str]:
"""return a dict of attributes to add in the index file (like unique IDs to track transfers on the receiver side)
keys and values must be simple strings (no new-lines symbols and not contains the " = " substring).
Available keys must be added to the used :attr:`Receiver.available_attributes`.
"""
return {}
@property
def transfer_abspath(self) -> str:
"""returns the absolute path of directory to send"""
raise NotImplementedError
@property
def index_abspath(self):
"""returns the absolute path of the index file to create """
raise NotImplementedError
@property
def use_tar_archives(self):
if self.config.use_tar_archives is None:
return True
return self.config.use_tar_archives
[docs] def prepare_directory(self) -> Tuple[int, int]:
"""create an index file and return the number of files and the total size (including the index file).
**can modify in-place some files (those empty or beginning by `# *-* HAIRGAP-`)** when not `config.use_tar_archives`
result is always (1, 0) when `config.use_tar_archives` and not `config.always_compute_size` to speed up
"""
start = time.time()
if self.use_tar_archives:
r = self.prepare_directory_tar()
else:
r = self.prepare_directory_no_tar()
end = time.time()
logger.info(
"%s files, %s bytes in %s seconds (%s B/s)"
% (r[0], r[1], (end - start), r[1] / (end - start))
)
return r
def prepare_directory_tar(self) -> Tuple[int, int]:
logger.info("Preparing '%s' as a single tar archive…" % self.transfer_abspath)
ensure_dir(self.index_abspath)
with open(self.index_abspath, "w") as fd:
fd.write(HAIRGAP_MAGIC_NUMBER_INDEX)
fd.write("[hairgap]\n")
for k, v in sorted(self.get_attributes().items()):
fd.write("%s = %s\n" % (k, v.replace("\n", "")))
total_size = 0
total_files = 1
if self.config.always_compute_size:
total_size += os.path.getsize(self.index_abspath)
for root, dirnames, filenames in os.walk(self.transfer_abspath):
for filename in filenames:
file_abspath = os.path.join(root, filename)
if os.path.isfile(file_abspath):
total_files += 1
total_size += os.path.getsize(file_abspath)
logger.info(
"%s file(s), %s byte(s), prepared in '%s'."
% (total_files, total_size, self.transfer_abspath)
)
return total_files, total_size
def prepare_directory_no_tar(self) -> Tuple[int, int]:
logger.info("Preparing '%s' as multiple files…" % self.transfer_abspath)
dir_abspath = self.transfer_abspath
index_path = self.index_abspath
if self.config.split_size:
self.split_source_files(dir_abspath, self.config.split_size)
total_files, total_size = 1, 0
ensure_dir(index_path)
with open(index_path, "w") as fd:
fd.write(HAIRGAP_MAGIC_NUMBER_INDEX)
fd.write("[hairgap]\n")
for k, v in sorted(self.get_attributes().items()):
fd.write("%s = %s\n" % (k, v.replace("\n", "")))
if self.config.split_size:
fd.write("[splitted_content]\n")
fd.write("[files]\n")
for root, dirnames, filenames in os.walk(dir_abspath):
dirnames.sort()
filenames.sort()
for filename in filenames:
file_abspath = os.path.join(root, filename)
expected_sha256 = hashlib.sha256()
if not os.path.isfile(file_abspath):
continue
filesize = os.path.getsize(file_abspath)
with open(file_abspath, "rb") as in_fd:
# start by checking special contents
prefix = in_fd.read(len(HAIRGAP_MAGIC_NUMBER_INDEX.encode()))
expected_sha256.update(prefix)
for data in iter(lambda: in_fd.read(65536), b""):
expected_sha256.update(data)
# if the file starts with a special value, we must rewrite it entirely
# to escape by HAIRGAP_MAGIC_NUMBER_ESCAPE
# maybe not very efficient, but such files are expected to be small
if prefix in HAIRGAP_PREFIXES:
escaped_file_abspath = file_abspath + ".%s" % random.randint(
100000, 1000000 - 1
)
with open(escaped_file_abspath, "wb") as fd_out:
fd_out.write(HAIRGAP_MAGIC_NUMBER_ESCAPE.encode())
with open(file_abspath, "rb") as fd_in:
for data in iter(lambda: fd_in.read(65536), b""):
fd_out.write(data)
os.rename(escaped_file_abspath, file_abspath)
total_size += filesize
file_relpath = os.path.relpath(file_abspath, dir_abspath)
fd.write("%s = %s\n" % (expected_sha256.hexdigest(), file_relpath))
total_files += 1
total_size += os.path.getsize(index_path)
logger.info(
"%s file(s), %s byte(s), prepared in '%s'."
% (total_files, total_size, self.transfer_abspath)
)
return total_files, total_size
@staticmethod
def archive_and_split_directory(
config: Config,
original_path: str,
splitted_path: str,
split_size: int = 100 * 1000 * 1000,
prefix: str = "content.tar.gz.",
):
ensure_dir(splitted_path, parent=False)
tar_cmd = [config.tar, "czf", "-", "-C", original_path, "."]
split_cmd = [
config.split,
"-b",
str(split_size),
"-",
prefix,
]
esc_tar_cmd = [shlex.quote(x) for x in tar_cmd]
esc_split_cmd = [shlex.quote(x) for x in split_cmd]
cmd = "%s | %s" % (" ".join(esc_tar_cmd), " ".join(esc_split_cmd))
logger.info("Archive and split '%s' to '%s'…" % (original_path, splitted_path))
p = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
cwd=splitted_path,
)
stdout, stderr = p.communicate(b"")
if p.returncode:
logger.error("command = %s , return code = %s" % (cmd, p.returncode))
logger.error(
"stdout = %s\nstderr = %s" % (stdout.decode(), stderr.decode())
)
[docs] def split_source_files(self, dir_abspath: str, split_size: int):
"""transform some files into a single, splitted, archive
move the content of the source folder in a subfolder
create another folder in the same source folder
create a tar.gz file with the first subfolder and split it into chunks into the second subfolder
remove the first subfolder
move the content of the second subfolder to its parent
remove the second subfolder"""
logger.info("Split '%s' into %s-bytes chunks" % (dir_abspath, split_size))
names = os.listdir(dir_abspath)
if not names:
return
folder_1 = os.path.join(dir_abspath, str(uuid.uuid4()))
folder_2 = os.path.join(dir_abspath, str(uuid.uuid4()))
ensure_dir(folder_1, parent=False)
for name in names:
os.rename(os.path.join(dir_abspath, name), os.path.join(folder_1, name))
self.archive_and_split_directory(
self.config, folder_1, folder_2, split_size=split_size
)
names = os.listdir(folder_2)
shutil.rmtree(folder_1)
for name in names:
os.rename(os.path.join(folder_2, name), os.path.join(dir_abspath, name))
shutil.rmtree(folder_2)
[docs] def send_directory(self, port: Optional[int] = None):
"""send all files using hairgap.
:param port: the port to send to, overriding the default config
raise ValueError in case of error on the index or the directory to send"""
dir_abspath = self.transfer_abspath
index_path = self.index_abspath
if not os.path.isdir(dir_abspath):
logger.warning(
"Cannot send '%s' (missing directory)." % self.transfer_abspath
)
raise ValueError("missing directory '%s'" % dir_abspath)
elif not os.path.isfile(index_path):
logger.warning(
"Cannot send '%s' (missing index file '%s')."
% (self.transfer_abspath, self.index_abspath)
)
raise ValueError("Missing index '%s'" % index_path)
logger.info("Sending '%s'…" % self.transfer_abspath)
start = time.time()
if self.use_tar_archives:
self.send_directory_tar(port=port)
else:
self.send_directory_no_tar(port=port)
end = time.time()
logger.info(
"Directory '%s' sent in %s seconds."
% (self.transfer_abspath, (end - start))
)
[docs] def send_directory_tar(self, port: Optional[int] = None):
"""send all files using hairgap, using the tar method.
:param port: the port to send to, overriding the default config
"""
dir_abspath = self.transfer_abspath
index_path = self.index_abspath
tar_cmd = [
self.config.tar,
"czf",
"-",
"-C",
os.path.dirname(index_path),
os.path.basename(index_path),
"-C",
os.path.dirname(dir_abspath),
os.path.basename(dir_abspath),
]
# we use gzip, not for compression (most files are probably already compressed) but for the CRC checksum
# we cannot use more efficient algorithms like xz/bz2 (they cannot compress streams)
logger.info("Sending %s via hairgap …" % dir_abspath)
hairgap_cmd = self.get_hairgap_command(self.config, port)
logger.debug("hairgaps command: %s" % " ".join(hairgap_cmd))
logger.debug("tar command: %s" % " ".join(tar_cmd))
esc_tar_cmd = [shlex.quote(x) for x in tar_cmd]
esc_hairgap_cmd = [shlex.quote(x) for x in hairgap_cmd]
cmd = "%s|%s" % (" ".join(esc_tar_cmd), " ".join(esc_hairgap_cmd))
p = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
)
stdout, stderr = p.communicate(b"")
time.sleep(self.config.end_delay_s)
if p.returncode:
logger.error(
"Unable to run '%s' \nreturncode=%s\nstdout=%r\nstderr=%r\n"
% (" ".join(cmd), p.returncode, stdout.decode(), stderr.decode())
)
raise ValueError("Unable to send '%s'" % dir_abspath)
[docs] def send_directory_no_tar(self, port: Optional[int] = None):
"""send all files using hairgap"""
dir_abspath = self.transfer_abspath
index_path = self.index_abspath
self.send_file(self.config, index_path, port=port)
with open(index_path) as fd:
for line in fd:
matcher = re.match(FILENAME_PATTERN, line)
if not matcher:
continue
file_relpath = matcher.group(2)
actual_sha256 = matcher.group(1)
file_abspath = os.path.join(dir_abspath, file_relpath)
self.send_file(
self.config, file_abspath, sha256=actual_sha256, port=port
)
@classmethod
def send_file(
cls,
config: Config,
file_abspath: str,
sha256: Optional[str] = None,
port: Optional[int] = None,
):
if not os.path.isfile(file_abspath):
logger.warning("Missing file '%s'." % file_abspath)
raise ValueError("Missing file '%s'" % file_abspath)
empty_file_fd = None
file_size = os.path.getsize(file_abspath)
if file_size == 0:
# we cannot send empty files
empty_file_fd = tempfile.NamedTemporaryFile(delete=True)
empty_file_fd.write(HAIRGAP_MAGIC_NUMBER_EMPTY.encode())
empty_file_fd.flush()
file_abspath = empty_file_fd.name
if sha256:
msg = "Sending %s via hairgap [sha526=%s, size=%s]…" % (
file_abspath,
sha256,
file_size,
)
else:
msg = "Sending %s via hairgap to port %s…" % (
file_abspath,
port or config.destination_port,
)
logger.info(msg)
cmd = cls.get_hairgap_command(config, port)
logger.info(" ".join(cmd))
with open(file_abspath, "rb") as tmp_fd:
p = subprocess.Popen(
cmd, stdin=tmp_fd, stderr=subprocess.PIPE, stdout=subprocess.PIPE
)
stdout, stderr = p.communicate()
if p.returncode:
logger.error(
"Unable to run '%s' \nreturncode=%s\nstdout=%r\nstderr=%r\n"
% (" ".join(cmd), p.returncode, stdout.decode(), stderr.decode())
)
raise ValueError("Unable to send '%s'" % file_abspath)
logger.info(
"File '%s' sent; sleeping for %ss." % (file_abspath, config.end_delay_s)
)
if empty_file_fd is not None:
empty_file_fd.close()
time.sleep(config.end_delay_s)
@staticmethod
def get_hairgap_command(config: Config, port: Optional[int]):
cmd = [
config.hairgaps_path,
"-p",
str(port or config.destination_port),
]
if config.redundancy:
cmd += [
"-r",
str(config.redundancy),
]
if config.error_chunk_size:
cmd += [
"-N",
str(config.error_chunk_size),
]
if config.max_rate_mbps:
cmd += ["-b", str(config.max_rate_mbps)]
if config.mtu_b:
cmd += ["-M", str(config.mtu_b)]
if config.keepalive_ms:
cmd += ["-k", str(config.keepalive_ms)]
cmd.append(config.destination_ip)
return cmd