Чтение RSS
Рефераты:
 
Рефераты бесплатно
 

 

 

 

 

 

     
 
Эффективная многопоточность

Эффективная многопоточность

Алексей Ширшов

Введение

Итак, снова многопоточность. Вы скажете, какая избитая тема, уж сколько можно про это писать! Да, написано про нее немало. Практически каждый программист, который с нею сталкивался (то есть хоть раз в жизни вызвал функцию CreateThread), может заявить, что он про нее знает все или почти все. Но это глубокое заблуждение. Создание эффективных многопоточных серверов (а делать многопоточного клиента особого смысла нет) – дело сложное и требующее хороших знаний системных механизмов: многопоточности, синхронизации, асинхронного ввода/вывода и много другого. В этой статье я коснусь темы организации пула потоков для эффективной обработки клиентских запросов.

Зачем это нужно

Зачем организовывать пул потоков? Вопрос очень широко распространен, и на него давно существует ответ. Для тех, кто этот ответ знает, данный раздел не будет чем-то новым, так что можете его пропускать.

При приходе клиентского запроса у сервера имеется несколько вариантов действий:

Обрабатывать все запросы в одном потоке;

Обрабатывать каждый запрос в отдельном потоке;

Организовать пул потоков.

Рассмотрим каждый из сценариев.

Обработка всех запросов в одном потоке

Сразу понятно, что это решение подходит только для очень ограниченного числа случаев, в которых количество клиентов невелико, и обращаются они к серверу не часто. Эта самая простая схема работы: минимум потоков, минимум ресурсов, и не нужно ничего синхронизировать. Главное, что нужно сделать – построить очередь входящих запросов, чтобы они не терялись при последовательной обработке. Это несложно, к тому же можно взять уже готовые решения: например, СОМ-сервер STA singleton.

Обработка каждого запроса в отдельном потоке

Это, пожалуй, самая популярная у разработчиков схема. В ней для каждого клиентского запроса создается отдельный поток. Решение это простое и для многих случаев удовлетворительное, так как при этом нужно заботиться, по большому счету, только о синхронизации общих переменных потоков (а их может и не быть). Такая схема работает следующим образом: первичный поток приложения прослушивает клиентские запросы и при поступлении каждого создает новый поток, передавая ему клиентский пакет (данные или команду). Созданный поток выполняет соответствующую обработку, передает результаты обратно клиенту, или же помещает их в БД (или еще куда-нибудь), и завершает свое существование.

Давайте задумаемся, что произойдет, если клиентов окажется слишком много. Сервер для каждого из них будет создавать поток, а это, с точки зрения системы, непростая операция, требующая определенного времени и ресурсов. Виртуальное адресное пространство процесса также уменьшается как минимум на принятый по умолчанию для потока размер стека. Все это очень плохо. Сервер тратит время и ресурсы на создание потока, который обрабатывает клиентский запрос всего за доли секунды и затем уничтожается. При этом мы должны учитывать, что физически одновременно выполняться могут только количество потоков, не превышающее числа процессоров на компьютере. На ОС Windows NT/2000 при 100 одновременно запущенных потоках наш сервер будет работать очень неоптимально, что отрицательно скажется на времени обработки запроса.

Основные недостатки такой модели:

частое создание и завершение потоков;

малое время работы потока;

нерегулируемое количество потоков;

в большинстве случаев отсутствие очереди клиентских запросов;

большое количество переключений контекстов рабочих потоков.

Для решения этих проблем и предназначен пул потоков.

Организация пула потоков

Что такое пул потоков? В жизни мы очень часто встречаемся с организацией пула. Например, когда вы идете в столовую, вы встречаетесь с пулом подносов. Да-да, не смейтесь. Подносы организованы в пул (попробуйте объяснить это поварам :) ); клиентов может быть намного меньше, чем подносов, и наоборот. Когда подносов много, они лежат без дела, когда подносов мало, клиенты ждут, пока они освободятся. Число подносов, то есть размер пула, заранее определяется так, чтобы в большинстве случаев клиенты не ждали подносов. Однако случаются часы пик, когда клиентов очень много. Просто нереально выделить отдельный поднос каждому клиенту, да и не нужно это. Клиент все равно будет стоять в очереди к кассе, так что траты на подносы не принесут реальных выгод. Это, конечно, очень далекая и несовершенная аналогия, но она показывает, что в природе и жизни пул чего-либо очень часто используется как наиэффективнейшая схема обслуживания запросов.

