Operating System/UNIX

[UNIX] 멀티 프로세스 및 스레드 공유 데이터 동기화

  • -

개념

협력적 멀티 프로세스 및 멀티 스레드는 공유 데이터를 동시에 접근하면 그 결과가 접근 순서에 의존되어 데이터의 일관성을 망칠 수 있다.

따라서 이들의 질서있는 실행을 보장하여, 데이터의 일관성을 유지해야 한다.

 

멀티 프로세스멀티 스레드에 대한 자세한 내용은 아래의 링크 참고

 

[UNIX] 멀티 프로세스 (Multi Process) 프로그래밍

개념 프로세스란 실행 중인 프로그램(실행 파일)이자, 현대의 컴퓨팅 시스템에서 작업의 단위이다. 프로세스는 실행되는 동안 여러 개의 새로운 프로세스들을 생성할 수 있다. 생성하는 프로세

sikpang.tistory.com

 

 

[UNIX] 멀티 스레드 (Multi Thread) 프로그래밍

개념 프로세스 내에서 실행되는 흐름을 말한다. 기본적으로 하나의 프로세스에는 하나의 스레드가 실행되지만, 여러 개의 스레드를 생성할 수 있으며, 이는 동시에 여러 개의 작업을 수행할 수

sikpang.tistory.com

 

임계구역 문제

각 프로세스는 공유 데이터에 접근하고 갱신하는 임계 구역(critical section)이 존재한다.

한 프로세스가 임계 구역에서 수행하는 동안 다른 프로세스들은 임계 구역에 들어갈 수 없다는 것이 특징이다.

임계 구역으로 진입하려면 허가를 요청해야 하는데 이 부분을 진입 구역(entry section) 이라고 하고,

임계 구역의 뒤는 퇴출 구역(exit section)이라고 한다.

 

예시

#include <stdio.h>
#include <pthread.h>
#define THREAD_SIZE 5
#define COUNT_MAX 300000

int count = 1;

void* task(void* input)
{
	int number = *(int*)input;

	while (1)
	{
		if (count >= COUNT_MAX)
			break;
		printf("Thread %d : %d\n", number, count);
		count = count + 1;
	}
	return NULL;
}

int main()
{
	pthread_t thread[THREAD_SIZE];
	int data[THREAD_SIZE];

	for (int i=0; i<THREAD_SIZE; ++i)
		data[i] = i + 1;

	for (int i=0; i<THREAD_SIZE; ++i)
		pthread_create(thread+i, NULL, task, data+i);

	for (int i=0; i<THREAD_SIZE; ++i)
		pthread_join(thread[i], NULL);

	printf("Result = %d\n", count);
	return 0;
}

 

THREAD_SIZE 만큼의 스레드를 만들고,

전역변수인 count를 1씩 증가시켜 COUNT_MAX 까지 만드는 것이 목표인 프로그램이다.

 

따라서 모든 스레드가 종료되고, count의 출력 기댓값은 COUNT_MAX이다. 

 

 

예상 외로 결과값은 COUNT_MAX보다 컸고, 300455줄 부터 count 값이 순차적으로 증가하지 않았다.

또한 출력 횟수가 COUNT_MAX를 훨씬 넘어서 300460줄 까지 찍혔다.

data race(데이터 경쟁)로 인한 데이터 일관성이 적어도 460번 이상 흐트러졌다는 뜻이다.

 

 

-fsanitize=thread 옵션을 주고 컴파일 하여 실행한 결과, data race가 발생했다고 알려준다.

 

task() 함수의 반복문 종료 조건에서는 countCOUNT_MAX보다 작았지만,

이후 모든 스레드가 증감식을 거쳐 최종 결과에 영향을 미친 것이다.

 

이러한 임계구역 문제를 해결하기 위해서 여러 소프트웨어 도구들이 존재한다.

본 포스팅에서는 Mutex lockSemaphore를 소개한다.

 

 

Mutex Lock

mutex는 mutual exclusion(상호 배제)을 축약한 말이다.

임계구역 문제를 해결할 High-level 소프트웨어 도구이고, 프로그래머가 사용하기에 가장 간단한 도구다.

 

스레드는 임계 구역에 들어가기 전에 반드시 락(Lock)을 획득해야 하고, 빠져나올 때는 락을 반환해야 한다.

락을 획득하기 위해서는 mutex의 내부 변수를 판별하여 바쁜 대기(busy waiting)스핀락(spinlock)을 거친다.

 

본 포스팅에서는 POSIX 표준을 지킨 Linux 환경 pthread 라이브러리에 포함되어 있는 mutex를 다룬다.

 

pthread_mutex_init

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

 

pthread_mutex_t 자료형의 mutex(이하 뮤텍스)를 선언하고 pthread_mutex_init 함수로 이를 초기화 시킨다.

인자로 해당 뮤텍스의 주소를 넘겨야 한다. 

 

두번째 인자로 뮤텍스의 속성을 추가 지정할 수 있다. NULL을 넘기면 기본 속성으로 초기화된다.

