대용량 소켓 서버 프로그래밍이라는 주제로 글 하나 올린 것이 화근이 되어서 해당 검색어를 통해서 제 블로그를 방문하시는 분들이 간혹 계시는 지라, 책임감을 느끼고 포스트를 이어 갑니다.  (이럴 틈이 없이 바쁜데 ㅠ.ㅠ)


오늘은 시리즈를 염두하면서 첫 번째 주제로 "Atomic Operation"에 대해서 이야기하고자 합니다.  "Atomic Operation"을 먼저 이야기하려는 이유는, 대용량 지원을 위해서는 멀티 스레드 프로그래밍이 필수이기 때문입니다.  그리고, 멀티 스레드 프로그래밍을 효율적으로 작성하기 위해서 가장 큰 적은 역시 "Context Switching 비용"입니다. 아!!  어려워 보이는 용어가 시작부터 난무하고 있습니다 ㅠ.ㅠ


우선, "Atomic Operation"을 쉽고 무식하게 설명하자면, "한 번에 실행 할 수 있는 명령어"라고 할 수 있습니다.


[소스 1]

1.var
2.a, b : integer;
3.begin
4.a := 1;
5.b := a + 1;

위의 소스 코드를 보면 4: 라인의 경우 한 줄의 코드이기 때문에 한 번에 실행 될 것이라고 착각할 수도 있습니다.  하지만, 이것이 실제 CPU가 처리 할 때에는 [그림 1]에서처럼 여러 명령어를 순차적으로 실행해야 같은 결과를 얻을 수가 있습니다.



[그림 1]


결국 멀티 스레드 상황에서 해당 코드를 실행하기 위해서는 임계영역(Critical Section 등)을 이용해서 락을 걸어두고 실행을 마친 후 락을 푸는 작업으로 한 번에 하나의 스레드만이 코드를 실행 할 수 있도록 해야 합니다.  이때, 스레드가 락에 걸리고 풀리는 과정 중에 수반되는 작업량이 무시 못할 정도로 크며, 이로 인해서 성능 저하가 발생합니다.  이것을 "Context Switching 비용"이라고 합니다.


"b := a + 1;"과 같은 코드가 간단하고 대수롭지 않게 보일지 모르지만, 멀티 스레드 상황에서는 아주 중요한 코드가 됩니다.  이때문에 각 플랫폼 마다 해당 코드를 한 번에 실행 할 수 있는 장치를 마련하고 있습니다.  윈도우즈에서는 Interlokced 함수들이 그런 기능을 제공합니다.


[소스 2]

01.var
02.Old : TObject;
03.iIndex, iMod : integer;
04.begin
05.FCurrent := AObj;
06. 
07.iIndex := InterlockedIncrement(FIndex);
08.if iIndex >= FRingSize then begin
09.iMod := iIndex mod FRingSize;
10.InterlockedCompareExchange(FIndex, iMod, iIndex);
11.end else begin
12.iMod := iIndex;
13.end;

[소스 2]는 대용량 소켓 서버를 작성하면서 패킷 풀을 만들기 위해서 작성한 코드의 일부분입니다.  링 버퍼에 새로운 객체를 추가하기 위해서 현재 위치(FIndex)를 한 칸 앞으로 전진하고 있습니다.  얼핏 보아도 여러 줄의 코드가 실행되고 있기 때문에 임계영역을 이용하여 락을 걸어야 할 것처럼 보입니다.  하지만, Interlocked 함수들을 이용해서 락을 사용하지 않고 현재 위치를 변경하고 있습니다.


7: InterlockedIncrement를 이용해서 FIndex에 1을 더하고 결과를 iIndex에도 저장합니다.  이 동작은 한 번에 이루어지기 때문에 멀티 스레드 상황에서 안전합니다.  이후 다른 스레드가 FIndex의 값을 변경하더라도 우리는 iIndex를 참조하고 있기 때문에 우리가 얻은 값이 중간에 변경 될 염려가 없습니다.


8-9: iIndex가 링 버퍼의 크기를 넘어서게 되면, 링 버퍼 크기로 나눠서 나머지를 취해야 합니다.  (크기가 정해진 큐를 작성하는 알고리즘 참고)


10: FIndex를 계속 증가시켜면 데이터 타입의 한계로 인해서 에러가 발생합니다.  따라서, iMod 값으로 바꿔야 합니다.  하지만, 지금 FIndex를 바꾼다면 다른 스레드가 이미 바꾼 값을 다시 되돌려 놓는 등의 문제가 발생합니다.  따라서, InterlockedCompareExchange를 이용해서 FIndex의 값이 iIndex와 같다면 아직 다른 스레드가 FIndex를 변경하지 않았으므로 iMod를 대입합니다.  이 과정 역시 한 번에 실행됩니다.