Рассмотрим механизм работы пула потоков. Имеется главный поток приложения, прослушивающий клиентские запросы. Пул потоков создается заранее или при поступлении первого запроса. Минимальный размер пула обычно выбирается равным 1, однако это непринципиально. При поступлении запроса главный поток выбирает поток из пула и передает ему запрос. Если количество потоков в пуле достигло максимума, запрос помещается в очередь. Если количество потоков меньше максимального, и все они заняты обработкой, создается новый поток, который получает клиентский пакет на обработку. Если количество потоков равно максимальному и все потоки занимаются обработкой, то есть активны, пакет ставится в очередь и ждет освобождения одного из потоков. Алгоритмы добавления потоков в пул и определения оптимального размера пула сильно зависят от решаемой задачи. Более подробно об этом будет сказано позже.

При использовании RPC-транспорта (в случае с СОМ-серверами) о пуле потоков заботиться не нужно. СОМ-сервер MTA singleton – лучшее решение для СОМ в том смысле, что ничего не нужно делать по поводу организации пула потоков. Система (точнее СОМ-runtime) все делает сама. Однако, если вы используете чистый RPC, вам придется все организовывать самому.

Примитивы операционной системы

Сразу оговорюсь, что в качестве операционной системы я буду рассматривать Windows NT версии 3.1 и выше. Для функций, которые появились позже, версия ОС будет оговариваться отдельно. Линейка Windows 9x не предоставляет никаких средств для организации пула потоков.

В операционной системе есть три механизма организации очереди запросов (очередь запросов – неотъемлемая часть пула): DPC – deferred procedure call (отложенный вызов процедуры), APC – asynchronous procedure call (асинхронный вызов процедуры) и объект ядра queue (очередь), которая доступна приложениям пользовательского режима (user mode) в виде более сложного объекта "порт завершения ввода/вывода". DPC используется только в режиме ядра (kernel mode) в основном драйверами устройств для более эффективной обработки запросов ввода/вывода. DPC мы рассматривать не будем, так как эта тема больше касается программирования драйверов устройств, а мы собираемся писать прикладную программу пользовательского режима. APC, в отличии от DPC, всегда выполняется в контексте какого либо потока (с каждым потоком ассоциирована своя очередь APC-запросов) и может генерировать страничные ошибки (page faults), ожидать перехода объекта ядра в сигнальное состояние, и так далее.

ПРИМЕЧАНИЕ

А почему функции DPC не могут генерировать страничные ошибки? Дело в том, что DPC и APC ставятся в очередь системой с помощью программного прерывания и обрабатываются на определенном уровне прерываний IRQL – interrupt request level. IRQL DPC совпадает с IRQL dispatch, на котором обрабатываются страничные ошибки (он даже называется DPC/dispatch, чтобы отразить это). Как только система поднимает текущий уровень до DPC/dispatch, все прерывания с меньшим или равным уровнем маскируются (блокируются). После обработки DPC система понижает уровень и, если в очереди находиться еще один DPC-запрос, вновь генерируется программное прерывание. Если при обработке DPC-запроса случится обращение к странице памяти, не находящейся в физической памяти, система не сможет «подкачать» эту страницу с диска. Уровень прерывания IRQL APC ниже DPC/dispatch, так что APC могут свободно наслаждаться всеми прелестями виртуального адресного пространства процесса.

APC бывают двух видов: режима ядра и пользовательского режима. APC режима ядра отличается от APC пользовательского режима тем, что система может прервать работу потока для вызова процедуры без его ведома, тогда как для исполнения APC пользовательского режима поток должен находится в специальном «тревожном» (alertable) ожидании, как бы давая согласие на исполнение процедуры. Объект "очередь" и его производный объект "порт завершения ввода/вывода" специально предназначены для организации пула и, кроме очереди запросов, могут управлять ассоциированными с ними потоками. Давайте рассмотрим APC пользовательского режима и порт завершения ввода/вывода более подробно.

APC пользовательского режима

Этот механизм можно использовать, если нужно выполнить какую-либо операцию (функцию) в контексте определенного потока. Для выполнения функции поток должен «дать согласие», перейдя в состояние тревожного ожидания (alertable wait state). Если поток находится в таком состоянии, то, как только мы поставим в очередь APC-запрос с указанием адреса функции и произвольного параметра для нее, поток перейдет к выполнению данной функции, после чего выйдет из состояния ожидания. APC пользовательского режима могут использовать функции ReadFileEx, WriteFileEx, а также SetWaitableTimer, о которой мы поговорим отдельно. Функции ReadFileEx и WriteFileEx предназначены специально для асинхронных операций – для них вы обязаны открывать файл (файл в самом общем смысле) в асинхронном режиме, указывая флаг FILE_FLAG_OVERLAPPED, а также для каждой операции создавать структуру OVERLAPPED. В качестве последнего параметра обе функции принимают адрес специальной функции завершения – FileIOCompletionRoutine. После завершения асинхронной операции, если поток находится в тревожном ожидании, эта функция будет вызвана с помощью механизма APC. В тревожное ожидание поток может перейти с помощью «расширенных» функций ожидания, которые оканчиваются на Ex. Это SleepEx, WaitForSingleObjectEx, WaitForMultipleObjectsEx и другие. Для того чтобы вручную поместить APC-запрос в очередь потока, нужно воспользоваться функцией QueueUserAPC. Вот ее прототип:

DWORD QueueUserAPC(

  PAPCFUNC pfnAPC,  // APC функция

  HANDLE hThread,   // хендл потока

  ULONG_PTR dwData  // параметр APC функции

);

Рассмотрим небольшой пример ее использования (проверка ошибок устранена для повышения наглядности).

const int _SOME_MAGIC_VALUE = 5;

DWORD CALLBACK trd1(LPVOID p)

{

  HANDLE hEvent = (HANDLE)p;

  SetEvent(hEvent);

  int i = 0;

  while(i < _SOME_MAGIC_VALUE){

    SleepEx(INFINITE, true);

    cout << i++ << endl;

  }

  return 0;

}

VOID CALLBACK APCProc(ULONG_PTR dwParam)

{

  cout << "APC Proc #" << dwParam;

  cout << " threadid :" << GetCurrentThreadId() << endl;

}

int main()

  HANDLE hEvent = CreateEvent(0, false, false, NULL);

  DWORD trd_id = 0;

  HANDLE hThread = CreateThread(0, 0, trd1, hEvent, 0, &trd_id);

  cout << "Thread id is 0x" << hex << trd_id << endl;

  WaitForSingleObject(hEvent, INFINITE);

  for(int i = 0;i < _SOME_MAGIC_VALUE;i++){

    QueueUserAPC(APCProc, hThread, i);

  }

  WaitForSingleObject(hThread, 1000);

  CloseHandle(hThread);

  return 0;

}

Несмотря на кажущуюся простоту, пример довольно сложен, и не всегда можно предсказать, что будет на экране после его завершения. Давайте разберем его в форме вопрос-ответ.

Почему я синхронизирую потоки с помощью события?

Если этого не сделать, то все пять APC-запросов выполнятся еще до того, как функция потока trd1 получит управление. Это произойдет потому, что сама система в процессе создания потока использует механизм APC-вызовов для инициализации потока. С его помощью, например, происходит вызов всех функций DllMain с параметром DLL_THREAD_ATTACH, если, конечно, вы не вызывали DisableThreadLibraryCalls для какой-либо библиотеки.

Почему на экран выводится странный результат?

Thread id is 0x68c

APC Proc #0 threadid :68c

0

APC Proc #1 threadid :68c

APC Proc #2 threadid :68c

APC Proc #3 threadid :68c

APC Proc #4 threadid :68c

1

Как я уже говорил, для каждого потока система организует очередь из APC-запросов, так что в момент обработки первого запроса потоком система успевает добавить в очередь все остальные запросы, которые и выполняются при следующем вызове SleepEx. Результат сильно зависит от загруженности системы, так что у вас он может быть другим: например, все запросы успеют выполниться за одну итерацию.

Почему, если закомментировать тело APCProc, на экран выводится следующее?

Thread id is 0x7b4

0

1

2

3

4

Так как теперь эта процедура фактически ничего не делает, система не успевает добавить новый запрос в очередь до завершения обработки предыдущего, так что каждый SleepEx обрабатывает «свой» APC-запрос.

Теперь вам должно быть понятно, как использовать данный механизм для организации пула потоков. Вот примерный сценарий для фиксированного количества потоков в пуле: в главном потоке приложения создаются несколько рабочих потоков, каждый из которых сразу переходит в состояние тревожного ожидания специально созданного основным потоком события. Когда приходит клиентский запрос, главный поток передает APC-запрос одному из рабочих потоков. Рабочий поток пробуждается и выполняет функцию, поставленную в очередь главным потоком. При этом он не покидает функции WaitForSingleObjectEx. То есть выполнение APC-запроса производится как бы внутри функции WaitForSingleObjectEx. После завершения выполнения запроса управление передается функции WaitForSingleObjectEx, которая, в свою очередь, передает управление основному коду потока, возвращая WAIT_IO_COMPLETION.

При получении управления рабочий поток должен проанализировать значение, возвращенное этой функцией. Если оно равно WAIT_IO_COMPLETION, то причиной выхода из функции WaitForSingleObjectEx было завершение обработки APC-запроса – поток при этом должен снова перейти в состояние ожидания события. Если же возвращается значение WAIT_OBJECT_0, то причиной выхода была установка события в сигнальное состояние главным потоком приложения. При этом рабочий поток должен завершиться.

Это очень простая схема (например, рабочие потоки вместо ожидания могут выполнять какую-то другую полезную работу), но она довольно неплохо объясняет механизм использования APC для организации пула.