attr에 대한 자세한 설명 및 사용법은 메뉴얼 참고

 

pthread_mutex_lock

 #include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);

 

인자로 들어온 뮤텍스를 lock 한다.

 

만약 이미 해당 뮤텍스가 다른 스레드에 의해 lock 되어있다면,

호출한 스레드는 뮤텍스를 사용할 수 있을 때까지 block 된다.

스케줄링 정책에 따라 어떤 스레드가 뮤텍스를 획득할지를 결정한다.

 

pthread_mutex_trylock()은 lock 함수와 동일하지만,
해당 뮤텍스가 이미 잠겨있으면 block하지 않고 즉시 반환된다.

 

성공 시 0을 반환하고, 에러 시 다음의 코드를 반환한다.

 

Type Description
EAGAIN 뮤텍스의 최대 잠금 수 초과
EDEADLK lock : 데드락 컨디션 감지
EBUSY trylock : 뮤텍스가 이미 잠겨서 획득할 수 없음

 

pthread_mutex_unlock

#include <pthread.h>

int pthread_mutex_unlock(pthread_mutex_t *mutex);

 

인자로 들어온 뮤텍스를 unlock 한다.

성공 시 0을 반환하고, 에러 시 다음의 코드를 반환한다.

 

Type Description
EPERM 호출 스레드가 해당 뮤텍스를 소유하고 있지 않음

 

pthread_mutex_destroy

#include <pthread.h>

int pthread_mutex_destroy(pthread_mutex_t *mutex);

 

인자로 들어온 뮤텍스 객체를 파괴한다.

재사용 하려면 다시 init 함수를 호출해야 한다.

성공 시 0을 반환한다.

 

예제 코드

#include <stdio.h>
#include <pthread.h>
#define THREAD_SIZE 5
#define COUNT_MAX 300000

int count = 1;
pthread_mutex_t mutex;

void* task(void* input)
{
	int number = *(int*)input;

	while (1)
	{
		pthread_mutex_lock(&mutex);
		if (count >= COUNT_MAX)
			break;
		printf("Thread %d : %d\n", number, count++);
		pthread_mutex_unlock(&mutex);
	}
	pthread_mutex_unlock(&mutex);
	return NULL;
}

int main()
{
	pthread_t thread[THREAD_SIZE];
	int data[THREAD_SIZE];

	for (int i=0; i<THREAD_SIZE; ++i)
		data[i] = i + 1;
	pthread_mutex_init(&mutex, NULL);

	for (int i=0; i<THREAD_SIZE; ++i)
		pthread_create(thread+i, NULL, task, data+i);

	for (int i=0; i<THREAD_SIZE; ++i)
		pthread_join(thread[i], NULL);
	pthread_mutex_destroy(&mutex);

	printf("Result = %d\n", count);
	return 0;
}

 

위의 예시와 동일한 작업이지만, 뮤텍스로 스레드의 실행 순서를 보장해주었다.

 

 

count가 순차적으로 잘 증가하는 것을 볼 수 있고, 결과값과 기댓값이 정확히 일치했다.

 

 

-fsanitize=thread 옵션으로 컴파일 후 실행해도 data race 경고는 뜨지 않았다.

 

trylock 예제는 위의 예제에서 pthread_mutex_lock() 호출 부분에 다음 코드로 대체해주기만 하면 된다.

 

while (pthread_mutex_trylock(&mutex));

 

반복문 안에서 감시해야할 기타 다른 부분들을 감시할 수 있고

lock 호출시 나타나는 block 상태를 회피하여 더 안정적으로 프로그래밍 할 수 있다.

 

 

Semaphore

위에 소개한 mutex는 멀티 프로세스 환경에서 공유 메모리를 사용하지 않는다면 빛을 발하지 못한다.

뮤텍스를 생성한 후 fork() 시, 모든 메모리가 복사되어 뮤텍스가 개별로 존재해지기 때문이다.

고로 멀티 프로세스 환경에서도 사용할 수 있는 동기화 도구인 Semaphore(이하 세마포어)를 소개한다.

 

세마포어는 값이 0 미만으로 떨어지지 않는 정수이며,

세마포어 값을 1 증가시키는 연산과, 값을 1 감소시키는 연산이 가능하다.

세마포어의 값이 0인 경우, 값을 감소시키려는 프로세스를 세마포어의 값이 0보다 커질 때까지 block 시킨다.

 

sem_open

#include <fcntl.h>	// oflag
#include <sys/stat.h>	// mode
#include <semaphore.h>

sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);

 

새 세마포어를 만들거나 기존 세마포어를 연다.

세마포어는 name으로 식별된다.

 

두 번째 인자인 oflagopen()의 flag과 동일하다.

세 번째 인자인 modeopen()의 mode와 동일하다.

네 번째 인자인 value해당 세마포어의 값을 value로 초기화시켜준다.

 

