Executor 프레임워크를 통한 스레드 실행 제어 :: 안드로이드 설치 및 개발[SSISO Community]
 
SSISO 카페 SSISO Source SSISO 구직 SSISO 쇼핑몰 SSISO 맛집
추천검색어 : JUnit   Log4j   ajax   spring   struts   struts-config.xml   Synchronized   책정보   Ajax 마스터하기   우측부분

안드로이드 설치 및 개발
[1]
등록일:2020-01-21 17:47:09 (0%)
작성자:
제목:Executor 프레임워크를 통한 스레드 실행 제어

개요

자바의 실행자(Executor) 프레임워크는 스레드에 대해 그리고 시스템에서 스레드가 사용하는 자원에 대한 새로운 차원의 제어를 가능하게 한다.


Executor 프레임워크에 관련된 클래스를 사용하면 시스템이 스레드 수를 관리하거나 더 이상 필요하지 않은 스레드를 취소 할 수 있다.

  • 스레드에서 실행되기를 기다리는 태스크 수를 제어하는 작업자 스레드 풀과 큐를 설정한다.
  • 비정상적으로 종료되는 스레드를 초래하는 에러를 확인한다.
  • 완료되는 스레드를 기다리고 스레드로부터 결과를 가져온다.
  • 스레드의 일괄 처리를 실행하고 고정된 순서로 결과를 가져온다.
  • 사용자가 더 빨리 결과를 확인할 수 있도록 알맞은 시간에 백그라운드 스레드를 시작한다.
9.1 Executor
Executor 프레임워크 기본 구성요소는 Executor 인터페이스다. 목표는 태스크(Runnable)의 생성을 분리하여, 응용프로그램의 동작을 가능하게 하는 것이다.

public interface Executor {
    void executor(Runnable command);
}

단순하면서도 강력한 실행환경의 기초다. 
사용 방법
public class SimpleExecutor implenments Executor {
    @Override
    public void execute(Runnable runnable) {
       new Thread(runnable).start();
    }
}

위 클래스는 내부 스레드 생성보단 많은 기능을 제공하진 않지만, 디커플링, 확장성, 메모리 참조 등과 같은 장점을 제공한다.

Executor가 제어할 수 있는 사항은 다음과 같다.
  • 태스크 큐잉
  • 태스크 실행 순서
  • 태스크 실행 유형 (직렬 또는 동시)