SetWaitableTimer

Эта функция появилась с версии 4.0. Она позволяет активировать таймер, который через заданный период времени в 100-наносекундных интервалах или при наступлении заданного абсолютного времени переходит в сигнальное состояние. Кроме этого, можно указать процедуру завершения, которая будет вызвана с помощью APC-запроса в данном потоке. Для процедуры завершения можно указать дополнительный параметр. Функция хороша тем, что она не привязана к окнам и циклу выборки сообщений, как, например, SetTimer. С ее помощью можно использовать таймеры в любых приложениях, включая консольные и сервисы. Однако у SetWaitableTimer есть и некоторые недостатки:

функция завершения всегда вызывается в потоке, вызвавшем SetWaitableTimer;

поток должен быть в состоянии тревожного ожидания, чтобы обработать APC-запрос;

второй APC-запрос начнет обрабатываться только после окончания обработки предыдущего запроса, то есть запросы обрабатываются последовательно.

Все эти проблемы решает объект "очередь таймеров", о котором речь пойдет позже.

Порт завершения ввода/вывода

Это, безусловно, один из самых мощных и сложных объектов исполнительной системы. Он специально предназначен для оптимизации обработки клиентских запросов в серверных приложениях. Он не только организует очередь запросов, но и эффективно управляет их обработкой.

Основная идея порта завершения ввода/вывода (в дальнейшем просто порта) состоит в том, чтобы эффективно расходовать процессорное время при обработке клиентских запросов. Обработка должна вестись параллельно несколькими потоками, но строго до определенного момента, когда число потоков будет равняться максимальному значению. После этого новые потоки перестают создаваться, а запросы ставятся в очередь к существующим потокам. Давайте разберемся в этом поподробнее.

Как работает порт

При создании порта указывается максимальное количество активных потоков, способных обрабатывать клиентские запросы параллельно. Так как количество реально работающих параллельно потоков на компьютере равно количеству процессоров, то указание большего максимального количества активных потоков не выгодно. Почему? Дело в том, что для исполнения нескольких потоков на одном процессоре, системе приходится постоянно переключать процессор между потоками, эмулируя, таким образом, параллельность, однако это переключение, называемое переключением контекстов – довольно дорогая операция. Избежать ее можно только одним способом – не создавать параллельно работающие потоки в количестве большем, чем число процессоров. Таким образом, при создании порта, казалось бы, нужно указывать в качестве максимального количества активных потоков число процессоров в системе, но здесь есть одна тонкость. Допустим, у нас однопроцессорный компьютер и, соответственно, клиентские запросы мы обрабатываем в одном потоке. Что будет, если клиентский запрос придет в момент выполнения синхронной операции с диском или в момент ожидания какого-либо объекта этим потоком? Он будет ждать, пока поток не закончит свою работу, но ведь процессор в это время бездействует, потому что поток заблокирован на синхронной операции или на каком-либо объекте. Когда процессор бездействует, а клиентский запрос не обрабатывается – это плохо. Мы приходим к выводу о том, что всегда должен существовать резервный поток, который подхватывал бы запросы в момент, когда «основной» поток выполняет блокирующие операции, и процессор бездействует.

Работа с файлами (в самом широком смысле слова) очень тесно связана с многопоточностью и обработкой запросов на сервере. Сокет или pipe – это тоже файлы. Чтобы обрабатывать запросы через эти каналы параллельно, нужен порт. Давайте рассмотрим функцию создания порта и связи его с файлом (зачем-то разработчики из Microsoft объединили две эти функции в одну; в исполнительной системе эти две функции выполняют сервисы NtCreateIoCompletion и NtSetInformationFile, соответственно).

HANDLE CreateIoCompletionPort (

  HANDLE FileHandle,              // хендл файла

  HANDLE ExistingCompletionPort,  // хендл порта завершения ввода/вывода

  ULONG_PTR CompletionKey,        // ключ завершения

  DWORD NumberOfConcurrentThreads // максимальное число параллельных потоков

);

Для простого создания порта нужно в качестве первого параметра передать INVALID_HANDLE_VALUE, а в качестве второго и третьего – 0. Для связывания файла с портом нужно указать первые три параметра и проигнорировать четвертый.

После того, как файл (под файлом здесь подразумевается объект подсистемы Win32, который реализуется с помощью объекта "файл исполнительной системы", к таковым относятся файлы, сокеты, почтовые ящики, именованные каналы и проч.) связан с портом, окончания всех асинхронных запросов ввода/вывода попадают в очередь порта и могут быть обработаны пулом потоков. Следующие функции могут быть использованы с портом завершения для обработки асинхронных операций ввода/вывода:

ConnectNamedPipe – ожидает подключения клиента к именованному каналу.

