返回介绍

A.6 第17章:加密示例

发布于 2024-02-05 21:59:46 字数 3627 浏览 0 评论 0 收藏 0

这几个脚本用于展示如何使用 futures.ProcessPoolExecutor 执行 CPU 密集型任务。

示例 A-7 使用 RC4 算法加密并解密随机的字节数组,需要 arcfour.py 模块(见示例 A-8)支持才能运行。

示例 A-7 arcfour_futures.py:futures.ProcessPoolExecutor 用法示例

import sys
import time
from concurrent import futures
from random import randrange
from arcfour import arcfour

JOBS = 12
SIZE = 2**18

KEY = b"'Twas brillig, and the slithy toves\nDid gyre"
STATUS = '{} workers, elapsed time: {:.2f}s'


def arcfour_test(size, key):
  in_text = bytearray(randrange(256) for i in range(size))
  cypher_text = arcfour(key, in_text)
  out_text = arcfour(key, cypher_text)
  assert in_text == out_text, 'Failed arcfour_test'
  return size


def main(workers=None):
  if workers:
    workers = int(workers)
  t0 = time.time()

  with futures.ProcessPoolExecutor(workers) as executor:
    actual_workers = executor._max_workers
    to_do = []
    for i in range(JOBS, 0, -1):
      size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
      job = executor.submit(arcfour_test, size, KEY)
      to_do.append(job)

    for future in futures.as_completed(to_do):
      res = future.result()
      print('{:.1f} KB'.format(res/2**10))

  print(STATUS.format(actual_workers, time.time() - t0))

if __name__ == '__main__':
  if len(sys.argv) == 2:
    workers = int(sys.argv[1])
  else:
    workers = None
  main(workers)

示例 A-8 纯粹使用 Python 实现 RC4 加密算法。

示例 A-8 arcfour.py:兼容 RC4 的算法

"""兼容RC4的算法"""

def arcfour(key, in_bytes, loops=20):

  kbox = bytearray(256)  # 创建存储键的数组
  for i, car in enumerate(key):  # 复制键和向量
    kbox[i] = car
  j = len(key)
  for i in range(j, 256):  # 重复到底
    kbox[i] = kbox[i-j]

  # [1] 初始化sbox
  sbox = bytearray(range(256))

  # 按照CipherSaber-2的建议,不断打乱sbox
  # http://ciphersaber.gurus.com/faq.html#cs2
  j = 0
  for k in range(loops):
    for i in range(256):
      j = (j + sbox[i] + kbox[i]) % 256
      sbox[i], sbox[j] = sbox[j], sbox[i]

  # 主循环
  i = 0
  j = 0
  out_bytes = bytearray()

  for car in in_bytes:
    i = (i + 1) % 256
    # [2] 打乱sbox
    j = (j + sbox[i]) % 256
    sbox[i], sbox[j] = sbox[j], sbox[i]
    # [3] 计算t
    t = (sbox[i] + sbox[j]) % 256
    k = sbox[t]
    car = car ^ k
    out_bytes.append(car)

  return out_bytes


def test():
  from time import time
  clear = bytearray(b'1234567890' * 100000)
  t0 = time()
  cipher = arcfour(b'key', clear)
  print('elapsed time: %.2fs' % (time() - t0))
  result = arcfour(b'key', cipher)
  assert result == clear, '%r != %r' % (result, clear)
  print('elapsed time: %.2fs' % (time() - t0))
  print('OK')


if __name__ == '__main__':
  test()

示例 A-9 使用 SHA-256 散列算法打乱字节数组。这个脚本使用标准库中的 hashlib 模块,而这个模块使用 C 语言编写的 OpenSSL 库。

示例 A-9 sha_futures.py:futures.ProcessPoolExecutor 用法示例

import sys
import time
import hashlib
from concurrent import futures
from random import randrange

JOBS = 12
SIZE = 2**20
STATUS = '{} workers, elapsed time: {:.2f}s'


def sha(size):
  data = bytearray(randrange(256) for i in range(size))
  algo = hashlib.new('sha256')
  algo.update(data)
  return algo.hexdigest()


def main(workers=None):
  if workers:
    workers = int(workers)
  t0 = time.time()

  with futures.ProcessPoolExecutor(workers) as executor:
    actual_workers = executor._max_workers
    to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
    for future in futures.as_completed(to_do):
      res = future.result()
      print(res)

  print(STATUS.format(actual_workers, time.time() - t0))

if __name__ == '__main__':
  if len(sys.argv) == 2:
    workers = int(sys.argv[1])
  else:
    workers = None
  main(workers)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文