다음 예제는 직렬 태스크 실행자를 구현한다 (AsyncTask와 동일)
private static class SerialExecutor implements Executor {
        final ArrayDeque mTasks = new ArrayDeque();
        Runnable mActive;
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                @Override
                public void run() {
                    try {
                        r.run();

                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

태스크 큐잉
ArrayDeque는 양방향으로 꺼낼 수 있는 큐로서 스레드에 의해 처리될 떄까지 삽입된 테스크를 보유한다.

테스크 실행 순서
모든 테스크는 mTasks.offer()를 통해 데큐의 끝에 넣어진다. 태스크들은 FIFO 순서다.

태스크 실행 유형
태스크는 직렬로 실행되고 같은 스레드에서 실행될 필요는 없다. 다른 스레드의 POOL의 Executor로 보낸다.

위 소스는 순차적인 실행을 보장하는 실행 환경을 구성한다.

9.2 스레드 풀
우리는 내부 Excutor 프레임 워크까지 구현 할 필요는 없다. 스레드 풀이라는 것은 태스크 큐와 작업자 스레드 집합의 조합이며, 생산자 큐에 태스크를 추가하고 작업자 스레드는 새로운 백그라운드 실행을 수행할 준비가 된 유휴 스레드가 있을 때마다 태스크를 소비한다.

스레드 풀의 장점
  • 작업자 스레드는 실행할 다음 태스크를 기다리기 위해서 살아 있다. 매 스레드를 생성하고 파괴할 때 생기는 오버헤드를 줄일 수 있다.
  • 스레드의 최대 개수로 정의된다. 백그라운드 스레드 때문에 발생하는 과부하를 막는다.
  • 작업자 스레드의 생명주기는 스레드 풀 생명주기에 의해 제어된다.
9.2.1 미리 정의 된 스레드 풀
Executor 프레임워크는 Executors 팩토리 클래스에서 만들어진 미리 정의된 스레드 풀 유형을 포함한다.

고정 크기
고정 크기 스레드 풀은 사용자가 정의한 개수의 작업자 스레드를 유지한다.
Executors.newFixedThreadPool(n) - n은 스레드 수

동적 크기
처리할 태스크가 있을 때 새로운 스레드를 만든다. 유휴 스레드가 없을 경우 60초 동안 기다리고 태스크 큐가 비어있을 경우 종료한다.
Executors.new CachedThreadPool() 로 생성

싱글스레드 생성자
태스크 처리를 위해서 하나의 작업자 스레드를 가진다. 태스크가 차례대로 실행되고 스레드 안전하다.
Executors.newSingleThreadExecutor()로 생성된다.

9.2.2 커스텀 스레드 풀
미리 정의된 Executor 스레드 풀은 ThreadPoolExecutor 클래스에 기반하지만, 이것을 상세한 스레드 풀의 동작을 만드는데 직접 커스터마이징 할 수 있다.

ThreadPoolExecutor 설정

ThreadPoolExecutor executor = new ThreadPoolExecutor(
  int corePoolSize, //  핵심 풀 사이즈
  int maximumPoolSize,  // 최대 풀 크기
  long keepAliveTime,  // 생존 유지 시간
  TimeUnit unit,  // 생존 유지 시간의 단위
  BlockingQueue<Runnable> workQueue);  //테스크 큐 유ㅕㅇ

핵심 풀 크기
원래 스레드 풀은 0개로 시작하지만 핵심 풀 크기에 도달하면 하한 이하로 떨어지지 않는다. 핵심 풀 크기보다 적을 때 태스크가 큐에 추가되면 유휴스레드가 있을 지라도 새로운 스레드가 생성되고, 핵심 풀보다 같거나 커질때 작업자 스레드가 생성된다. 

최대 풀 크기
동시에 실행할 수 있는 스레드의 개수

최대 유휴 시간
생존시간이 설정할 경우 비핵심 풀 스레드를 회수할 수 있다.

태스크 큐 유형
태스크를 보유하는 큐의 구조를 정한다.

9.2.3 스레드 풀 설계
어떤 용도인지 파악하고 현명하게 설정할 필요가 있다. 메모리를 많이 사용하지 않고 하드웨어 기반에서 가장 높은 속도로 작업을 처리하는 것이 중요하다.

크기
스레드의 최대 개수가 너무 작으면 충분한 속도로 태스크를 꺼내지 않아 성능저하가 온다. 반면 너무 많은 스레드도 CPU 오버헤드가 올 수 있다.
기본적으로 CPU 개수를 기준으로 하는 것이 좋다.
int N = Runtime.getRuntime().avilableProcessors()
N은 실제로 동시에 실행할 수 있는 태스크의 최대 개수.

역동성
역동성은 핵심 스레드와 생존 유지 시간에 의해 만들어진다.
핵심 스레드의 개수와 소멸하는 시간에 따라 스레드 풀의 지속 시간은 달라지게 된다.

제한 또는 무제한 태스크 큐
무제한 큐는 메모리가 고갈될 수 있는 반면 제한 큐는 자원 소비는 더 잘 관리될 수 있다.
LinkedBlockingQueue는 무제한 큐이고 , PriorityBlockingQueue ,ArrayBlockingQueue같은 경우 제한 큐이다.

스레드 설정
ThreadPoolExecutor는 작업자 스레드 개수와 풀의 생성과 종료뿐 아니라, 모든 스레드 속성을 정의한다. UI스레드와 경쟁하지 않도록 스레드 우선순위를 낮추는 것이 중요.

작업자 스레드의 구현 - ThreadFactory 인터페이스의 구현을 통해 설정된다.
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Executors.newFixedThreadPool(10, new LowPriorityThreadFactory());
    }


    static class LowPriorityThreadFactory implements ThreadFactory {
        private static int count = 1;
        private static String TAG = "LowPriorityThreadFactory";


        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("LowPrio " + count++);
            t.setPriority(4);
            t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable throwable) {
                    Log.d(TAG, "Thread = " +t.getName() + ", error = " + throwable.getMessage());
                }
            });
            return t;
        }
    }