DeviceIoControl – низкоуровневый ввод/вывод.

LockFileEx – блокировка региона файла.

ReadDirectoryChangesW – ожидание изменений в директории.

ReadFile – чтение файла.

TransactNamedPipe – Комбинированное чтение и запись по именованному каналу, осуществляемые за одну сетевую операцию.

WaitCommEvent – ожидание события последовательного интерфейса (СОМ-порт).

WriteFile – запись в файл.

Если вы не хотите, чтобы окончание асинхронного ввода/вывода обрабатывалось портом (например, когда вам не важен результат операции), нужно использовать следующий трюк [1]. Нужно установить поле hEvent структуры OVERLAPPED равным описателю события с установленным первым битом. Делается это примерно так:

OVERLAPPED ov = {0};

ov.hEvent = CreateEvent(...);

ov.hEvent = (HANDLE)((DWORD_PTR)(ov.hEvent) | 1);

И не забывайте сбрасывать младший бит при закрытии хендла события.

Добавлять поток к пулу (подключать его к обработке запросов) можно с помощью следующей функции:

BOOL GetQueuedCompletionStatus(

  // хендл порта завершения ввода/вывода

  HANDLE CompletionPort,

  // количество переданных байт

  LPDWORD lpNumberOfBytes,

  // ключ завершения

  PULONG_PTR lpCompletionKey,

  // структура OVERLAPPED

  LPOVERLAPPED *lpOverlapped,

  // значение таймаута

  DWORD dwMilliseconds

);

Эта функция блокирует поток до тех пор, пока порт не передаст потоку пакет запроса или не истечет таймаут.

Поместить пакет запроса в порт можно с помощью функции PostQueuedCompletionStatus.

BOOL PostQueuedCompletionStatus(

  HANDLE CompletionPort,            // хендл порта завершения ввода/вывода

  DWORD dwNumberOfBytesTransferred, // количество переданных байт

  ULONG_PTR dwCompletionKey,        // ключ завершения

  LPOVERLAPPED lpOverlapped         // структура OVERLAPPED

);

Пакет запроса не обязательно должен быть структурой OVERLAPPED или производной от нее [2].

Давайте соберем всю информацию воедино. Порт завершения – объект, организующий несколько очередей из клиентских запросов и потоков, их обрабатывающих. Поток добавляется в очередь ожидающих запрос потоков порта при вызове функции GetQueuedCompletionStatus. При поступлении запроса порт разблокирует первый поток в очереди ждущих потоков и передает ему этот запрос (в виде структуры OVERLAPPED и ключа завершения). Поток при этом перемещается в очередь активных потоков (число активных потоков увеличивается на 1). Предположим, у нас максимальное число активных потоков равно 1, тогда при поступлении следующего запроса другой поток из очереди ожидающих активирован не будет. После обработки клиентского запроса поток вновь вызывает GetQueuedCompletionStatus и ставится в начало списка ожидающих потоков. Почему поток ставится именно в начало списка? Дело в том, что потоки берутся из начала списка, и при низкой активности могут использоваться не все потоки. При этом стеки и контексты не используемых потоков могут быть выгружены на диск за ненадобностью.

Если в процессе обработки запроса поток обратился к блокирующей функции, число активных потоков уменьшается на 1, как если бы поток перешел снова в очередь ожидающих потоков. Это дает возможность при приходе следующего клиентского запроса задействовать следующий поток из очереди ожидающих. Когда первый поток закончит блокирующую операцию, число активных потоков превысит максимальное, и при следующем вызове функции GetQueuedCompletionStatus один из этих потоков заблокируется, а второй получит пакет запроса (если он имеется).

Очередь

Запись добавляется при:

Запись удаляется при:

Список устройств, ассоциированных с портом

вызове CreateIoCompletionPort

закрытии хенда файла

Очередь клиентских запросов (FIFO)

завершении асинхронной операции файла, ассоциированного с портом, или вызове функции PostQueuedCompletionStatus

передаче портом запроса потоку на обработку

Очередь ожидающих потоков

вызове функции GetQueuedCompletionStatus

начале обработки клиентского запроса потоком

Список работающих потоков

начале обработки клиентского запроса потоком

вызове потоком GetQueuedCompletionStatus или какую-либо блокирующей функции

Список приостановленных потоков

вызове потоком какой-либо блокирующей функции

выходе потока из какой-либо блокирующей функции

Таблица 1. Список очередей порта завершения ввода/вывода [1].

Недокументированные возможности порта и его низкоуровневое устройство

Как всегда это бывает у Microsoft, порт завершения обладает многими недокументированными возможностями:

У порта завершения ввода/вывода может быть имя, и соответственно, он доступен для других процессов. Совершенно непонятно, почему разработчики решили скрыть эту, на мой взгляд, нужную особенность порта. Имя можно задать в параметре ObjectAttributes функции NtCreateIoCompletion.