이후에는 iMod를 이용해서 "FRing[iMod]"과 같이 데이터를 접근 할 수가 있습니다.  또한 이 모든 과정이 락을 걸지 않고도 멀티 스레드에서 안전하게 동작함을 확신 할 수가 있습니다.


[소스 2]에서 생각 할 수 있는 의문점은 "링 버퍼가 한 바퀴 돌았을 때, 이전 메모리 해제를 어떻게 하는 가?"입니다.  우선 실제 코드는 [소스 3]과 같으며, 전체 코드는 http://bit.ly/XWBvBJ 를 참고하시기 바랍니다.


[소스 3]

1.Old := InterlockedExchangePointer(Pointer(FRing[iMod]), AObj);
2.if Old <> nil then Old.Free;

1: InterlockedExchangePointer를 이용해서 한 번에 "FRing[iMod] := AObj;"를 대입하고, 원래의 값은 Old에 대입하게 됩니다.


2: 만약, Old가 nil이 아니면 객체를 파괴합니다.


그런데 이 Old를 다른 스레드가 사용하고 있으면 어떻게 될까요?  큰 일 납니다 ㅠ.ㅠ  따라서, 저는 충분히 큰 크기의 링 버퍼를 구현하여 모든 객체의 라이프 사이클이 스레드의 참조 기간보다 길다는 것을 보장합니다.  그렇게 할 수 없는 곳에서는 사용 할 수 없지만, 일반적인 패킷 교환 등에서는 그런 일이 벌어지지 않습니다.


또한, 위의 코드는 혹시라도 그러한 일이 벌어지면 "Access Violation" 에러가 발생하게 되지만, 실제 코드에서는 좀 더 보강한 방법을 통해서 컨넥션을 종료하는 것으로 구현되어 있습니다.  그토록 오랫 동안 패킷을 처리 못했다면 분명 문제가 있는 접속이기 때문입니다.  해당 코드에 대해서는 다음 시리즈를 기약해 보도록 하겠습니다 ㅡ.ㅡ;;  (실제 패킷 풀에서는 메모리를 해제하지 않고 재사용하며, 잘못 된 재사용을 막을 수 있도록 코드가 추가되어 있습니다)



Lock-Free에 대한 참고 자료


 


 

통상 Critical Section을 처리하는데 Lock을 많이 사용한다.

그런데 성능이 굉장히 중요한 부분에 있어서 Lock을 사용하기에는 성능 저하가 우려 되는 경우가 있다.


이럴 때 활용할 수 있는 것이 Atomic 연산자를 이용한 Critical Section처리이다.

Atomic 연산이란 기본적으로 연산 수행 중 thread 전환이 되지 않는 것을 보장하는 연산을 일컫는다.


흔히 하는 실수로 성능 때문에 lock을 사용하지 않고 flag 변수를 하나 두고,

flag 변수를 이용하여 critical section을  처리할려 하는데,  이 경우 flag변수에 대한 읽고 쓰는 연산이 

atomic 하지 않으면 critical section이 제대로 보호 받지 못한다.


예를 들어 아래와 같이 코딩을 하여 critical section 처리를 한다면...


unsigned int semaphore  = 0;

while (semaphore) {

    // flag 풀릴 때 까지 yield하면서 기다림

}

semaphore  = 1;

// critical section 처리 함

semaphore  = 0;


위의 코드를 수행하는 여러 thread에서 semaphore flag변수가 0임을 동시에 확인 하면

여러 thread가 동시에 critical section에 진입하게 되어 문제를 일으키게 된다.


이를 해결하기 위해서는 semaphore변수가 0이면 바로 1로 바꿔 버리는 연산이 atomic하게 thread 전환없이

처리되어야 한다.


unsigned int swap(unsigned _new)

{

   unsigned int old = semaphore;

   semaphore = _new;

   return old;

}

while (swap(1)) {

       // flag 풀릴 때 까지 yield하면서 기다림

}

// critical 처리함

swap(0);


위의 코드에서 swap() 함수가 atomic 하게 처리되면 critical section 보호가 된다.


Windows의 경우는 <boost/detail/interlocked.hpp> 에서 이러한 atomic operation들이 지원되어 이를 활용하면 된다.

안타깝게 windows가 아닐 경우는 "Interlocked intrinsics not available" 이라는 에러 메세지가 출력된다.


Linux의 경우는 x86 계열 CPU에서는 gcc에서 제공하는 __exchanged_and_add를 사용할 수 있다.

이 __exchange_and_add의 경우 gcc 4.0.2 이전에는 

arm-none-linux-gnueabi/include/c++/4.0.2/bits/atomicity.h

이후에는 아래와 같이 있다.

arm-none-linux-gnueabi/include/c++/4.5.1/ext/atomicity.h


최근에 Android 폰등에서 ARM CPU를 많이 사용하는데 위의 __exchange_and_add() 이 컴파일은 되기는 하나,