기본적으로 UI스레드보다 낮은 우선순위(4)로 지정하여 UI스레드의 태스크를 방해하지 않도록 하자.

ThreadPoolExecutor 확장
프로그램이 실행자 또는 실행자 태스크를 추적할 수 있도록 확장될 수 있다.

void beforeExecute(Thread t, Runnable r)
스레드를 실행하기 전 런타임 라이브러리에 의해 실행

void afterExecute(Runnable r, Throwable t)
스레드가 정상 종료 후 런타임 라이브러리에 의해 실행 된다.

void terminated() 
대기 중인 태스크가 없을 때 런타임 라이브러리에 의해 실행

예제 ) 기초적인 커스텀 스레드 풀
public class TaskTrackingThreadPool extends ThreadPoolExecutor {
        private AtomicInteger mTaskCount = new AtomicInteger(0);

        public TaskTrackingThreadPool() {
            super(3,3,0L,TimeUnit.SECONDS, new LinkedBlockingQueue());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            mTaskCount.getAndIncrement();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            mTaskCount.getAndIncrement();
        }

        public int getNbrOfTasks() {
            return mTaskCount.get();
        }
    }

9.2.4 생명 주기
생명 주기는 Executor를 상속받고 ThreadPoolExecutor에 의해 구현된 ExecutorService 인터페이스를 통해 관리되고 확산된다.

실행 -> shutdown() -> 중단 -> 풀과 큐가 빔 -> 정리 -> 종료
       -> shutdownNow() -> 멈춤

실행
스레드 풀이 생성된 초기 상태로 들어온 태스크를 받아서 작업자 스레드에서 태스크를 실행

중단
ExecutorService.shutdown 호출 후 상태, 현재 실행중인 태스크와 큐에 있는 테스크 처리는 계속 하지만 새로운 태스크 추가 거부

멈춤
ExecutorService.shutdownNow 호출 . 작업자 스레드가 중지되고 큐 안의 모든 태스크 제거

종료
최종 상태. 남아있는 태스크와 작업자 스레드가 없다.

생명주기는 되살릴 수 없으므로 스레드 풀이 실행상태를 벗어나면 다시 재사용 할 수 없다. 한가지 방법은 멈춤 상태에서는 재 조정이 가능하다.

9.2.5 스레드 풀의 중단
실행자는 필요 이상으로 오래 태스크를 처리하면 안된다. 이유 없이 백그라운드에서 많은 메모리를 차지하고 있기 때문이다. 명시적 종료는 실행자를 종료하기 위해 필요하다

void shutdown() : 새로운 태스크 거부, 그러나 기존 풀에 있는 작업은 실행.
List<Runnable> shutdownNow() : 새로운 태스크 거부, 기존 풀 작업 중단 -> List<Runnable>로 반환됨

9.2.6 스레드 풀 사용의 사례와 위험성
스레드풀을 잘못 정의할 때 발생하는 위험성을 설명한다.

큐에 저장하기 보다 스레드 생성을 선호
allowCoreThreadTimeOut(true)는 시스템이 유휴 핵심 풀 스레드를 환수하게 할 수 있다. 따라서 스레드 풀은 큐 보다는 스레드 생성을 선호하도록 정의할 수 있다.

int N = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
  N*2, N*2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
executor.allowCoreThredTimeOut(true);

