Procházet zdrojové kódy

replace crypto

tags/current_release
adam j hartz před 5 měsíci
rodič
revize
82a62913f9
9 změnil soubory, kde provedl 37 přidání a 358 odebrání
  1. +2
    -0
      CHANGELOG.md
  2. +0
    -10
      LICENSE.included_software
  3. +7
    -3
      catsoop/__AUTH__/login/login.py
  4. +0
    -1
      catsoop/base_context.py
  5. +10
    -23
      catsoop/cslog.py
  6. +0
    -174
      catsoop/fernet.py
  7. +0
    -145
      catsoop/test/crypto_tests.py
  8. +17
    -1
      catsoop/util.py
  9. +1
    -1
      requirements.txt

+ 2
- 0
CHANGELOG.md Zobrazit soubor

@@ -14,6 +14,8 @@ _Work toward next release. Currently under development._

* Upgraded KaTeX to v0.11.1.

* Replaced Fernet encryption with simpler encryption scheme.

**DEPRECATED:**

**REMOVED:**


+ 0
- 10
LICENSE.included_software Zobrazit soubor

@@ -39,16 +39,6 @@ For more information, please refer to <http://unlicense.org/>
########


CAT-SOOP contains a slightly-modified version of fernet.py from the
cryptography Python library (https://github.com/pyca/cryptography). It was
modified to work with raw bytes instead of with URL-safe base64-encoded
strings. The original file was dual-licensed under the BSD 3-clause license OR
the Apache License (version 2.0).


########


CAT-SOOP contains a slightly-modified version of highlight.js, a Javascript
library for syntax highlighting (https://highlightjs.org/). In particular,
syntax highlighting of Python code was modified to include more built-in names.


+ 7
- 3
catsoop/__AUTH__/login/login.py Zobrazit soubor

@@ -555,7 +555,9 @@ def check_password(context, provided, uname, iterations=500000):
pass_hash = user_login_info.get("password_hash", None)
if pass_hash is not None:
if context["csm_cslog"].ENCRYPT_KEY is not None:
pass_hash = context["csm_cslog"].FERNET.decrypt(pass_hash)
pass_hash = context["csm_util"].simple_decrypt(
context["csm_cslog"].ENCRYPT_KEY, pass_hash
)
salt = user_login_info.get("password_salt", None)
hashed_pass = compute_password_hash(
context, provided, salt, iterations, encrypt=False
@@ -592,8 +594,10 @@ def compute_password_hash(
hash_ = hashlib.pbkdf2_hmac(
"sha512", _ensure_bytes(password), _ensure_bytes(salt), iterations
)
if encrypt and (context["csm_cslog"].ENCRYPT_KEY is not None):
hash_ = context["csm_cslog"].FERNET.encrypt(hash_)
if encrypt:
enc_key = context["csm_cslog"].ENCRYPT_KEY
if enc_key is not None:
hash_ = context["csm_util"].simple_encrypt(enc_key, hash_)
return hash_




+ 0
- 1
catsoop/base_context.py Zobrazit soubor

@@ -403,7 +403,6 @@ cs_all_pieces = [
"cslog",
"dispatch",
"errors",
"fernet",
"groups",
"language",
"loader",


+ 10
- 23
catsoop/cslog.py Zobrazit soubor

@@ -48,10 +48,6 @@ import contextlib
from collections import OrderedDict
from datetime import datetime, timedelta

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .fernet import RawFernet

_nodoc = {
"passthrough",
"FileLock",
@@ -83,6 +79,7 @@ def passthrough():
yield


from . import util
from . import base_context
from filelock import FileLock

@@ -101,8 +98,6 @@ if ENCRYPT_PASS is not None:
ENCRYPT_KEY = hashlib.pbkdf2_hmac(
"sha256", ENCRYPT_PASS.encode("utf8"), SALT, 100000, dklen=32
)
XTS_KEY = hashlib.pbkdf2_hmac("sha256", ENCRYPT_PASS.encode("utf8"), SALT, 100000)
FERNET = RawFernet(ENCRYPT_KEY)


def log_lock(path):
@@ -115,7 +110,7 @@ def compress_encrypt(x):
if COMPRESS:
x = lzma.compress(x)
if ENCRYPT_KEY is not None:
x = FERNET.encrypt(x)
x = util.simple_encrypt(ENCRYPT_KEY, x)
return x


@@ -128,7 +123,7 @@ def prep(x):

def decompress_decrypt(x):
if ENCRYPT_KEY is not None:
x = FERNET.decrypt(x)
x = util.simple_decrypt(ENCRYPT_KEY, x)
if COMPRESS:
x = lzma.decompress(x)
return x
@@ -141,20 +136,11 @@ def unprep(x):
return pickle.loads(decompress_decrypt(x))


def _e(x, seed): # not sure seed is the right term here...
x = x.encode("utf8") + bytes([0] * (16 - len(x)))
b = hashlib.sha512(seed.encode("utf8") + ENCRYPT_KEY + SALT).digest()[-16:]
c = Cipher(algorithms.AES(XTS_KEY), modes.XTS(b), backend=default_backend())
e = c.encryptor()
return base64.urlsafe_b64encode(e.update(x) + e.finalize()).decode("utf8")


def _d(x, seed): # not sure seed is the right term here...
x = base64.urlsafe_b64decode(x)
b = hashlib.sha512(seed.encode("utf8") + ENCRYPT_KEY + SALT).digest()[-16:]
c = Cipher(algorithms.AES(XTS_KEY), modes.XTS(b), backend=default_backend())
d = c.decryptor()
return (d.update(x) + d.finalize()).rstrip(b"\x00").decode("utf8")
def _e(x, person):
p = hashlib.sha512(person.encode("utf-8")).digest()[:9]
return base64.urlsafe_b64encode(
hashlib.blake2b(x.encode("utf-8"), person=b"catsoop%s" % p).digest()
).decode("utf-8")


def get_log_filename(db_name, path, logname):
@@ -169,7 +155,7 @@ def get_log_filename(db_name, path, logname):
"""
if ENCRYPT_KEY is not None:
seed = path[0] if path else db_name
path = [_e(i, seed + i) for i in path]
path = [_e(p, seed + repr(path[:ix])) for ix, p in enumerate(path)]
db_name = _e(db_name, seed + db_name)
logname = _e(logname, seed + repr(path))
if path:
@@ -248,6 +234,7 @@ def overwrite_log(db_name, path, logname, new, lock=True):

def _read_log(db_name, path, logname, lock=True):
fname = get_log_filename(db_name, path, logname)
print(fname)
# get an exclusive lock on this file before reading it
cm = log_lock([db_name] + path + [logname]) if lock else passthrough()
with cm:


+ 0
- 174
catsoop/fernet.py Zobrazit soubor

@@ -1,174 +0,0 @@
# This file is part of CAT-SOOP
# Copyright (c) 2011-2020 by The CAT-SOOP Developers <catsoop-dev@mit.edu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# This file is a modified version of the file available at:
# https://github.com/pyca/cryptography/blob/master/src/cryptography/fernet.py

# This original file, part of the cryptography Python3 package
# (https://cryptography.io/en/latest/) was dual licensed under the terms of the
# Apache License, Version 2.0, and the BSD License. See
# https://github.com/pyca/cryptography/blob/master/LICENSE for complete
# details.
"""
Fernet-style encryption forked from the
[`cryptography`](https://cryptography.io/en/latest/) package. Implements
Fernet encryption, but without the bade64-encoding step (produces raw binary
data).
"""

import binascii
import os
import struct
import time

import six

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hmac import HMAC

_nodoc = {
"InvalidSignature",
"default_backend",
"hashes",
"padding",
"Cipher",
"algorithms",
"modes",
"HMAC",
"InvalidToken",
}


class InvalidToken(Exception):
pass


_MAX_CLOCK_SKEW = 60


class RawFernet(object):
"""
Class (forked from the `Fernet` class in the `cryptography` package) that
implements a raw binary form of Fernet encryption.
"""

def __init__(self, key, backend=None):
if backend is None:
backend = default_backend()

if len(key) != 32:
raise ValueError("Fernet key must be 32 bytes.")

self._signing_key = key[:16]
self._encryption_key = key[16:]
self._backend = backend

@classmethod
def generate_key(cls):
return os.urandom(32)

def encrypt(self, data):
current_time = int(time.time())
iv = os.urandom(16)
return self._encrypt_from_parts(data, current_time, iv)

def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")

padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()

basic_parts = b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext

h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return basic_parts + hmac

def decrypt(self, token, ttl=None):
timestamp, data = RawFernet._get_unverified_token_data(token)
return self._decrypt_data(data, timestamp, ttl)

def extract_timestamp(self, token):
timestamp, data = RawFernet._get_unverified_token_data(token)
# Verify the token was not tampered with.
self._verify_signature(data)
return timestamp

@staticmethod
def _get_unverified_token_data(token):
if not isinstance(token, bytes):
raise TypeError("token must be bytes.")

try:
data = token
except (TypeError, binascii.Error):
raise InvalidToken

if not data or six.indexbytes(data, 0) != 0x80:
raise InvalidToken

try:
timestamp, = struct.unpack(">Q", data[1:9])
except struct.error:
raise InvalidToken
return timestamp, data

def _verify_signature(self, data):
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(data[:-32])
try:
h.verify(data[-32:])
except InvalidSignature:
raise InvalidToken

def _decrypt_data(self, data, timestamp, ttl):
current_time = int(time.time())
if ttl is not None:
if timestamp + ttl < current_time:
raise InvalidToken

if current_time + _MAX_CLOCK_SKEW < timestamp:
raise InvalidToken

self._verify_signature(data)

iv = data[9:25]
ciphertext = data[25:-32]
decryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).decryptor()
plaintext_padded = decryptor.update(ciphertext)
try:
plaintext_padded += decryptor.finalize()
except ValueError:
raise InvalidToken
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

unpadded = unpadder.update(plaintext_padded)
try:
unpadded += unpadder.finalize()
except ValueError:
raise InvalidToken
return unpadded

+ 0
- 145
catsoop/test/crypto_tests.py Zobrazit soubor

@@ -1,145 +0,0 @@
# This file is part of CAT-SOOP
# Copyright (c) 2011-2020 by The CAT-SOOP Developers <catsoop-dev@mit.edu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Tests for CAT-SOOP's crypto
"""

import os
import time
import base64
import random
import string
import unittest

from contextlib import contextmanager

from catsoop.fernet import RawFernet, InvalidToken
from cryptography.fernet import Fernet

from ..test import CATSOOPTest

# -----------------------------------------------------------------------------


@contextmanager
def spoof_time(current_time=None):
if current_time == None:
current_time = random.uniform(0, 4102444800)

old_time_func = time.time
time.time = lambda: current_time
yield current_time
time.time = old_time_func


@contextmanager
def spoof_urandom(pattern=None):
if pattern == None:
pattern = os.urandom(1000)

def _spoofed(x):
out = bytearray(pattern)
while len(out) < x:
out.extend(pattern)
return bytes(out[:x])

old_urandom = os.urandom
os.urandom = _spoofed
yield pattern
os.urandom = old_urandom


class Test_Fernet(CATSOOPTest):
"""
Test for the RawFernet class
"""

test_key = (
b"s\x0f\xf4\xc7\xaf=F\x92>\x8e\xd4Q\xee\x81<\x87\xf7\x90\xb0"
b"\xa2&\xbc\x96\xa9-\xe4\x9b^\x9c\x05\xe1\xee"
)
test_tok = (
b"\x80\x00\x00\x00\x00\x1d\xc0\x9e\xb0\x00\x01\x02\x03\x04\x05"
b"\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f-6\xd5\xcaFUb\x99\xfd\xe10"
b"\x08c8\x04\xb2\xc5\xff\x90\x95\xf5\xd3\x8f\x9a\xb8nUC\xe0&"
b"\x86\xf0;>\xc9q\xb9\xabG\xae#VjT\xe0\x8c*\x0c"
)
test_msg = b"hello"
test_ctime = 499162800
test_iv = bytes(range(16))
test_ttl = 60

def test_generate(self):
"""
Test that the proper token is generated in a specific case
"""
r = RawFernet(self.test_key)
tok = r._encrypt_from_parts(self.test_msg, self.test_ctime, self.test_iv)
self.assertEqual(tok, self.test_tok)
with spoof_time(self.test_ctime):
with spoof_urandom(self.test_iv):
self.assertEqual(r.encrypt(self.test_msg), self.test_tok)

def test_verify(self):
"""
Test proper verification of a token
"""
with spoof_time(self.test_ctime):
r = RawFernet(self.test_key)
out = r.decrypt(self.test_tok, ttl=self.test_ttl)
self.assertEqual(out, self.test_msg)

def test_ttl_handling(self):
"""
Try a couple of verifications to
"""
with spoof_time(self.test_ctime + 61):
# try to decrypt with expire message
r = RawFernet(self.test_key)
with self.assertRaises(InvalidToken):
out = r.decrypt(self.test_tok, ttl=self.test_ttl)

# now decrypt with no ttl
out = r.decrypt(self.test_tok)
self.assertEqual(out, self.test_msg)

def test_compare_fernet(self):
"""
A test to compare our binary Fernet implementation against the base
Fernet implementation from the cryptography library. 50 times, encrypt
a random message with a random key and make sure our result matches
that of the regular Fernet implementation.
"""
chars = (
string.ascii_lowercase + string.ascii_uppercase + string.digits
).encode("utf-8")
for i in range(50):
message = bytes(
random.choice(chars) for i in range(random.randint(100, 10000))
)
with spoof_time():
with spoof_urandom():
key = Fernet.generate_key()
raw_key = base64.urlsafe_b64decode(key)
secret_base = Fernet(key).encrypt(message)
secret_catsoop = RawFernet(raw_key).encrypt(message)
self.assertEqual(
base64.urlsafe_b64decode(secret_base), secret_catsoop
)


if __name__ == "__main__":
unittest.main()

+ 17
- 1
catsoop/util.py Zobrazit soubor

@@ -21,12 +21,28 @@ import os
import ast
import hashlib

from collections import OrderedDict
from datetime import datetime, timedelta
from collections import OrderedDict
from nacl.bindings import (
crypto_secretbox,
crypto_secretbox_open,
)


from . import base_context


def simple_encrypt(key, msg):
nonce = os.urandom(24)
cipher = crypto_secretbox(msg, nonce, key)
return b"%s%s" % (nonce, cipher)


def simple_decrypt(key, cipher):
nonce = cipher[:24]
return crypto_secretbox_open(cipher[24:], nonce, key)


def catsoop_loc_hash():
return hashlib.md5(base_context.cs_url_root.encode("utf-8")).hexdigest()



+ 1
- 1
requirements.txt Zobrazit soubor

@@ -1,11 +1,11 @@
bs4
cheroot
cryptography
filelock
python-jose
markdown>=3
mpmath
ply
pynacl
uwsgi
websockets
lxml


Načítá se…
Zrušit
Uložit