정작 테스트해보면 atomic하지 않아 critical section보호가 제대로 되지 않거나,

아니면 일반적인 pthread mutex로 구현되어 성능이 좋지 않은 것을 확인하였다.


ARMv6 이상 CPU에서는 kernel쪽을 뒤져 보면 아래와 같은 atomic operation inline assembly함수를 찾을 수 있다.

inline unsigned long testAndSwap(volatile int *ptr, unsigned long old, unsigned long  _new)

{

    unsigned long oldval, res;

    do {

        __asm__ __volatile__("@ testAndSwap\n"

        "ldrex  %1, [%2]\n"

        "mov    %0, #0\n"

        "teq    %1, %3\n"

        "strexeq %0, %4, [%2]\n"

            : "=&r" (res), "=&r" (oldval)

            : "r" (ptr), "Ir" (old), "r" (_new)

            : "cc");

    } while (res);

    return oldval;

}


ARMv5te 이하의 CPU의 경우에는 multi-core를 지원하지 않는 CPU로 kernel쪽에서는 

단순히 interrupt를 disable시킨 상태에서 flag 변수 test and swap으로 구현되어 있는데,  

이러한 방식은 user space에서는 활용할 수 없다.


그런데 swap assembly command가 지원되어서 이를 이용하여 swap 함수를 만들어서 활용할 수 있다.

inline unsigned long swap(volatile int *ptr, unsigned long _new)

{

      unsigned long oldval;

        __asm__ __volatile__("@ swap\n"

        "swp %0, %1, [%2]\n"

            : "=&r" (oldval)

            : "r" (_new), "r" (ptr) );

        return oldval;

}


ARM Cortext-A8 CPU에서 간단히 구현해서 pthread_mutex_trylock()과 성능 비교를 해보면 

위의 atomic 연사자를 이용한 critical section 보호가 4배이상 빠른 것을 확인할 수 있었다.


그러나 성능 최적화에서 무엇보다도 중요한 것은 Profiling을 먼저 해보고 

어떤 SW 부분이 bottleneck인지 확인하고 성능 최적화에 들어가야 한다.


성능에 크게 문제가 되지 않을 경우에는 표준적인 pthread_mutex_trylock()등을 활용하면

critical section진입을 위해서 yield하면서 busy waiting하지 않고 suspend되고 resume되는 이점이 있어서

전체적으로 성능상에 오히려 이점이 될 수도 있다.


 


C++11 atomic을 이용해서 프로그래밍할 때 어떻게 해야 하는지 맛 보기 위해 간단한 예제를 작성해 봤습니다. multiple thread에서 공유 integer 변수를 loop 한번 당 매번 1씩 경쟁적으로 증가시키는 간단한 프로그램입니다.


일반 integer 변수와 atomic integer 변수로 비교해 보았고, 일반 integer 변수의 경우 역시 예상대로 정확한 결과가 나오지 않았고, atomic integer 변수는 정확한 결과가 나왔습니다.


또, 일반 integer 변수는 상당히 빠르게 수행되지만 atomic integer 변수는 상당히 느리다는 것을 확인할 수 있었습니다.


테스트 프로그램은 다음과 같습니다. C++11 프로그램인 만큼 몇 가지 C++11 스타일을 적용했습니다


#include <iostream>

#include <atomic>

#include <vector>

// C++11 <thread>가 있으나 버그가 있어 사용하지 못하고 

// boost::thread 사용

#include <boost/thread.hpp>


using namespace std;

using boost::thread;


enum {

    NTHREADS = 8,      // Core i7 Desktop이라 8로 설정

    NRUN = 10000000,

    NLOOP = NRUN * 10

};


// atomic integer type. atomic<int>라고도 할 수 있음. 

// atomic하게 증가됨

atomic_int an{0};

int n{0};


void increment_atomic(int id, int nloop) {

    cout << __PRETTY_FUNCTION__ 

         << "(" << id << ") starts" << endl;

    while (nloop-- > 0) {

        // relaxed memory ordering.

 // 단순 카운트인 경우에 충분

        an.fetch_add(1, memory_order_relaxed);

        // an.fetch_add(1, memory_order_seq_cst);

        // or ++an; 일반 integer처럼 prefix ++ 지원

 // 이 때 memory ordering은 memory_order_seq_cst

        if (nloop % NRUN == 0)

            cout << __PRETTY_FUNCTION__ 

                 << "(" << id << ") running..." << endl;

    }

    cout << __PRETTY_FUNCTION__ 

         << "(" << id << ") exits" << endl;

}


void increment_int(int id, int nloop) {

    cout << __PRETTY_FUNCTION__ 

         << "(" << id << ") starts" << endl;

    while (nloop-- > 0) {

        ++n;

        if (nloop % NRUN == 0)

            cout << __PRETTY_FUNCTION__ 

                 << "(" << id << ") running..." << endl;

    }

    cout << __PRETTY_FUNCTION__ 

         << "(" << id << ") exits" << endl;

}


