blob: 7aae5f2b990c3cee3fb17338c9ae72c4a5cb8ccb [file] [log] [blame]
Scott Andersonb0114cb2012-04-09 14:08:22 -07001// Copyright 2006 Google Inc. All Rights Reserved.
2
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6
7// http://www.apache.org/licenses/LICENSE-2.0
8
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// worker.h : worker thread interface
16
17// This file contains the Worker Thread class interface
18// for the SAT test. Worker Threads implement a repetative
19// task used to test or stress the system.
20
21#ifndef STRESSAPPTEST_WORKER_H_
22#define STRESSAPPTEST_WORKER_H_
23
24#include <pthread.h>
25
26#include <sys/time.h>
27#include <sys/types.h>
28
29#include <libaio.h>
30
31#include <queue>
32#include <set>
33#include <string>
34#include <vector>
35
36// This file must work with autoconf on its public version,
37// so these includes are correct.
38#include "disk_blocks.h"
39#include "queue.h"
40#include "sattypes.h"
41
42
43// Global Datastruture shared by the Cache Coherency Worker Threads.
44struct cc_cacheline_data {
45 int *num;
46};
47
48// Typical usage:
49// (Other workflows may be possible, see function comments for details.)
50// - Control thread creates object.
51// - Control thread calls AddWorkers(1) for each worker thread.
52// - Control thread calls Initialize().
53// - Control thread launches worker threads.
54// - Every worker thread frequently calls ContinueRunning().
55// - Control thread periodically calls PauseWorkers(), effectively sleeps, and
56// then calls ResumeWorkers().
57// - Some worker threads may exit early, before StopWorkers() is called. They
58// call RemoveSelf() after their last call to ContinueRunning().
59// - Control thread eventually calls StopWorkers().
60// - Worker threads exit.
61// - Control thread joins worker threads.
62// - Control thread calls Destroy().
63// - Control thread destroys object.
64//
65// Threadsafety:
66// - ContinueRunning() may be called concurrently by different workers, but not
67// by a single worker.
68// - No other methods may ever be called concurrently, with themselves or
69// eachother.
70// - This object may be used by multiple threads only between Initialize() and
71// Destroy().
72//
73// TODO(matthewb): Move this class and its unittest to their own files.
74class WorkerStatus {
75 public:
76 //--------------------------------
77 // Methods for the control thread.
78 //--------------------------------
79
80 WorkerStatus() : num_workers_(0), status_(RUN) {}
81
82 // Called by the control thread to increase the worker count. Must be called
83 // before Initialize(). The worker count is 0 upon object initialization.
84 void AddWorkers(int num_new_workers) {
85 // No need to lock num_workers_mutex_ because this is before Initialize().
86 num_workers_ += num_new_workers;
87 }
88
89 // Called by the control thread. May not be called multiple times. If
90 // called, Destroy() must be called before destruction.
91 void Initialize();
92
93 // Called by the control thread after joining all worker threads. Must be
94 // called iff Initialize() was called. No methods may be called after calling
95 // this.
96 void Destroy();
97
98 // Called by the control thread to tell the workers to pause. Does not return
99 // until all workers have called ContinueRunning() or RemoveSelf(). May only
100 // be called between Initialize() and Stop(). Must not be called multiple
101 // times without ResumeWorkers() having been called inbetween.
102 void PauseWorkers();
103
104 // Called by the control thread to tell the workers to resume from a pause.
105 // May only be called between Initialize() and Stop(). May only be called
106 // directly after PauseWorkers().
107 void ResumeWorkers();
108
109 // Called by the control thread to tell the workers to stop. May only be
110 // called between Initialize() and Destroy(). May only be called once.
111 void StopWorkers();
112
113 //--------------------------------
114 // Methods for the worker threads.
115 //--------------------------------
116
117 // Called by worker threads to decrease the worker count by one. May only be
118 // called between Initialize() and Destroy(). May wait for ResumeWorkers()
119 // when called after PauseWorkers().
120 void RemoveSelf();
121
122 // Called by worker threads between Initialize() and Destroy(). May be called
123 // any number of times. Return value is whether or not the worker should
124 // continue running. When called after PauseWorkers(), does not return until
125 // ResumeWorkers() or StopWorkers() has been called. Number of distinct
126 // calling threads must match the worker count (see AddWorkers() and
127 // RemoveSelf()).
128 bool ContinueRunning();
129
130 // TODO(matthewb): Is this functionality really necessary? Remove it if not.
131 //
132 // This is a hack! It's like ContinueRunning(), except it won't pause. If
133 // any worker threads use this exclusively in place of ContinueRunning() then
134 // PauseWorkers() should never be used!
135 bool ContinueRunningNoPause();
136
137 private:
138 enum Status { RUN, PAUSE, STOP };
139
140 void WaitOnPauseBarrier() {
141 int error = pthread_barrier_wait(&pause_barrier_);
142 if (error != PTHREAD_BARRIER_SERIAL_THREAD)
143 sat_assert(error == 0);
144 }
145
146 void AcquireNumWorkersLock() {
147 sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
148 }
149
150 void ReleaseNumWorkersLock() {
151 sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
152 }
153
154 void AcquireStatusReadLock() {
155 sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
156 }
157
158 void AcquireStatusWriteLock() {
159 sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
160 }
161
162 void ReleaseStatusLock() {
163 sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
164 }
165
166 Status GetStatus() {
167 AcquireStatusReadLock();
168 Status status = status_;
169 ReleaseStatusLock();
170 return status;
171 }
172
173 // Returns the previous status.
174 Status SetStatus(Status status) {
175 AcquireStatusWriteLock();
176 Status prev_status = status_;
177 status_ = status;
178 ReleaseStatusLock();
179 return prev_status;
180 }
181
182 pthread_mutex_t num_workers_mutex_;
183 int num_workers_;
184
185 pthread_rwlock_t status_rwlock_;
186 Status status_;
187
188 // Guaranteed to not be in use when (status_ != PAUSE).
189 pthread_barrier_t pause_barrier_;
190
191 DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
192};
193
194
195// This is a base class for worker threads.
196// Each thread repeats a specific
197// task on various blocks of memory.
198class WorkerThread {
199 public:
200 // Enum to mark a thread as low/med/high priority.
201 enum Priority {
202 Low,
203 Normal,
204 High,
205 };
206 WorkerThread();
207 virtual ~WorkerThread();
208
209 // Initialize values and thread ID number.
210 virtual void InitThread(int thread_num_init,
211 class Sat *sat_init,
212 class OsLayer *os_init,
213 class PatternList *patternlist_init,
214 WorkerStatus *worker_status);
215
216 // This function is DEPRECATED, it does nothing.
217 void SetPriority(Priority priority) { priority_ = priority; }
218 // Spawn the worker thread, by running Work().
219 int SpawnThread();
220 // Only for ThreadSpawnerGeneric().
221 void StartRoutine();
222 bool InitPriority();
223
224 // Wait for the thread to complete its cleanup.
225 virtual bool JoinThread();
226 // Kill worker thread with SIGINT.
227 virtual bool KillThread();
228
229 // This is the task function that the thread executes.
230 // This is implemented per subclass.
231 virtual bool Work();
232
233 // Starts per-WorkerThread timer.
234 void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
235 // Reads current timer value and returns run duration without recording it.
236 int64 ReadThreadTimer() {
237 struct timeval end_time_;
238 gettimeofday(&end_time_, NULL);
239 return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
240 (end_time_.tv_usec - start_time_.tv_usec);
241 }
242 // Stops per-WorkerThread timer and records thread run duration.
243 // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
244 // is effectively paused and restarted, so runduration_usec accumulates on.
245 void StopThreadTimer() {
246 runduration_usec_ += ReadThreadTimer();
247 }
248
249 // Acccess member variables.
250 bool GetStatus() {return status_;}
251 int64 GetErrorCount() {return errorcount_;}
252 int64 GetPageCount() {return pages_copied_;}
253 int64 GetRunDurationUSec() {return runduration_usec_;}
254
255 // Returns bandwidth defined as pages_copied / thread_run_durations.
256 virtual float GetCopiedData();
257 // Calculate worker thread specific copied data.
258 virtual float GetMemoryCopiedData() {return 0;}
259 virtual float GetDeviceCopiedData() {return 0;}
260 // Calculate worker thread specific bandwidth.
261 virtual float GetMemoryBandwidth()
262 {return GetMemoryCopiedData() / (
263 runduration_usec_ * 1.0 / 1000000);}
264 virtual float GetDeviceBandwidth()
265 {return GetDeviceCopiedData() / (
266 runduration_usec_ * 1.0 / 1000000);}
267
268 void set_cpu_mask(cpu_set_t *mask) {
269 memcpy(&cpu_mask_, mask, sizeof(*mask));
270 }
271
272 void set_cpu_mask_to_cpu(int cpu_num) {
273 cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
274 }
275
276 void set_tag(int32 tag) {tag_ = tag;}
277
278 // Returns CPU mask, where each bit represents a logical cpu.
279 bool AvailableCpus(cpu_set_t *cpuset);
280 // Returns CPU mask of CPUs this thread is bound to,
281 bool CurrentCpus(cpu_set_t *cpuset);
282 // Returns Current Cpus mask as string.
283 string CurrentCpusFormat() {
284 cpu_set_t current_cpus;
285 CurrentCpus(&current_cpus);
286 return cpuset_format(&current_cpus);
287 }
288
289 int ThreadID() {return thread_num_;}
290
291 // Bind worker thread to specified CPU(s)
292 bool BindToCpus(const cpu_set_t *cpuset);
293
294 protected:
295 // This function dictates whether the main work loop
296 // continues, waits, or terminates.
297 // All work loops should be of the form:
298 // do {
299 // // work.
300 // } while (IsReadyToRun());
301 virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
302 // TODO(matthewb): Is this function really necessary? Remove it if not.
303 //
304 // Like IsReadyToRun(), except it won't pause.
305 virtual bool IsReadyToRunNoPause() {
306 return worker_status_->ContinueRunningNoPause();
307 }
308
309 // These are functions used by the various work loops.
310 // Pretty print and log a data miscompare.
311 virtual void ProcessError(struct ErrorRecord *er,
312 int priority,
313 const char *message);
314
315 // Compare a region of memory with a known data patter, and report errors.
316 virtual int CheckRegion(void *addr,
317 class Pattern *pat,
318 int64 length,
319 int offset,
320 int64 patternoffset);
321
322 // Fast compare a block of memory.
323 virtual int CrcCheckPage(struct page_entry *srcpe);
324
325 // Fast copy a block of memory, while verifying correctness.
326 virtual int CrcCopyPage(struct page_entry *dstpe,
327 struct page_entry *srcpe);
328
329 // Fast copy a block of memory, while verifying correctness, and heating CPU.
330 virtual int CrcWarmCopyPage(struct page_entry *dstpe,
331 struct page_entry *srcpe);
332
333 // Fill a page with its specified pattern.
334 virtual bool FillPage(struct page_entry *pe);
335
336 // Copy with address tagging.
337 virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
338 uint64 *srcmem64,
339 unsigned int size_in_bytes,
340 AdlerChecksum *checksum,
341 struct page_entry *pe);
342 // SSE copy with address tagging.
343 virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
344 uint64 *srcmem64,
345 unsigned int size_in_bytes,
346 AdlerChecksum *checksum,
347 struct page_entry *pe);
348 // Crc data with address tagging.
349 virtual bool AdlerAddrCrcC(uint64 *srcmem64,
350 unsigned int size_in_bytes,
351 AdlerChecksum *checksum,
352 struct page_entry *pe);
353 // Setup tagging on an existing page.
354 virtual bool TagAddrC(uint64 *memwords,
355 unsigned int size_in_bytes);
356 // Report a mistagged cacheline.
357 virtual bool ReportTagError(uint64 *mem64,
358 uint64 actual,
359 uint64 tag);
360 // Print out the error record of the tag mismatch.
361 virtual void ProcessTagError(struct ErrorRecord *error,
362 int priority,
363 const char *message);
364
365 // A worker thread can yield itself to give up CPU until it's scheduled again
366 bool YieldSelf();
367
368 protected:
369 // General state variables that all subclasses need.
370 int thread_num_; // Thread ID.
371 volatile bool status_; // Error status.
372 volatile int64 pages_copied_; // Recorded for memory bandwidth calc.
373 volatile int64 errorcount_; // Miscompares seen by this thread.
374
375 cpu_set_t cpu_mask_; // Cores this thread is allowed to run on.
376 volatile uint32 tag_; // Tag hint for memory this thread can use.
377
378 bool tag_mode_; // Tag cachelines with vaddr.
379
380 // Thread timing variables.
381 struct timeval start_time_; // Worker thread start time.
382 volatile int64 runduration_usec_; // Worker run duration in u-seconds.
383
384 // Function passed to pthread_create.
385 void *(*thread_spawner_)(void *args);
386 pthread_t thread_; // Pthread thread ID.
387 Priority priority_; // Worker thread priority.
388 class Sat *sat_; // Reference to parent stest object.
389 class OsLayer *os_; // Os abstraction: put hacks here.
390 class PatternList *patternlist_; // Reference to data patterns.
391
392 // Work around style guide ban on sizeof(int).
393 static const uint64 iamint_ = 0;
394 static const int wordsize_ = sizeof(iamint_);
395
396 private:
397 WorkerStatus *worker_status_;
398
399 DISALLOW_COPY_AND_ASSIGN(WorkerThread);
400};
401
402// Worker thread to perform File IO.
403class FileThread : public WorkerThread {
404 public:
405 FileThread();
406 // Set filename to use for file IO.
407 virtual void SetFile(const char *filename_init);
408 virtual bool Work();
409
410 // Calculate worker thread specific bandwidth.
411 virtual float GetDeviceCopiedData()
412 {return GetCopiedData()*2;}
413 virtual float GetMemoryCopiedData();
414
415 protected:
416 // Record of where these pages were sourced from, and what
417 // potentially broken components they passed through.
418 struct PageRec {
419 struct Pattern *pattern; // This is the data it should contain.
420 void *src; // This is the memory location the data was sourced from.
421 void *dst; // This is where it ended up.
422 };
423
424 // These are functions used by the various work loops.
425 // Pretty print and log a data miscompare. Disks require
426 // slightly different error handling.
427 virtual void ProcessError(struct ErrorRecord *er,
428 int priority,
429 const char *message);
430
431 virtual bool OpenFile(int *pfile);
432 virtual bool CloseFile(int fd);
433
434 // Read and write whole file to disk.
435 virtual bool WritePages(int fd);
436 virtual bool ReadPages(int fd);
437
438 // Read and write pages to disk.
439 virtual bool WritePageToFile(int fd, struct page_entry *src);
440 virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
441
442 // Sector tagging support.
443 virtual bool SectorTagPage(struct page_entry *src, int block);
444 virtual bool SectorValidatePage(const struct PageRec &page,
445 struct page_entry *dst,
446 int block);
447
448 // Get memory for an incoming data transfer..
449 virtual bool PagePrepare();
450 // Remove memory allocated for data transfer.
451 virtual bool PageTeardown();
452
453 // Get memory for an incoming data transfer..
454 virtual bool GetEmptyPage(struct page_entry *dst);
455 // Get memory for an outgoing data transfer..
456 virtual bool GetValidPage(struct page_entry *dst);
457 // Throw out a used empty page.
458 virtual bool PutEmptyPage(struct page_entry *src);
459 // Throw out a used, filled page.
460 virtual bool PutValidPage(struct page_entry *src);
461
462
463 struct PageRec *page_recs_; // Array of page records.
464 int crc_page_; // Page currently being CRC checked.
465 string filename_; // Name of file to access.
466 string devicename_; // Name of device file is on.
467
468 bool page_io_; // Use page pool for IO.
469 void *local_page_; // malloc'd page fon non-pool IO.
470 int pass_; // Number of writes to the file so far.
471
472 // Tag to detect file corruption.
473 struct SectorTag {
474 volatile uint8 magic;
475 volatile uint8 block;
476 volatile uint8 sector;
477 volatile uint8 pass;
478 char pad[512-4];
479 };
480
481 DISALLOW_COPY_AND_ASSIGN(FileThread);
482};
483
484
485// Worker thread to perform Network IO.
486class NetworkThread : public WorkerThread {
487 public:
488 NetworkThread();
489 // Set hostname to use for net IO.
490 virtual void SetIP(const char *ipaddr_init);
491 virtual bool Work();
492
493 // Calculate worker thread specific bandwidth.
494 virtual float GetDeviceCopiedData()
495 {return GetCopiedData()*2;}
496
497 protected:
498 // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
499 virtual bool IsNetworkStopSet();
500 virtual bool CreateSocket(int *psocket);
501 virtual bool CloseSocket(int sock);
502 virtual bool Connect(int sock);
503 virtual bool SendPage(int sock, struct page_entry *src);
504 virtual bool ReceivePage(int sock, struct page_entry *dst);
505 char ipaddr_[256];
506 int sock_;
507
508 private:
509 DISALLOW_COPY_AND_ASSIGN(NetworkThread);
510};
511
512// Worker thread to reflect Network IO.
513class NetworkSlaveThread : public NetworkThread {
514 public:
515 NetworkSlaveThread();
516 // Set socket for IO.
517 virtual void SetSock(int sock);
518 virtual bool Work();
519
520 protected:
521 virtual bool IsNetworkStopSet();
522
523 private:
524 DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
525};
526
527// Worker thread to detect incoming Network IO.
528class NetworkListenThread : public NetworkThread {
529 public:
530 NetworkListenThread();
531 virtual bool Work();
532
533 private:
534 virtual bool Listen();
535 virtual bool Wait();
536 virtual bool GetConnection(int *pnewsock);
537 virtual bool SpawnSlave(int newsock, int threadid);
538 virtual bool ReapSlaves();
539
540 // For serviced incoming connections.
541 struct ChildWorker {
542 WorkerStatus status;
543 NetworkSlaveThread thread;
544 };
545 typedef vector<ChildWorker*> ChildVector;
546 ChildVector child_workers_;
547
548 DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
549};
550
551// Worker thread to perform Memory Copy.
552class CopyThread : public WorkerThread {
553 public:
554 CopyThread() {}
555 virtual bool Work();
556 // Calculate worker thread specific bandwidth.
557 virtual float GetMemoryCopiedData()
558 {return GetCopiedData()*2;}
559
560 private:
561 DISALLOW_COPY_AND_ASSIGN(CopyThread);
562};
563
564// Worker thread to perform Memory Invert.
565class InvertThread : public WorkerThread {
566 public:
567 InvertThread() {}
568 virtual bool Work();
569 // Calculate worker thread specific bandwidth.
570 virtual float GetMemoryCopiedData()
571 {return GetCopiedData()*4;}
572
573 private:
574 virtual int InvertPageUp(struct page_entry *srcpe);
575 virtual int InvertPageDown(struct page_entry *srcpe);
576 DISALLOW_COPY_AND_ASSIGN(InvertThread);
577};
578
579// Worker thread to fill blank pages on startup.
580class FillThread : public WorkerThread {
581 public:
582 FillThread();
583 // Set how many pages this thread should fill before exiting.
584 virtual void SetFillPages(int64 num_pages_to_fill_init);
585 virtual bool Work();
586
587 private:
588 // Fill a page with the data pattern in pe->pattern.
589 virtual bool FillPageRandom(struct page_entry *pe);
590 int64 num_pages_to_fill_;
591 DISALLOW_COPY_AND_ASSIGN(FillThread);
592};
593
594// Worker thread to verify page data matches pattern data.
595// Thread will check and replace pages until "done" flag is set,
596// then it will check and discard pages until no more remain.
597class CheckThread : public WorkerThread {
598 public:
599 CheckThread() {}
600 virtual bool Work();
601 // Calculate worker thread specific bandwidth.
602 virtual float GetMemoryCopiedData()
603 {return GetCopiedData();}
604
605 private:
606 DISALLOW_COPY_AND_ASSIGN(CheckThread);
607};
608
609
610// Worker thread to poll for system error messages.
611// Thread will check for messages until "done" flag is set.
612class ErrorPollThread : public WorkerThread {
613 public:
614 ErrorPollThread() {}
615 virtual bool Work();
616
617 private:
618 DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
619};
620
621// Computation intensive worker thread to stress CPU.
622class CpuStressThread : public WorkerThread {
623 public:
624 CpuStressThread() {}
625 virtual bool Work();
626
627 private:
628 DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
629};
630
631// Worker thread that tests the correctness of the
632// CPU Cache Coherency Protocol.
633class CpuCacheCoherencyThread : public WorkerThread {
634 public:
635 CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
636 int cc_cacheline_count_,
637 int cc_thread_num_,
638 int cc_inc_count_);
639 virtual bool Work();
640
641 protected:
642 cc_cacheline_data *cc_cacheline_data_; // Datstructure for each cacheline.
643 int cc_local_num_; // Local counter for each thread.
644 int cc_cacheline_count_; // Number of cache lines to operate on.
645 int cc_thread_num_; // The integer id of the thread which is
646 // used as an index into the integer array
647 // of the cacheline datastructure.
648 int cc_inc_count_; // Number of times to increment the counter.
649
650 private:
651 DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
652};
653
654// Worker thread to perform disk test.
655class DiskThread : public WorkerThread {
656 public:
657 explicit DiskThread(DiskBlockTable *block_table);
658 virtual ~DiskThread();
659 // Calculate disk thread specific bandwidth.
660 virtual float GetDeviceCopiedData() {
661 return (blocks_written_ * write_block_size_ +
662 blocks_read_ * read_block_size_) / kMegabyte;}
663
664 // Set filename for device file (in /dev).
665 virtual void SetDevice(const char *device_name);
666 // Set various parameters that control the behaviour of the test.
667 virtual bool SetParameters(int read_block_size,
668 int write_block_size,
669 int64 segment_size,
670 int64 cache_size,
671 int blocks_per_segment,
672 int64 read_threshold,
673 int64 write_threshold,
674 int non_destructive);
675
676 virtual bool Work();
677
678 virtual float GetMemoryCopiedData() {return 0;}
679
680 protected:
681 static const int kSectorSize = 512; // Size of sector on disk.
682 static const int kBufferAlignment = 512; // Buffer alignment required by the
683 // kernel.
684 static const int kBlockRetry = 100; // Number of retries to allocate
685 // sectors.
686
687 enum IoOp {
688 ASYNC_IO_READ = 0,
689 ASYNC_IO_WRITE = 1
690 };
691
692 virtual bool OpenDevice(int *pfile);
693 virtual bool CloseDevice(int fd);
694
695 // Retrieves the size (in bytes) of the disk/file.
696 virtual bool GetDiskSize(int fd);
697
698 // Retrieves the current time in microseconds.
699 virtual int64 GetTime();
700
701 // Do an asynchronous disk I/O operation.
702 virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
703 int64 offset, int64 timeout);
704
705 // Write a block to disk.
706 virtual bool WriteBlockToDisk(int fd, BlockData *block);
707
708 // Verify a block on disk.
709 virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
710
711 // Main work loop.
712 virtual bool DoWork(int fd);
713
714 int read_block_size_; // Size of blocks read from disk, in bytes.
715 int write_block_size_; // Size of blocks written to disk, in bytes.
716 int64 blocks_read_; // Number of blocks read in work loop.
717 int64 blocks_written_; // Number of blocks written in work loop.
718 int64 segment_size_; // Size of disk segments (in bytes) that the disk
719 // will be split into where testing can be
720 // confined to a particular segment.
721 // Allows for control of how evenly the disk will
722 // be tested. Smaller segments imply more even
723 // testing (less random).
724 int blocks_per_segment_; // Number of blocks that will be tested per
725 // segment.
726 int cache_size_; // Size of disk cache, in bytes.
727 int queue_size_; // Length of in-flight-blocks queue, in blocks.
728 int non_destructive_; // Use non-destructive mode or not.
729 int update_block_table_; // If true, assume this is the thread
730 // responsible for writing the data in the disk
731 // for this block device and, therefore,
732 // update the block table. If false, just use
733 // the block table to get data.
734
735 // read/write times threshold for reporting a problem
736 int64 read_threshold_; // Maximum time a read should take (in us) before
737 // a warning is given.
738 int64 write_threshold_; // Maximum time a write should take (in us) before
739 // a warning is given.
740 int64 read_timeout_; // Maximum time a read can take before a timeout
741 // and the aborting of the read operation.
742 int64 write_timeout_; // Maximum time a write can take before a timeout
743 // and the aborting of the write operation.
744
745 string device_name_; // Name of device file to access.
746 int64 device_sectors_; // Number of sectors on the device.
747
748 std::queue<BlockData*> in_flight_sectors_; // Queue of sectors written but
749 // not verified.
750 void *block_buffer_; // Pointer to aligned block buffer.
751
752 io_context_t aio_ctx_; // Asynchronous I/O context for Linux native AIO.
753
754 DiskBlockTable *block_table_; // Disk Block Table, shared by all disk
755 // threads that read / write at the same
756 // device
757
758 DISALLOW_COPY_AND_ASSIGN(DiskThread);
759};
760
761class RandomDiskThread : public DiskThread {
762 public:
763 explicit RandomDiskThread(DiskBlockTable *block_table);
764 virtual ~RandomDiskThread();
765 // Main work loop.
766 virtual bool DoWork(int fd);
767 protected:
768 DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
769};
770
771// Worker thread to perform checks in a specific memory region.
772class MemoryRegionThread : public WorkerThread {
773 public:
774 MemoryRegionThread();
775 ~MemoryRegionThread();
776 virtual bool Work();
777 void ProcessError(struct ErrorRecord *error, int priority,
778 const char *message);
779 bool SetRegion(void *region, int64 size);
780 // Calculate worker thread specific bandwidth.
781 virtual float GetMemoryCopiedData()
782 {return GetCopiedData();}
783 virtual float GetDeviceCopiedData()
784 {return GetCopiedData() * 2;}
785 void SetIdentifier(string identifier) {
786 identifier_ = identifier;
787 }
788
789 protected:
790 // Page queue for this particular memory region.
791 char *region_;
792 PageEntryQueue *pages_;
793 bool error_injection_;
794 int phase_;
795 string identifier_;
796 static const int kPhaseNoPhase = 0;
797 static const int kPhaseCopy = 1;
798 static const int kPhaseCheck = 2;
799
800 private:
801 DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
802};
803
804#endif // STRESSAPPTEST_WORKER_H_