Stream handler DLLs create worker threads for each stream created. A worker thread, for example, would consist of code to request a buffer from the Sync/Stream Manager, fill it with data from a device and return it to the Sync/Stream Manager. This would continue until the end of stream or until some other kind of stream stop.
A worker thread does all the work of the stream handler itself. It is a good idea to create a worker thread for each stream instance. Basically, a worker thread either loops in the read routine or loops in the write routine. This depends on whether it is a source or target. If it is a source, the thread loops in the read routine and reads from its device using whatever commands it uses to interface with its device. It also interfaces with the Sync/Stream Manager requesting empty buffers and returning full buffers. On the other hand, when the DLL stream handler is the target, the thread will also be in a big loop, but it is performing different operations. It will be requesting full buffers from the Sync/Stream Manager and then consuming those buffers by passing them off to the device (in whatever way it communicates with its device).
#include <os2.h> #include <os2me.h> #include <hhpheap.h> #include <shi.h> PSIB psib; /* Stream Instance Block */ { /* Start of FsshRead */ RC rc = NO_ERROR; /* local return code */ char NeedBuf; /* local loop Boolean */ LONG NumBytesIO; /* Number of bytes from mmio */ PARM_NOTIFY npget, npret; /* parms for SMH_NOTIFY calls */ SRCBUFTAB SrcBufTab = {0}; /* Source buffer table */ ULONG ulStopped = DO_SLEEP; /* did we get stop disc or flush */ BOOL bAtEOS = FALSE; /* End of Stream indicator */ ULONG ulPostCount; /* Temp to hold count */ /* Before we start lets do some init stuff: */ npget.ulFunction = SMH_NOTIFY; npget.hid = psib->HandlerID; npget.hstream = psib->hStream; npget.ulGetNumEntries = 1L; npget.ulRetNumEntries = 0L; npget.pGetBufTab = &SrcBufTab; npget.pRetBufTab = NULL; npret.ulFunction = SMH_NOTIFY; npret.hid = psib->HandlerID; npret.hstream = psib->hStream; npret.ulFlags = BUF_RETURNFULL; npret.ulGetNumEntries = 0L; npret.ulRetNumEntries = 1L; npret.pGetBufTab = NULL; npret.pRetBufTab = &SrcBufTab; /* Wait until we get the ShcStart */ DosWaitEventSem(psib->hevStop, SEM_INDEFINITE_WAIT); /* We will loop forever getting an empty buffer, calling the device to */ /* fill up the buffer, sending it to the consumer. During each */ /* iteration of the loop we will check the action flags for */ /* asynchronous requests to do things. */ if (psib->ulActionFlags & SIBACTFLG_KILL) { /* Must have been a create error */ rc = 1L; } /* Must have been a create error */ /* Start the main loop */ while (!rc) { /* while no severe error */ if (psib->ulActionFlags) rc = CheckNSleep(psib); /* * Get a buffer */ NeedBuf = TRUE; while ((NeedBuf) && (!rc)) { /* while we don't have a buffer */ /* Clear the stop sem, so if after we call ssm to get a buffer if */ /* it returns none avail then we won't miss a SSMBuffer Start */ /* before we go to sleep. */ DosResetEventSem(psib->hevStop, &ulPostCount); npget.ulFlags = BUF_GETEMPTY; rc = SMHEntryPoint(&npget); /* get a buffer */ if (!rc) { NeedBuf = FALSE; /* make sure attribute is 0 so we don't pass around a bad value */ SrcBufTab.ulMessageParm = 0L; } else { /* return code from smhnotify */ if (rc == ERROR_BUFFER_NOT_AVAILABLE) { /* buffer not available */ /* the smhnotify resets the num entries to 0 when none avail */ npget.ulGetNumEntries = 1L; ulStopped = DO_SLEEP; rc = SleepNCheck(psib, &ulStopped); } /* buffer not available */ } /* return code from smhnotify */ } /* while we don't have a buffer */ /* We have a buffer or an error */ if (!rc) { /* have a buffer - do the read */ NumBytesIO = mmioRead((HMMIO)psib->ulAssocP1, (PCHAR)SrcBufTab.pBuffer, (LONG)SrcBufTab.ulLength); if (NumBytesIO == -1L) { /* an error */ SrcBufTab.ulLength = 0L; /* get the real error code */ rc = mmioGetLastError((HMMIO)psib->ulAssocP1); rc = ShIOError(psib, npret, rc); } /* an error */ else { /* We have some data */ if (NumBytesIO != (LONG)SrcBufTab.ulLength) { /* End of stream */ npret.ulFlags |= BUF_EOS; bAtEOS = TRUE; DosResetEventSem(psib->hevStop, &ulPostCount); SrcBufTab.ulLength = NumBytesIO; } /* End of stream */ /* Send the data to the stream manager */ rc = SMHEntryPoint(&npret); if (!rc) { /* data sent ok */ if (bAtEOS) { bAtEOS = FALSE; ulStopped = DO_SLEEP; rc = SleepNCheck(psib, &ulStopped); } } /* data sent ok */ } /* We have some data */ /* Clear the EOS if it was set. And attribute */ npret.ulFlags = BUF_RETURNFULL; SrcBufTab.ulMessageParm = 0L; } /* have a buffer - do the read */ } /* while no severe error */ /* We get here if an error has occurred or a kill has */ /* been sent. In the case of the kill, reset the */ /* return code to 0 (no error) and exit the thread. */ /* Otherwise, report the error event and exit the */ /* thread. */ if (psib->ulActionFlags & SIBACTFLG_KILL) { rc = 0L; } else { ReportEvent(psib, rc, /* Return code */ EVENT_ERROR, /* event type */ 0L, /* user info */ NONRECOVERABLE_ERROR); /* Severe Error */ } /* Only set this flag when we no longer need access to the sib since */ /* Destroy may get control and Free the sib. */ psib->ulActionFlags |= SIBACTFLG_THREAD_DEAD; return; } /* End of FsshRead */