void test_func(void (*incr)(int, int), void (*print)()) {

    vector<thread> thrs{};


    cout << "Creating threads" << endl;

    for (int i = 0; i < NTHREADS; ++i) {

        // 개선된 push_back()이라 할 수 있는 

        // emplace_back() 활용

        thrs.emplace_back(thread(incr, i, NLOOP));

    }


    cout << "Waiting for all threads to terminate" << endl;

    // lambda 함수 사용

    for_each(thrs.begin(), thrs.end(), [](thread& thr) {

        thr.join();

    });


    print();

}


void usage() {

    cout << "Usage:" << endl;

    cout << "test-atomic atomic|int" << endl;

}


int main(int argc, char* argv[]) {

    if (argc == 2)     {

        string type{argv[1]};


        if (type == "atomic")         {

            // lambda 함수 사용

            test_func(increment_atomic, []() {

                cout << "atomic int = " << an << endl;

            });

        }

        else if (type == "int")         {

            // lambda 함수 사용

            test_func(increment_int, []() {

                cout << "int = " << n << endl;

            });

        }

        else         {

            usage();

        }

    }

    else     {

        usage();

    }

}


컴파일 및 실행 환경은 다음과 같습니다.

$ uname -a
Linux ubuntu 3.2.0-26-generic #41-Ubuntu SMP Thu Jun 14 17:49:24 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
$ g++ --version
g++-4.6.real (Ubuntu/Linaro 4.6.3-1ubuntu5) 4.6.3
Copyright (C) 2011 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

boost thread와 컴파일할 때는 다음과 같이 library를 명시해 줘야 합니다.

$ g++ --std=c++0x -o test-atomic test_atomic.cpp -lboost_thread-mt

실행한 결과는 다음과 같습니다.

$ ./test-atomic
Usage:
test-atomic atomic|int
$ time ./test-atomic atomic
Creating threads
void increment_atomic(int, int)(void increment_atomic(int, int)(void increment_atomic(int, int)(void increment_atomic(int, int)void increment_atomic(int, int)((24) starts) starts1
) starts
3) starts
0) starts

void increment_atomic(int, int)(6) starts
Waiting for all threads to terminate
void increment_atomic(int, int)(7) starts
void increment_atomic(int, int)(5) starts
void increment_atomic(int, int)(6) running...
void increment_atomic(int, int)(3) running...
void increment_atomic(int, int)(5) running...
void increment_atomic(int, int)(0) running...
void increment_atomic(int, int)(1) running...
void increment_atomic(int, int)(7) running...
void increment_atomic(int, int)(2) running...
void increment_atomic(int, int)(4) running...
void increment_atomic(int, int)(5) running...
...
...
void increment_atomic(int, int)(0) exits
void increment_atomic(int, int)(7) running...
void increment_atomic(int, int)(7) exits
void increment_atomic(int, int)(4) running...
void increment_atomic(int, int)(4) exits
void increment_atomic(int, int)(6) running...
void increment_atomic(int, int)(6) exits
void increment_atomic(int, int)(1) running...
void increment_atomic(int, int)(1) exits
void increment_atomic(int, int)(2) running...
void increment_atomic(int, int)(2) exits
atomic int = 800000000

real 0m14.613s
user 1m40.774s
sys 0m0.032s

$ time ./test-atomic int
Creating threads
void increment_int(int, int)(1) starts
void increment_int(int, int)(2) starts
void increment_int(int, int)(0) starts
void increment_int(int, int)(3) starts
void increment_int(int, int)(4) starts
void increment_int(int, int)(5) starts
Waiting for all threads to terminate
void increment_int(int, int)(6) starts
void increment_int(int, int)(7) starts
void increment_int(int, int)(4) running...
void increment_int(int, int)(1) running...
void increment_int(int, int)(7) running...
void increment_int(int, int)(2) running...
void increment_int(int, int)(4) running...
void increment_int(int, int)(5) running...
void increment_int(int, int)(6) running...
...
...
void increment_int(int, int)(3) running...
void increment_int(int, int)(2) running...
void increment_int(int, int)(2) exits
void increment_int(int, int)(0) running...
void increment_int(int, int)(0) exits
void increment_int(int, int)(6) running...
void increment_int(int, int)(3) running...
void increment_int(int, int)(6) running...
void increment_int(int, int)(3) running...
void increment_int(int, int)(3) exits
void increment_int(int, int)(6) running...
void increment_int(int, int)(6) exits
int = 117287057

real 0m2.518s
user 0m16.345s
sys 0m0.004s

atomic type 을 이용하여 lock-less data structure를 구현할 수 있다고 하더군요.


+ Recent posts