미리 로드된 태스크 큐 처리
미리 로드된 태스크 큐는 ThreadPoolExecutor 인스턴스에 prestartAllCoreThreads() 또는 prestartCoreThread()로 핵심 스레드를 미리 시작할 수 있다. 첫번째는 모든 스레드를 한번에 실행하고 두번째는 싱글스레드로 차례로 시작한다.
 BlockingQueue preloadedQueue = new LinkedBlockingQueue();
        final String[] alphabat = {"Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta"};
        for (int i = 0; i < alphabat.length; i++) {
            final int j = i;
            preloadedQueue.add(new Runnable() {
                @Override
                public void run() {
                    //긴 동작들
                }
            });
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, preloadedQueue);
        executor.prestartAllCoreThreads();


0개의 핵심 스레드의 위험
핵심 풀 크기에 도달 되자마자 태스크를 새롭게 생성된 스레드에서 실행되는 대신 큐에 들어간다. 0개의 핵심 스레드와 10개의 테스크를 보유할 수 있는 큐는 스레드 생성을 11번째 태스크가 삽입 될 때까지 어떤 태스크도 실행되지 않음

9.3 태스크 관리
태스크 관리 방법을 살펴보자

9.3.1 태스크 표현
태스크는 반복적으로 수행될 수 있지만 실행 자체는 독립적이어야 한다. 이 절에는 Future 인터페이스를 통하여 태스크를 관리하는 Callable 인터페이스를 소개한다.

Callable은 스레드의 결과를 가져오는 인터페이스 이다.

public interface Callable<V> {
   public <V> call() throws Exception;
}

ExecutorService에 의해 Callable 태스크가 처리되면 Callale 태스크는 태스크를 보낸 이후에 사용될 수 있는 Future 인터페이스를 통해 관찰되고 제어될 수 있다.

boolean cancel(boolean mayinterruptIfRunning)
V get()
V get(long timeout, TimeUnit unit)
boolean isCancelled()
boolean isDone()

태스크 안에 선언된 제네릭 타입 V로 결과를 가져온다.

9.3.2 태스크 보내기
스레드 풀은 기본적으로 스레드가 없는 빈큐이고, 스레드 풀이 새로운 태스크에 반응하는 방식은 스레드를 기다리는 큐에 상태에 따라 결정된다.
  • 핵심 풀 크기에 도달하지 않는 경우, 새로운 스레드가 생성될 수 있다.
  • 핵심 풀 크기에 도달했고 큐에 빈 슬롯이 있는 경우, 태스크는 큐에 추가될 수 있다.
  • 핵심 풀 크기에 도달했으나 큐가 가득 찬 경우, 태스크는 거부된다.
동시에 실행하는 여러 태스크가 있을 때, execute, submit 메서드로 하나씩 실행 가능하나, 안드로이드에서는 invokeAll, invokeAny를 통해 일괄로 보낼 수 있다.

개별적 보내기
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
    public void run() {
        doLongRunningOperation();
   }
});

이 때, Callabel을 통해 결과 값을 가져올 수 있다.

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Objet> future = executor.submit(new Collabel<Object>() {
   public Object call() throws Exception {
       Object object = doLongRunningOperation();
       return object;
  }
});

//차단적 호출 : Callable로부터 object 반환
Object result = future.get();


invokeAll
ExecutorService.InvokeAll은 동시에 여러 개의 독립적인 태스크를 실행하며, 모든 비동기 계산이 완료되거나 시간제한이 만료될 떄까지 스레드 호출을 차단단다.

List<Future<T>> invokeAll(Collection<? extends Callable<T>> task)
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

예제
@UiThread
    public void onButtonClick(View v) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                List> tasks = new ArrayList>();
                tasks.add(new Callable() {
                    @Override
                    public String call() throws Exception {
                        return getFirstDataFromNetwork();
                    }
                });
                tasks.add(new Callable() {
                    @Override
                    public String call() throws Exception {
                        return getSecondDataFromNetwork();
                    }
                });

                ExecutorService executorService = Executors.newFixedThreadPool(2);
                try {
                    List> futures = executorService.invokeAll(tasks);
                    String mashedData = mashupResult(futures);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executorService.shutdown();
            }
        });
    }

    private String getFirstDataFromNetwork() {
        // 네트워크 호출
        return "1";
    }

    private String getSecondDataFromNetwork() {
        //네트워크 호출
        return "2";
    }

    private String mashupResult(List> futures) throws InterruptedException {
        for (Future future : futures) {
            // data = futures.get() 으로 결과 얻음
        }
        return "data";
    }
 
