test_dhparam.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import re
  2. import subprocess
  3. import backoff
  4. import docker
  5. import pytest
# Module-level Docker client, created once at import time via `docker.from_env()`.
# The helpers and tests in this module use it to look up containers by name.
docker_client = docker.from_env()
  7. ###############################################################################
  8. #
  9. # Tests helpers
  10. #
  11. ###############################################################################
  12. @backoff.on_exception(backoff.constant, AssertionError, interval=2, max_tries=15, jitter=None)
  13. def assert_log_contains(expected_log_line, container_name="nginxproxy"):
  14. """
  15. Check that the nginx-proxy container log contains a given string.
  16. The backoff decorator will retry the check 15 times with a 2 seconds delay.
  17. :param expected_log_line: string to search for
  18. :return: None
  19. :raises: AssertError if the expected string is not found in the log
  20. """
  21. sut_container = docker_client.containers.get(container_name)
  22. docker_logs = sut_container.logs(stdout=True, stderr=True, stream=False, follow=False)
  23. assert bytes(expected_log_line, encoding="utf8") in docker_logs
  24. def require_openssl(required_version):
  25. """
  26. This function checks that the required version of OpenSSL is present, and skips the test if not.
  27. Use it as a test function decorator:
  28. @require_openssl("2.3.4")
  29. def test_something():
  30. ...
  31. :param required_version: minimal required version as a string: "1.2.3"
  32. """
  33. def versiontuple(v):
  34. clean_v = re.sub(r"[^\d\.]", "", v)
  35. return tuple(map(int, (clean_v.split("."))))
  36. try:
  37. command_output = subprocess.check_output(["openssl", "version"])
  38. except OSError:
  39. return pytest.mark.skip("openssl command is not available in test environment")
  40. else:
  41. if not command_output:
  42. raise Exception("Could not get openssl version")
  43. openssl_version = str(command_output.split()[1])
  44. return pytest.mark.skipif(
  45. versiontuple(openssl_version) < versiontuple(required_version),
  46. reason=f"openssl v{openssl_version} is less than required version {required_version}")
  47. @require_openssl("1.0.2")
  48. def negotiate_cipher(sut_container, additional_params='', grep='Cipher is'):
  49. host = f"{sut_container.attrs['NetworkSettings']['IPAddress']}:443"
  50. try:
  51. # Enforce TLS 1.2 as newer versions don't support custom dhparam or ciphersuite preference.
  52. # The empty `echo` is to provide `openssl` user input, so that the process exits: https://stackoverflow.com/a/28567565
  53. # `shell=True` enables using a single string to execute as a shell command.
  54. # `text=True` prevents the need to compare against byte strings.
  55. # `stderr=subprocess.PIPE` removes the output to stderr being interleaved with test case status (output during exceptions).
  56. return subprocess.check_output(
  57. f"echo '' | openssl s_client -connect {host} -tls1_2 {additional_params} | grep '{grep}'",
  58. shell=True,
  59. text=True,
  60. stderr=subprocess.PIPE,
  61. )
  62. except subprocess.CalledProcessError as e:
  63. # Output a more helpful error, the original exception in this case isn't that helpful.
  64. # `from None` to ignore undesired output from exception chaining.
  65. raise Exception("Failed to process CLI request:\n" + e.stderr) from None
  66. def can_negotiate_dhe_ciphersuite(sut_container):
  67. r = negotiate_cipher(sut_container, "-cipher 'EDH'")
  68. assert "New, TLSv1.2, Cipher is DHE-RSA-AES256-GCM-SHA384\n" == r
  69. r2 = negotiate_cipher(sut_container, "-cipher 'EDH'", "Server Temp Key")
  70. assert "DH" in r2
  71. def cannot_negotiate_dhe_ciphersuite(sut_container):
  72. # Fail to negotiate a DHE cipher suite:
  73. r = negotiate_cipher(sut_container, "-cipher 'EDH'")
  74. assert "New, (NONE), Cipher is (NONE)\n" == r
  75. # Correctly establish a connection (TLS 1.2):
  76. r2 = negotiate_cipher(sut_container)
  77. assert "New, TLSv1.2, Cipher is ECDHE-RSA-AES256-GCM-SHA384\n" == r2
  78. r3 = negotiate_cipher(sut_container, grep="Server Temp Key")
  79. assert "X25519" in r3
  80. def should_be_equivalent_content(sut_container, expected, actual):
  81. expected_checksum = sut_container.exec_run(f"md5sum {expected}").output.split()[0]
  82. actual_checksum = sut_container.exec_run(f"md5sum {actual}").output.split()[0]
  83. assert expected_checksum == actual_checksum
  84. # Parse array of container ENV, splitting at the `=` and returning the value, otherwise `None`
  85. def get_env(sut_container, var):
  86. env = sut_container.attrs['Config']['Env']
  87. for e in env:
  88. if e.startswith(var):
  89. return e.split('=')[1]
  90. return None
  91. ###############################################################################
  92. #
  93. # Tests
  94. #
  95. ###############################################################################
  96. def test_default_dhparam_is_ffdhe4096(docker_compose):
  97. container_name="dh-default"
  98. sut_container = docker_client.containers.get(container_name)
  99. assert sut_container.status == "running"
  100. assert_log_contains("Setting up DH Parameters..", container_name)
  101. # `dhparam.pem` contents should match the default (ffdhe4096.pem):
  102. should_be_equivalent_content(
  103. sut_container,
  104. "/app/dhparam/ffdhe4096.pem",
  105. "/etc/nginx/dhparam/dhparam.pem"
  106. )
  107. can_negotiate_dhe_ciphersuite(sut_container)
  108. # Overrides default DH group via ENV `DHPARAM_BITS=3072`:
  109. def test_can_change_dhparam_group(docker_compose):
  110. container_name="dh-env"
  111. sut_container = docker_client.containers.get(container_name)
  112. assert sut_container.status == "running"
  113. assert_log_contains("Setting up DH Parameters..", container_name)
  114. # `dhparam.pem` contents should not match the default (ffdhe4096.pem):
  115. should_be_equivalent_content(
  116. sut_container,
  117. "/app/dhparam/ffdhe3072.pem",
  118. "/etc/nginx/dhparam/dhparam.pem"
  119. )
  120. can_negotiate_dhe_ciphersuite(sut_container)
  121. def test_fail_if_dhparam_group_not_supported(docker_compose):
  122. container_name="invalid-group-1024"
  123. sut_container = docker_client.containers.get(container_name)
  124. assert sut_container.status == "exited"
  125. DHPARAM_BITS = get_env(sut_container, "DHPARAM_BITS")
  126. assert DHPARAM_BITS == "1024"
  127. assert_log_contains(
  128. f"ERROR: Unsupported DHPARAM_BITS size: {DHPARAM_BITS}. Use: 2048, 3072, or 4096 (default).",
  129. container_name
  130. )
  131. # Overrides default DH group by providing a custom `/etc/nginx/dhparam/dhparam.pem`:
  132. def test_custom_dhparam_is_supported(docker_compose):
  133. container_name="dh-file"
  134. sut_container = docker_client.containers.get(container_name)
  135. assert sut_container.status == "running"
  136. assert_log_contains(
  137. "Warning: A custom dhparam.pem file was provided. Best practice is to use standardized RFC7919 DHE groups instead.",
  138. container_name
  139. )
  140. # `dhparam.pem` contents should not match the default (ffdhe4096.pem):
  141. should_be_equivalent_content(
  142. sut_container,
  143. "/app/dhparam/ffdhe3072.pem",
  144. "/etc/nginx/dhparam/dhparam.pem"
  145. )
  146. can_negotiate_dhe_ciphersuite(sut_container)
  147. def test_can_skip_dhparam(docker_compose):
  148. container_name="dh-skip"
  149. sut_container = docker_client.containers.get(container_name)
  150. assert sut_container.status == "running"
  151. assert_log_contains("Skipping Diffie-Hellman parameters setup.", container_name)
  152. cannot_negotiate_dhe_ciphersuite(sut_container)
  153. def test_can_skip_dhparam_backward_compatibility(docker_compose):
  154. container_name="dh-skip-backward"
  155. sut_container = docker_client.containers.get(container_name)
  156. assert sut_container.status == "running"
  157. assert_log_contains("Warning: The DHPARAM_GENERATION environment variable is deprecated, please consider using DHPARAM_SKIP set to true instead.", container_name)
  158. assert_log_contains("Skipping Diffie-Hellman parameters setup.", container_name)
  159. cannot_negotiate_dhe_ciphersuite(sut_container)
  160. def test_web5_https_works(docker_compose, nginxproxy):
  161. r = nginxproxy.get("https://web5.nginx-proxy.tld/port", allow_redirects=False)
  162. assert r.status_code == 200
  163. assert "answer from port 85\n" in r.text