성공 시, 생성된 세마포어의 주소를 반환하고, 실패 시 SEM_FAILED를 반환한다.

 

flag과 mode에 대한 자세한 설명은 아래의 링크 참고

 

[UNIX] 입출력 및 리다이렉션 (I/O Redirection)

유닉스 계열 운영체제에서 파일은 연속된 n개의 바이트다. 네트워크, 디스크, 터미널 같은 모든 I/O 디바이스들은 파일로 모델링되며, 모든 입력과 출력은 해당 파일을 읽거나 쓰는 형식으로 수

sikpang.tistory.com

 

sem_wait

#include <semaphore.h>

int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *restrict sem,
			const struct timespec *restrict abs_timeout);

 

인자 sem이 가리키는 세마포어 값을 1 감소시킨다.

해당 세마포어의 값이 0보다 크면 감소가 진행되어 즉시 반환한다.

해당 세마포어의 값이 0이라면 앞서 설명한 대로 호출 프로세스 및 스레드가 block 된다.

 

sem_trywait()은 감소를 즉시 수행하지 못할 경우 에러(errno가 EAGAIN)와 함께 바로 반환된다.

sem_timedwiat()은 지정한 시간만큼만 기다리고, 시간이 만료된 경우 에러(errno가 ETIMEDOUT)와 함께 반환된다.

 

성공 시 0을 반환하고, 실패 시 -1을 반환한다.

 

sem_post

 #include <semaphore.h>

int sem_post(sem_t *sem);

 

인자 sem이 가리키는 세마포어 값을 1 증가시킨다.

성공 시 0을 반환하고, 실패 시 -1을 반환한다.

 

sem_close & sem_unlink

#include <semaphore.h>

int sem_close(sem_t *sem);
int sem_unlink(const char *name);

 

sem_close()는 인자 sem이 가리키는 세마포어를 닫고 모든 리소스를 해제시킨다.

 

sem_unlink()는 인자 name으로 들어온 세마포어가 즉시 제거된다.

세마포어를 더이상 사용하지 않을 때, 제거하기 위해 호출한다.

이미 열려있는 세마포어는 모든 프로세스가 해당 세마포어를 sem_close()할 때까지 제거되지 않는다.

 

예제 코드

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <sys/stat.h>
#include <string.h>
#define BUFFER_SIZE 10000
#define PROCESS_SIZE 5
#define MAX_CAPACITY 1
#define MAX_NUM 10000
#define SEMAPHORE_NAME "sem"

sem_t* sem;
char buf[BUFFER_SIZE];

int get_number()
{
	int fd = open("./file", O_RDONLY);
	read(fd, buf, BUFFER_SIZE);
	close(fd);
	return atoi(buf);
}

void write_to_file(int number)
{
	while (1)
	{
		sem_wait(sem);
		int cur = get_number();
		if (cur >= MAX_NUM)
			break;

		int fd = open("./file", O_WRONLY | O_TRUNC);
		sprintf(buf, "%d", cur+1);
		printf("Process %d : %d to %s\n", number, cur, buf);
		write(fd, buf, strlen(buf));
		close(fd);
		sem_post(sem);
	}
	sem_post(sem);
}

int main()
{
	sem = sem_open("sem", O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IXUSR, MAX_CAPACITY);

	for (int i=0; i<PROCESS_SIZE; ++i)
	{
		if (fork() == 0)
		{
			write_to_file(i+1);
			return 0;
		}
	}

	for (int i=0; i<PROCESS_SIZE; ++i)
		wait(NULL);

	sem_close(sem);
	sem_unlink(SEMAPHORE_NAME);

	printf("Result : %d\n", get_number());
	return 0;
}

 

위의 코드는 멀티 프로세스가 하나의 파일에서 숫자를 가져와

1씩 증가시켜서 다시 파일에 쓰는 행동을 MAX_NUM까지 반복한다.

 

 

출력 순서는 다르지만, 출력된 횟수를 보아

0부터 시작하여 MAX_NUM인 10000까지 중복 없이 잘 증가한 것을 볼 수 있다.

 

 

세마포어를 사용하지 않은 경우 위와 같은 결과를 얻을 수 있는데,

최종 결과값은 같지만 출력된 횟수를 보아 데이터 경쟁이 많이 일어난 것으로 확인된다.

 

참고 자료

https://man7.org/linux/man-pages/man3/pthread_mutex_destroy.3p.html
https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html
https://man7.org/linux/man-pages/man7/sem_overview.7.html
https://man7.org/linux/man-pages/man3/sem_open.3.html
https://man7.org/linux/man-pages/man3/sem_wait.3.html
https://man7.org/linux/man-pages/man3/sem_post.3.html
https://man7.org/linux/man-pages/man3/sem_close.3.html
https://man7.org/linux/man-pages/man3/sem_unlink.3.html

 

 

개인 공부용 포스팅인 점을 참고하시고, 잘못된 부분이 있다면 댓글로 남겨주시면 감사드리겠습니다.

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.