ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 파이썬 데이터 압축 전송
    Python 2025. 5. 24. 16:43

    데이터 전송시에는 항상 압축에 대한 유혹이 생긴다. 

    압축해서 보내면 전송량도 줄어들테고, 저장용량도 줄어들텐데...

    문제는 압축에 소요되는 비용과 수신단에서 다시 풀어야 하는 비용이 항상 걸린다. 

     

    파이썬에서 UDP 소켓을 이용해서 데이터 전송시에 압축해서 보내고, 수신측에서 압축을 풀어서 사용하는 예제를 한번 보자. 

    일단은 압축량보다 압축/해제에 소요되는 시간을 최소화하는 형태로 해보자. 

     

    지피티에게 예제를 물어보자. 

     

    UDP 소켓을 사용해서 데이터를 압축 후 전송하고, 수신 측에서 압축 해제하는 Python 예제를 만들어 드릴게요.

    • 압축은 빠른 속도를 위해 zlib을 사용합니다 (compressobj).
    • 압축 해제는 속도가 다소 느려도 괜찮다고 하셨으니, 기본적인 zlib.decompress로 충분합니다.

    Udp_sender.py

    import socket
    import zlib
    
    # UDP 설정
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    
    # 전송할 데이터
    message = b"Hello, this is a large amount of data for compression test! " * 20
    
    # 빠른 압축 (zlib compressobj)
    compressor = zlib.compressobj(level=1)  # level=1: 빠른 압축
    compressed_data = compressor.compress(message) + compressor.flush()
    
    print(f"Original size: {len(message)} bytes")
    print(f"Compressed size: {len(compressed_data)} bytes")
    
    # 소켓 생성 및 데이터 전송
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(compressed_data, (UDP_IP, UDP_PORT))

     

    Udp_receiver.py

    import socket
    import zlib
    
    # UDP 설정
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    BUFFER_SIZE = 4096
    
    # 소켓 생성 및 바인딩
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
    
    print(f"Listening on {UDP_IP}:{UDP_PORT}")
    
    while True:
        compressed_data, addr = sock.recvfrom(BUFFER_SIZE)
        print(f"Received compressed data ({len(compressed_data)} bytes) from {addr}")
    
        try:
            decompressed_data = zlib.decompress(compressed_data)
            print("Decompressed data:")
            print(decompressed_data.decode('utf-8'))
        except zlib.error as e:
            print("Decompression failed:", e)

     

    📝 참고사항

    • zlib.compressobj(level=1) 은 속도 우선 압축입니다. 압축률은 낮지만 굉장히 빠릅니다.
    • 수신 측에서는 zlib.decompress()로 해제하므로 CPU가 좀 더 소모될 수 있습니다.

    필요하다면 lz4, snappy, zstandard 등도 사용할 수 있어요. 더 빠른 속도를 원하면 추천드릴게요!
    어떤 데이터에 쓸지 살짝 알려주시면 더 최적화된 조합도 도와드릴 수 있어요.

     

    흠.. 일단 코드 자체는 간단해보인다. 

    송신측에서는 compressor.compress(message)를 호출해서 메시지를 압축하고, 

    수신측에서는 수신한 데이터를 zlib.decompress(compressed_data)를 호출해서 압축을 푼다. 

     

    다만, 이 코드만 가지고는 얼마나 압축되었는지, 그리고 압축/해재에 얼마나 시간이 소요되는지 알 수 가 없다. 

    대충 물어보니 zlib(level=1)의 평균 압축률은 35~45% 정도 된다고 한다. 

    • 데이터: 반복되는 텍스트 ("Hello, this is a large amount of data for compression test! " * 20)
    • zlib level=1:
      • 압축률: 약 35~45%
      • 압축 속도: 빠름
      • 압축 해제 속도: 아주 빠름

    흠.. zlib 말고는 다른 것은 없나? 한번 더 물어보니 아래와 같은 것들이 있다고 한다. 

    순서 알고리즘 압축속도 압축률 특징
    1 LZ4 🟢 매우 빠름 🔴 낮음 (~50%) 실시간 처리용 (e.g. 게임, 네트워크)
    2 Snappy 🟢 매우 빠름 🔴 낮음 (~50%) 구글 개발, 로그/네트워크 용도
    3 zlib (level=1) 🟢 빠름 🟡 보통 (~60%) 표준적인 압축, 속도 압축률 균형
    4 Zstandard (level=3~5) 🟡 중간 🟢 좋음 (70~80%) 페이스북 개발, 커스터마이즈 유연
    5 bz2 / zlib (level=9) 🔴 느림 🟢 매우 좋음 (~80% 이상) 최대 압축률 원할 때

     

    위의 표를 보니 lz4가 가장 적당한 것 같다. 왜냐하면 우리는 실시간 전송을 위해서 압축률이 좀 낮더라도 빠르게 압축/해제 하는 알고리즘이 필요하기 때문이다. 

     

    그러면 위의 예제를 lz4로 변경해보자. 

    먼저 lz4 를 설치해줘야 한다. 

    pip install lz4

     

    압축해서 전송하는 코드는 아래와 같다. (udp_sender_lz4.py)

    이때 압축에 소요되는 시간과 압축률도 함께 출력하도록 해보자. 

    import socket
    import time
    import lz4.frame
    
    # UDP 설정
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    
    # 전송할 데이터
    message = b"Hello, this is a large amount of data for compression test! " * 20
    
    # LZ4 압축 (기본 preset 사용, 매우 빠름)
    start = time.perf_counter()
    compressed_data = lz4.frame.compress(message)
    end = time.perf_counter()
    
    print(f"Original size: {len(message)} bytes")
    print(f"Compressed size: {len(compressed_data)} bytes")
    print(f"[LZ4] Compression time: {(end - start) * 1000:.3f} ms")
    
    # UDP 소켓으로 데이터 전송
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(compressed_data, (UDP_IP, UDP_PORT))

     

    수신쪽 코드는 아래와 같다. (udp_receiver_lz4.py)

    import socket
    import time
    import lz4.frame
    
    # UDP 설정
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    BUFFER_SIZE = 4096
    
    # 소켓 바인딩
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
    
    print(f"Listening on {UDP_IP}:{UDP_PORT}")
    
    while True:
        compressed_data, addr = sock.recvfrom(BUFFER_SIZE)
        print(f"Received compressed data ({len(compressed_data)} bytes) from {addr}")
    
        try:
            start = time.perf_counter()
            decompressed_data = lz4.frame.decompress(compressed_data)
            end = time.perf_counter()
            print(f"[LZ4] Decompressed size: {len(decompressed_data)} bytes")
            print(f"[LZ4] Decompression time: {(end - start) * 1000:.3f} ms")
            
            print(decompressed_data.decode('utf-8'))
        except Exception as e:
            print("Decompression failed:", e)

     

    맨 위의 코드랑 거의 유사하다. 

    lz4.frame.compress(message) 로 압축후 전송하고, 

    수신 데이터를 lz4.frame.decompress(compressed_data) 로 해제한다. 

     

    receiver를 먼저 실행한 후, sender를 실행해보면 아래와 같은 결과가 나온다. 

    1200bytes 를 보내기 위해 압축을 했는데, 최종적으로 98bytes로 압축이 되었고, 압축에 소요된 시간은 14ms 였다. 

    수신측에서는 98bytes를 받아서 1200bytes로 복원했으며, 이때 소요된 시간은 39ms 였다. 

     

    흠.. 이거 1200바이트가 98바이트로 압축된다? 너무 많이 압축되는거 아닌가? 

    생각해보니 데이터가 단순 문자열의 반복이여서 압축이 잘 된것 같다. 

     

    완전 랜덤 문자열이라면 어떻게 될까? 그리고 압축 알고리즘 중에 다른 것도 테스트 해볼 수 있을까? 

    전송부분을 빼고 랜덤데이터에 대해서 압축/해제 성능을 분석해보자. 

     

    아래 코드를 한번 돌려보자. 

    아래코드는 처음에 사용했던 zlib(level=1)과 lz4 알고리즘을 완전 랜덤 데이터에 대해서 압축/해제 성능을 비교하는 코드이다. 

    import os
    import time
    import zlib
    import lz4.frame
    
    # 압축 테스트 함수
    def test_compression(name, compress_fn, decompress_fn, data):
        # 압축
        t0 = time.perf_counter()
        compressed = compress_fn(data)
        t1 = time.perf_counter()
    
        # 압축 해제
        t2 = time.perf_counter()
        decompressed = decompress_fn(compressed)
        t3 = time.perf_counter()
    
        assert decompressed == data, f"[{name}] Decompressed data does not match original!"
    
        return {
            "name": name,
            "original_size": len(data),
            "compressed_size": len(compressed),
            "compression_time_ms": (t1 - t0) * 1000,
            "decompression_time_ms": (t3 - t2) * 1000,
            "compression_ratio": len(compressed) / len(data)
        }
    
    # 압축 알고리즘 설정
    algorithms = [
        {
            "name": "zlib (level=1)",
            "compress": lambda data: zlib.compressobj(level=1).compress(data) + zlib.compressobj().flush(),
            "decompress": zlib.decompress
        },
        {
            "name": "lz4",
            "compress": lz4.frame.compress,
            "decompress": lz4.frame.decompress
        }
    ]
    
    # 테스트 실행
    data = os.urandom(1200)
    results = []
    
    for algo in algorithms:
        result = test_compression(algo["name"], algo["compress"], algo["decompress"], data)
        results.append(result)
    
    # 결과 출력
    print("\n📊 압축 알고리즘 비교 결과")
    print(f"{'알고리즘':<15} | {'압축률':<8} | {'압축(ms)':<10} | {'해제(ms)':<10} | {'압축크기(Bytes)':<16}")
    print("-" * 70)
    for r in results:
        print(f"{r['name']:<15} | {r['compression_ratio']*100:6.2f}% | "
              f"{r['compression_time_ms']:<10.3f} | {r['decompression_time_ms']:<10.3f} | {r['compressed_size']:>8} B")

     

    위의 코드를 돌려보니 아래와 같은 결과가 나왔다. 

    어라? 압축률이 101% ?? 그리고 압축크기가 1223B ??

     

    어라? 1200바이트를 압축했더니 1223바이트가 나왔다?

    이거 뭐야. 압축에 시간은 시간대로 쓰고, 압축 결과물은 원본보다 더 크고... 이러면 압축을 쓸 이유가 없다!!!

    그런데 왜 이렇게 되지? 

    ❓ 왜 압축률이 오히려 커질까?

    1200바이트짜리 os.urandom()은 완전한 랜덤 데이터입니다.
    이런 데이터는 패턴이 없어서 압축이 거의 불가능하거나 오히려 커지는 경우가 많습니다.

    압축 알고리즘은 패턴을 찾아서 그걸 짧은 코드로 바꾸는 방식인데,
    랜덤 데이터는 그런 패턴이 전혀 없어서 오히려 압축 포맷의 헤더 정보가 더해지며 커질 수 있어요.

     

    흠. 다른 알고리즘은 어떨까나? 알고리즘을 추가해서 다시 해보자. 

    먼저 다른 알고리즘을 설치해주자. 

    pip install lz4 zstandard python-snappy

     

    그리고 10회 반복해서 결과를 출력하도록 하자. 

    import os
    import time
    import zlib
    import lz4.frame
    import zstandard as zstd
    import snappy
    
    def test_compression(name, compress_fn, decompress_fn, data, iterations=10):
        total_compress_time = 0
        total_decompress_time = 0
        total_compressed_size = 0
    
        for _ in range(iterations):
            t0 = time.perf_counter()
            compressed = compress_fn(data)
            t1 = time.perf_counter()
    
            t2 = time.perf_counter()
            decompressed = decompress_fn(compressed)
            t3 = time.perf_counter()
    
            assert decompressed == data, f"[{name}] Decompressed data mismatch!"
    
            total_compress_time += (t1 - t0)
            total_decompress_time += (t3 - t2)
            total_compressed_size += len(compressed)
    
        return {
            "name": name,
            "original_size": len(data),
            "compressed_size": total_compressed_size / iterations,
            "compression_time_ms": (total_compress_time / iterations) * 1000,
            "decompression_time_ms": (total_decompress_time / iterations) * 1000,
            "compression_ratio": (total_compressed_size / iterations) / len(data)
        }
    
    # 테스트 데이터 (패턴 있는 것으로 변경하면 압축률 높아짐)
    data = os.urandom(1200)  # 랜덤 데이터
    # data = (b"hello world " * 100)  # 반복 데이터로도 테스트 가능
    
    # 알고리즘 정의
    algorithms = [
        {
            "name": "zlib (level=1)",
            "compress": lambda d: zlib.compress(d, level=1),
            "decompress": zlib.decompress
        },
        {
            "name": "lz4",
            "compress": lz4.frame.compress,
            "decompress": lz4.frame.decompress
        },
        {
            "name": "zstd",
            "compress": lambda d: zstd.ZstdCompressor(level=3).compress(d),
            "decompress": lambda d: zstd.ZstdDecompressor().decompress(d)
        },
        {
            "name": "snappy",
            "compress": snappy.compress,
            "decompress": snappy.decompress
        }
    ]
    
    # 실행
    results = [test_compression(a["name"], a["compress"], a["decompress"], data, iterations=10) for a in algorithms]
    
    # 출력
    print("\n📊 압축 알고리즘 비교 결과 (10회 평균)")
    print(f"{'알고리즘':<15} | {'압축률':<8} | {'압축(ms)':<10} | {'해제(ms)':<10} | {'압축크기(Bytes)':<16}")
    print("-" * 75)
    for r in results:
        print(f"{r['name']:<15} | {r['compression_ratio']*100:6.2f}% | "
              f"{r['compression_time_ms']:<10.3f} | {r['decompression_time_ms']:<10.3f} | {int(r['compressed_size']):>8} B")

     

    위 코드의 실행결과는 아래와 같다.

    여전히 압축후에 100%가 넘는다.

     

    여전히 압축을 했는데, 압축률이 100%가 넘는다. 즉, 압축 전의 데이터보다 더 크기가 커졌다. 

    물론 앞에서 말했듯이 이 결과는 완전랜덤 데이터에 대해서는 압축을 수행하면, 이런 결과가 나올수있다는 것이다. 

     

    실제 데이터 전송시에는 완전랜덤은 아닐테고 하니 이정도까지 최악의 성능은 나오지 않을 것으로 보인다. 

     

    정리를 해보면.. lz4 알고리즘을 사용하여 압축/해제할 경우에, 

    반복되는 문자열이 많은 경우, 압축이 잘 되고 (1200 --> 98), 압축/해제에 소요되는 시간이 커진다. (압축:14ms/해제:34ms)

    반대로 완전 랜덤 문자열인 경우, 압축이 거의 안되고(1200 --> 1223), 압축/해제 소요시간은 줄어든다(압축/해제: 모두 3ms)

     

    전송하려는 데이터가 반복과 완전랜덤의 중간이라고 가정하면, 

    대략 1200bytes는 600bytes로 압축될 것으로 예상되고, 이때 소요시간은 대략 10~20ms 정도 잡으면 될 것 같다. 

     

    실제 어플리케이션에 적용해봐야 알겠지만, 

    이정도면 적용해볼 가치는 있는 것으로 보인다. 

     

    그럼 이만~~

Designed by Tistory.