invokeAny
이 메서드는 첫번째로 마친 태스크에서 결과를 반환하고 나머지 부분은 무시한다.

9.3.3 태스크 거부하기
테스크는 두가지 이유로 실패할 수 있다. 작업자 스레드 큐가 모두 차 있거나 실행자가 종료된 상태일 경우이다.

응용 프로그램에서 RejectedExceptionHandler를 구현을 제공하여 거부 처리에 대해 커스터 마이징 할 수 있다.

ThreadPoolExecutor는 거부 작업을 위해 다음 4가지 핸들러를 정의한다.

AbortPolicy : RejectedExecutionException을 던짐으로 태스크를 거부한다.
CallerRunsPolicy : 호출자의 스레드에서 동기적으로 태스크를 실행한다.
DiscardOldestPolicy : 큐에서 가장 오래된 태스크를 제거하고 거부된 태스크를 삽입한다.
DiscardPolicy : 태스크의 거부를 조용히 무시한다.

9.4 ExecutorCompletionService
스레드 풀은 큐와 작업자 스레드는 관리하지만 완료된 결과는 관리하지 않는다.그 결과는 ExecutorCompletionService에 의해 이루어 진다.
즉, Future 객체는 소비자 스레드에서 사용 가능한 큐에 배치되어 ExecutorCompletionService에 의해 반환된 결과 값을 처리한다.

예제를 보면서 이해 해 보자.
    private LinearLayout layoutImages;

    private class ImageDownloadTask implements Callable {
        @Override
        public Bitmap call() throws Exception {
            return downloadRemoteImage();
        }

        private Bitmap downloadRemoteImage() {
            // 이미지 다운로드 실행
            return null;
        }
    }

    private class DownloadCompletionService extends ExecutorCompletionService {
        private ExecutorService mExecutor;

        public DownloadCompletionService(ExecutorService executor) {
            super(executor);
            mExecutor = executor;
        }

        public void shutdown() {
            mExecutor.shutdown();
        }

        public boolean isTerminated() {
            return mExecutor.isTerminated();
        }
    }

    private class ConsumerThread extends Thread {
        private DownloadCompletionService mEcs;

        private ConsumerThread(DownloadCompletionService ecs) {
            this.mEcs = ecs;
        }

        @Override
        public void run() {
            super.run();
            try {
                while (!mEcs.isTerminated()) {
                    Future future = mEcs.poll(1, TimeUnit.SECONDS);
                    if (future != null) {
                        addImage(future.get());
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    private void addImage(final Bitmap image) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                ImageView iv = new ImageView(MainActivity.this);
                iv.setImageBitmap(image);
                layoutImages.addView(iv);
            }
        });
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        DownloadCompletionService ecs = new DownloadCompletionService(Executors.newCachedThreadPool());
        new ConsumerThread(ecs).start();
        for (int i = 0; i < 5; i++) {
            ecs.submit(new ImageDownloadTask());
        }
        ecs.shutdown();

9.5 마치며

Executor 프레임 워크는 안드로이드 비동기 기술의 토대를 제공한다. 해당 기법을 이용하면 더 정교한 방법으로 동시성을 관리하게 해준다. 다음시간에는 안드로이드 대표 기술인 AsyncTask에 대해서 배워보자

[본문링크] Executor 프레임워크를 통한 스레드 실행 제어
[1]
코멘트(이글의 트랙백 주소:/cafe/tb_receive.php?no=34926
작성자
비밀번호

 

SSISOCommunity

[이전]

Copyright byCopyright ⓒ2005, SSISO Community All Rights Reserved.