Вторая особенность вытекает из первой: с портом может быть связан дескриптор безопасности, который также задается в параметре ObjectAttributes функции NtCreateIoCompletion.

Открывается порт с помощью функции NtOpenIoCompletion. При вызове функции нужно указать имя порта и уровень доступа. В качестве уровня доступа можно указывать все стандартные и следующие специальные права [2] (таблица 2).

Символическое обозначение

Константа

Описание

IO_COMPLETION_QUERY_STATE

1

Необходим для запроса состояния объекта "порт"

IO_COMPLETION_MODIFY_STATE

2

Необходим для изменения состояния объекта "порт"

Таблица 2.

У порта можно запрашивать количество необработанных запросов с помощью функции NtQueryIoCompletion. Хотя в [3] утверждается, что эта функция определяет, находится ли порт в сигнальном состоянии, на самом деле она возвращает количество клиентских запросов в очереди. Это довольно важная информация, которую почему-то опять решили от нас скрыть.

Давайте более детально рассмотрим, как создается и функционирует порт завершения ввода/вывода [4].

При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Объект "порт" представлен следующей структурой [5]:

typedef stuct _IO_COMPLETION

{

  KQUEUE Queue;

} IO_COMPLETION;

То есть, по существу, объект "порт завершения" является объектом "очередь исполнительной системы" (KQUEUE). Вот как представлена очередь:

typedef stuct _KQUEUE

{

  DISPATCHER_HEADER Header;

  LIST_ENTRY EnrtyListHead;     //очередь пакетов

  DWORD CurrentCount;

  DWORD MaximumCount;

  LIST_ENTRY ThreadListHead;   //очередь ожидающих потоков

} KQUEUE;

Итак, для порта выделяется память, и затем происходит его инициализация с помощью функции KeInitializeQueue. (все, что касается такого супернизкого устройства порта, взято из [4], остальное – из DDK и [3]).

Когда происходит связывание порта с объектом "файл", Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile. Класс информации для этой функции устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT [5] или FILE_COMPLETION_INFORMATION [3].

typedef struct _IO_COMPLETION_CONTEXT

{

  PVOID Port;

  PVOID Key;

} IO_COMPLETION_CONTEXT;

typedef struct _FILE_COMPLETION_INFORMATION

{

  HANDLE IoCompletionHandle;

  ULONG CompletionKey;

} FILE_COMPLETION_INFORMATION, *PFILE_COMPLETION_INFORMATION;

Указатель на эту структуру заносится в поле CompletionConext структуры FILE_OBJECT (смещение 0x6C).

После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода проверяет поле CompletionConext и, если оно не равно 0, создает пакет запроса (из структуры OVERLAPPED и ключа завершения) и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD (смещение 0xE0) устанавливается равным адресу очереди (порта завершения). Зачем это нужно? Когда вызываются функции блокировки потока (WaitForSingleObject и др.), планировщик проверяет это поле, и если оно не равно 0, вызывает функцию KeActivateWaiterQueue, которая уменьшает число активных потоков порта на 1. Когда поток пробуждается после вызова блокирующих функций, планировщик выполняет те же действия, только вызывает при этом функцию KeUnwaitThread, которая увеличивает счетчик активных потоков на 1.

Когда вы помещаете запрос в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.

Организуем пул

Итак, мы знаем, как работает порт завершения ввода/вывода, когда потоки добавляются в пул и когда удаляются. Но сколько потоков должно быть в пуле? В два раза больше, чем число процессоров. Это очень общая рекомендация, и для некоторых задач она не подходит. По большому счету имеется только два критерия, по которым можно определять, нужно создавать новый поток или нет. Эти критерии – загруженность процессора и число пакетов запросов. Если число пакетов превышает определенное количество, и загруженность процессора невысока, есть смысл создать новый поток. Если пакетов мало, или процессор занят более чем на 90 процентов, дополнительный поток создавать не следует. Удалять поток из пула нужно, если он давно не обрабатывал клиентские запросы (просто подсчитать, сколько раз GetQueuedCompletionStatus вернула управление по таймауту). При удалении потока нужно следить, чтобы закончились все асинхронные операции ввода/вывода, начатые этим потоком.

Надо сказать, что определение загруженности процессора, количества пакетов в очереди порта и наличия у потока незавершенных операций ввода/вывода – задачи не самые простые. Например, вы можете использовать WMI для определения загруженности процессора, но при этом не сможете определить, есть ли у потока незавершенные операции ввода/вывода. Ниже я приведу функции получения вышеперечисленных показателей только недокументированными способами (здесь используется заголовочный файл ntdll.h из [3]):

// Функция получения загруженности процессора

double GetCPUUsage()

{

  #define Li2Double(x) ((double)((x).HighPart) * 4.294967296E9

    + (double)((x).LowPart))

  

  typedef NTSTATUS (NTAPI ZwQuerySystemInformation_t)(

    IN NT::SYSTEM_INFORMATION_CLASS SystemInformationClass,

    OUT PVOID SystemInformation,

    IN ULONG SystemInformationLength,

    OUT PULONG ReturnLength OPTIONAL

  );

  static ZwQuerySystemInformation_t* ZwQuerySystemInformation = 0;

  if(!ZwQuerySystemInformation)

  {

    ZwQuerySystemInformation = (ZwQuerySystemInformation_t*)GetProcAddress(

      GetModuleHandle(_T("ntdll.dll")), _T("NtQuerySystemInformation"));

  }

  double dbIdleTime = 0;

  static NT::LARGE_INTEGER liOldIdleTime = {0, 0};

  static NT::LARGE_INTEGER liOldSystemTime = {0, 0};

  // Получаем число процессоров

  NT::SYSTEM_BASIC_INFORMATION sysinfo = {0};

  NT::NTSTATUS status = ZwQuerySystemInformation(NT::SystemBasicInformation,

    &sysinfo, sizeof sysinfo, 0);

   

  if(status != NO_ERROR)

        return -1;

   

  // Получаем системное время

  NT::SYSTEM_TIME_OF_DAY_INFORMATION timeinfo = {0};

    status = ZwQuerySystemInformation(NT::SystemTimeOfDayInformation,

    &timeinfo, sizeof timeinfo, 0);

   

  if(status!=NO_ERROR)

        return -1;

  // Получаем время простоя

  NT::SYSTEM_PERFORMANCE_INFORMATION perfinfo = {0};

    status = ZwQuerySystemInformation(NT::SystemPerformanceInformation,

    &perfinfo, sizeof perfinfo, 0);

   

  if(status != NO_ERROR)

        return -1;

  // если это первый вызов, значение вычислить нельзя

  if(liOldIdleTime.QuadPart != 0)

  {

    // Время простоя

    dbIdleTime = Li2Double(perfinfo.IdleTime) - Li2Double(liOldIdleTime);

    // Системное время

    const double dbSystemTime = Li2Double(timeinfo.CurrentTime)

      - Li2Double(liOldSystemTime);

    dbIdleTime = dbIdleTime / dbSystemTime;

    dbIdleTime = 100.0 - dbIdleTime * 100.0

      / (double)sysinfo.NumberProcessors + 0.5;

  }

  // сохраняем полученные значения

  liOldIdleTime = perfinfo.IdleTime;

  liOldSystemTime = timeinfo.CurrentTime;

  // Если это первый вызов, получаем загруженность CPU за последние

  // 200 милисекунд

  if(dbIdleTime == 0)

  {

    Sleep(200);

    dbIdleTime = GetCPUUsage();

  }

 

  return dbIdleTime;

}

// Возвращает true, если поток имеет незавершенные операции ввода/вывода

bool HasThreadIoPending(HANDLE hThread = GetCurrentThread())

{

  typedef NTSTATUS (NTAPI ZwQueryInformationThread_t)(

    IN HANDLE ThreadHandle,

    IN NT::THREADINFOCLASS ThreadInformationClass,

    OUT PVOID ThreadInformation,

    IN ULONG ThreadInformationLength,

    OUT PULONG ReturnLength OPTIONAL

  );

  static ZwQueryInformationThread_t* ZwQueryInformationThread = 0;

  if(!ZwQueryInformationThread)

  {

    ZwQueryInformationThread = (ZwQueryInformationThread_t*)GetProcAddress(

      GetModuleHandle(_T("ntdll.dll")), _T("NtQueryInformationThread"));

  }

  ULONG io = 0;

  ZwQueryInformationThread(hThread, NT::ThreadIsIoPending, &io, 4, 0);

  return io > 0;

}

// Возвращает количество необработанных запросов в очереди порта

DWORD GetIoCompletionLen(HANDLE hIoPort)

{

  typedef NTSTATUS (NTAPI ZwQueryIoCompletion_t)(

    IN HANDLE IoCompletionHandle,

    IN NT::IO_COMPLETION_INFORMATION_CLASS IoCompletionInformationClass,

    OUT PVOID IoCompletionInformation,

    IN ULONG IoCompletionInformationLength,

    OUT PULONG ResultLength OPTIONAL

  );

  static ZwQueryIoCompletion_t* ZwQueryIoCompletion = 0;

  if(!ZwQueryIoCompletion)

  {

    ZwQueryIoCompletion = (ZwQueryIoCompletion_t*)GetProcAddress(

      GetModuleHandle(_T("ntdll.dll")), _T("NtQueryIoCompletion"));

  }

  NT::IO_COMPLETION_BASIC_INFORMATION ioinfo = {0};

  DWORD dwRetLen = 0;

  ZwQueryIoCompletion(hIoPort, NT::IoCompletionBasicInformation,

    &ioinfo, sizeof ioinfo, &dwRetLen);

 

  return ioinfo.SignalState;

}

Как видите, не простое это дело – создавать эффективный пул потоков, однако кое-что ребята из Microsoft могут нам предложить. В Windows 2000 появились новые функции, которые полностью берут на себя всю черновую работу по созданию и удалению потоков в пуле. О них – следующий раздел.

Встроенная поддержка пула потоков

В Windows 2000 появились новые функции, которые условно можно разделить на четыре группы:

помещение запроса в очередь;

вызов функции при окончании асинхронной операции ввода/вывода;

периодический вызов функции;

вызов функции при переходе объекта в сигнальное состояние.

Рассмотрим их по порядку.

Помещение запроса в очередь

Передать на выполнение потоку из пула какую-либо функцию можно с помощью сервиса QueueUserWorkItem. Эта с виду простая функция делает очень много: она создает порт завершения ввода/вывода, создает и уничтожает потоки в пуле и многое другое. Вот ее описание:

BOOL QueueUserWorkItem(

  LPTHREAD_START_ROUTINE Function,  // адрес функции

  PVOID Context,                    // произвольный параметр

  ULONG Flags                       // флаги выполнения

);

QueueUserWorkItem помещает пакет запроса в виде адреса функции и произвольного параметра в очередь запросов порта завершения и сразу же возвращает управление. Вот как выглядит функция, которая будет вызвана одним из потоков в пуле:

DWORD WINAPI ThreadProc(

  LPVOID lpParameter   // произвольный параметр

);

Ее прототип ничем не отличается от стартовой процедуры потока, так что здесь вам все должно быть ясно. Гораздо интереснее знать, что скрывается внутри функции QueueUserWorkItem. Давайте разбираться.

При первом помещении запроса количество потоков в пуле равно нулю, так что QueueUserWorkItem приходится создавать поток и порт завершения. Затем в порт помещается пакет запроса, а поток вызывает функцию GetQueuedCompletionStatus. После обработки запроса поток не разрушается, а остается еще некоторое время в пуле, так что следующий запрос обработается намного быстрее. Если вы отправляете запросы слишком часто, и количество необработанных пакетов увеличивается, QueueUserWorkItem создаст для вызова функции новый поток. Максимальное количество потоков в пуле равно количеству процессоров, что не очень хорошо, но есть способ заставить функцию всегда создавать новый поток.

ПРИМЕЧАНИЕ

Те из вас, кто читал статью Дж. Рихтера «New Windows 2000 Pooling Functions Greatly Simplify Thread Management» из апрельского MSJ за 1999 год, могут поспорить со мной насчет размера пула. В статье указывается, что количество потоков в нем равно удвоенному количеству процессоров в системе, однако это не так. Вы можете собственноручно в этом убедиться, поставив breakpoint на функцию _RtlpInitializeWorkerThreadPool (адрес 0x77FA95CD на Windows 2000 Professional SP3) и вызвав функцию QueueUserWorkItem.

Рассмотрим флаги функции QueueUserWorkItem.

Константа

Значение

Описание

WT_EXECUTEDEFAULT

0

Запрос помещается в простой рабочий поток

WT_EXECUTEINIOTHREAD

1

Запрос помещается в поток ввода/вывода

WT_EXECUTEINPERSISTENTTHREAD

0x80

Запрос помещается в поток, который

 
     
Бесплатные рефераты
 
Банк рефератов
 
Бесплатные рефераты скачать
| мероприятия при чрезвычайной ситуации | Чрезвычайная ситуация | аварийно-восстановительные работы при ЧС | аварийно-восстановительные мероприятия при ЧС | Интенсификация изучения иностранного языка с использованием компьютерных технологий | Лыжный спорт | САИД Ахмад | экономическая дипломатия | Влияние экономической войны на глобальную экономику | экономическая война | экономическая война и дипломатия | Экономический шпионаж | АК Моор рефераты | АК Моор реферат | ноосфера ба забони точики | чесменское сражение | Закон всемирного тяготения | рефераты темы | иохан себастиян бах маълумот | Тарых | шерхо дар борат биология | скачать еротик китоб | Семетей | Караш | Influence of English in mass culture дипломная | Количественные отношения в английском языках | 6466 | чистонхои химия | Гунны | Чистон
 
Рефераты Онлайн
 
Скачать реферат
 
 
 
 
  Все права защищены. Бесплатные рефераты и сочинения. Коллекция бесплатных рефератов! Коллекция рефератов!