Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions bitcoin/BRPeer.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ typedef struct {
uint32_t magicNumber;
char host[INET6_ADDRSTRLEN];
BRPeerStatus status;
int waitingForNetwork;
volatile int needsFilterUpdate;
uint64_t nonce, feePerKb;
char *useragent;
Expand Down Expand Up @@ -985,7 +984,7 @@ static double _peerGetMempoolTime (BRPeerContext *ctx) {
}


static void *_peerThreadRoutine(void *arg)
static void *_peerThreadConnectRoutine(void *arg)
{
BRPeer *peer = arg;
BRPeerContext *ctx = arg;
Expand Down Expand Up @@ -1114,6 +1113,22 @@ static void *_peerThreadRoutine(void *arg)
return NULL; // detached threads don't need to return a value
}

static void *_peerThreadDisconnectRoutine(void *arg) {
BRPeer *peer = arg;
BRPeerContext *ctx = arg;

pthread_cleanup_push(ctx->threadCleanup, ctx->info);

peer_log(peer, "waiting-disconnected");

assert (0 == array_count(ctx->pongCallback));
assert (NULL == ctx->mempoolCallback);
Comment on lines +1124 to +1125
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These probably aren't needed. Added them due to my unfamiliarity with this aspect of the BTC code (assert would catch if we needed to handle this).


if (ctx->disconnected) ctx->disconnected(ctx->info, 0);
pthread_cleanup_pop(1);
return NULL; // detached threads don't need to return a value
}

static void _dummyThreadCleanup(void *info)
{
}
Expand Down Expand Up @@ -1226,35 +1241,31 @@ void BRPeerConnect(BRPeer *peer)
pthread_attr_t attr;

pthread_mutex_lock(&ctx->lock);
if (ctx->status == BRPeerStatusDisconnected || ctx->waitingForNetwork) {
ctx->status = BRPeerStatusConnecting;

if (ctx->status == BRPeerStatusDisconnected || ctx->status == BRPeerStatusWaiting) {
if (ctx->networkIsReachable && ! ctx->networkIsReachable(ctx->info)) { // delay until network is reachable
if (! ctx->waitingForNetwork) peer_log(peer, "waiting for network reachability");
ctx->waitingForNetwork = 1;
if (ctx->status != BRPeerStatusWaiting) peer_log(peer, "waiting for network reachability");
ctx->status = BRPeerStatusWaiting;
}
else {
peer_log(peer, "connecting");
ctx->waitingForNetwork = 0;
ctx->status = BRPeerStatusConnecting;
gettimeofday(&tv, NULL);

// No race - set before the thread starts.
ctx->disconnectTime = tv.tv_sec + (double)tv.tv_usec/1000000 + CONNECT_TIMEOUT;

if (pthread_attr_init(&attr) != 0) {
// error = ENOMEM;
peer_log(peer, "error creating thread");
peer_log(peer, "error creating connect thread");
ctx->status = BRPeerStatusDisconnected;
//if (ctx->disconnected) ctx->disconnected(ctx->info, error);
assert (0);
}
else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
pthread_attr_setstacksize(&attr, PTHREAD_STACK_SIZE) != 0 ||
pthread_create(&ctx->thread, &attr, _peerThreadRoutine, peer) != 0) {
// error = EAGAIN;
peer_log(peer, "error creating thread");
pthread_create(&ctx->thread, &attr, _peerThreadConnectRoutine, peer) != 0) {
peer_log(peer, "error creating connect thread");
pthread_attr_destroy(&attr);
ctx->status = BRPeerStatusDisconnected;
//if (ctx->disconnected) ctx->disconnected(ctx->info, error);
assert (0);
}
}
}
Expand All @@ -1266,15 +1277,36 @@ void BRPeerDisconnect(BRPeer *peer)
{
BRPeerContext *ctx = (BRPeerContext *)peer;
int socket = -1;
pthread_attr_t attr;

if (_peerCheckAndGetSocket(ctx, &socket)) {
pthread_mutex_lock(&ctx->lock);
ctx->status = BRPeerStatusDisconnected;
pthread_mutex_unlock(&ctx->lock);

if (shutdown(socket, SHUT_RDWR) < 0) peer_log(peer, "%s", strerror(errno));
close(socket);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely the most controversial part; the spawning of a "cleanup"-like thread. Also, the dual nature of this function...

pthread_mutex_lock(&ctx->lock);
if (ctx->status == BRPeerStatusWaiting) {
peer_log(peer, "disconnecting");
ctx->status = BRPeerStatusDisconnected;

if (pthread_attr_init(&attr) != 0) {
peer_log(peer, "error creating disconnect thread");
assert (0);
}
else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
pthread_attr_setstacksize(&attr, PTHREAD_STACK_SIZE) != 0 ||
pthread_create(&ctx->thread, &attr, _peerThreadDisconnectRoutine, peer) != 0) {
peer_log(peer, "error creating disconnect thread");
pthread_attr_destroy(&attr);
assert (0);
}

ctx->status = BRPeerStatusDisconnected;
}
pthread_mutex_unlock(&ctx->lock);
}

// call this to (re)schedule a disconnect in the given number of seconds, or < 0 to cancel (useful for sync timeout)
Expand Down
3 changes: 2 additions & 1 deletion bitcoin/BRPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ extern "C" {
typedef enum {
BRPeerStatusDisconnected = 0,
BRPeerStatusConnecting,
BRPeerStatusConnected
BRPeerStatusConnected,
BRPeerStatusWaiting
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roll the separate waitingForNetwork flag into this.

} BRPeerStatus;

typedef struct {
Expand Down
14 changes: 4 additions & 10 deletions bitcoin/BRPeerManager.c
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ static void _peerDisconnected(void *info, int error)
if (manager->connectFailureCount > MAX_CONNECT_FAILURES) manager->connectFailureCount = MAX_CONNECT_FAILURES;
}

if (! manager->isConnected && manager->connectFailureCount == MAX_CONNECT_FAILURES) {
if (! manager->isConnected && manager->connectFailureCount >= MAX_CONNECT_FAILURES) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd seen cases where this was greater than MAX_CONNECT_FAILURES. In BRPeerManagerConnect and BRPeerManagerPublishTx, we checked for >=, mirrored that here as I had seen cases where it was greater than at this point.

_BRPeerManagerSyncStopped(manager);

// clear out stored peers so we get a fresh list from DNS on next connect attempt
Expand Down Expand Up @@ -1498,6 +1498,7 @@ static void _peerThreadCleanup(void *info)

free(info);
pthread_mutex_lock(&manager->lock);
assert (0 != manager->peerThreadCount);
manager->peerThreadCount--;
pthread_mutex_unlock(&manager->lock);
if (manager->threadCleanup) manager->threadCleanup(manager->info);
Expand Down Expand Up @@ -1664,7 +1665,7 @@ void BRPeerManagerConnect(BRPeerManager *manager)
for (size_t i = array_count(manager->connectedPeers); i > 0; i--) {
BRPeer *p = manager->connectedPeers[i - 1];

if (BRPeerConnectStatus(p) == BRPeerStatusConnecting) BRPeerConnect(p);
if (BRPeerConnectStatus(p) == BRPeerStatusWaiting) BRPeerConnect(p);
}

if (array_count(manager->connectedPeers) < manager->maxConnectCount) {
Expand Down Expand Up @@ -1706,13 +1707,6 @@ void BRPeerManagerConnect(BRPeerManager *manager)
_peerSetFeePerKb, _peerRequestedTx, _peerNetworkIsReachable, _peerThreadCleanup);
BRPeerSetEarliestKeyTime(info->peer, manager->earliestKeyTime);
BRPeerConnect(info->peer);

if (BRPeerConnectStatus(info->peer) == BRPeerStatusDisconnected) {
pthread_mutex_unlock(&manager->lock);
_peerDisconnected(info, ENOTCONN);
pthread_mutex_lock(&manager->lock);
manager->peerThreadCount--;
}
}
}

Expand Down Expand Up @@ -1744,7 +1738,7 @@ void BRPeerManagerDisconnect(BRPeerManager *manager)
p = manager->connectedPeers[i - 1];
manager->connectFailureCount = MAX_CONNECT_FAILURES; // prevent futher automatic reconnect attempts
BRPeerDisconnect(p);
if (BRPeerConnectStatus(p) == BRPeerStatusConnecting) manager->peerThreadCount--; // waiting for network
while (BRPeerConnectStatus(p) == BRPeerStatusConnecting) BRPeerDisconnect(p);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me leery (endless loop anyone?) but the logic should be that this only happens when the thread is being spawned. Once spawned, the BRPeer thread will set the status to connected or disconnected. In either case, this will break out (fingers crossed but also seen in testing).

}

peerThreadCount = manager->peerThreadCount;
Expand Down
47 changes: 29 additions & 18 deletions bitcoin/testBwm.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ static void *
_testBRWalletManagerConnectThread (void *context) {
BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context;
while (!state->kill) {
nanosleep (&(struct timespec){0, 50000000}, NULL);
BRWalletManagerConnect (state->manager);
}
return NULL;
Expand All @@ -374,6 +375,7 @@ static void *
_testBRWalletManagerDisconnectThread (void *context) {
BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context;
while (!state->kill) {
nanosleep (&(struct timespec){0, 50000000}, NULL);
BRWalletManagerDisconnect (state->manager);
}
return NULL;
Expand All @@ -383,15 +385,27 @@ static void *
_testBRWalletManagerScanThread (void *context) {
BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context;
while (!state->kill) {
nanosleep (&(struct timespec){0, 50000000}, NULL);
BRWalletManagerScan (state->manager);
}
return NULL;
}

static void *
_testBRWalletManagerNetworkThread (void *context) {
BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context;
while (!state->kill) {
nanosleep (&(struct timespec){0, 50000000}, NULL);
BRWalletManagerSetNetworkReachable (state->manager, rand() % 2);
}
return NULL;
}

static void *
_testBRWalletManagerSwapThread (void *context) {
BRRunTestWalletManagerSyncThreadState *state = (BRRunTestWalletManagerSyncThreadState *) context;
while (!state->kill) {
nanosleep (&(struct timespec){0, 50000000}, NULL);
switch (BRWalletManagerGetMode (state->manager)) {
case SYNC_MODE_BRD_ONLY:
BRWalletManagerSetMode (state->manager, SYNC_MODE_P2P_ONLY);
Expand Down Expand Up @@ -943,11 +957,7 @@ BRRunTestWalletManagerSyncForMode (const char *testName,
}

printf("Testing BRWalletManager threading...\n");
if (mode == SYNC_MODE_P2P_ONLY) {
// TODO(fix): There is a thread-related issue in BRPeerManager/BRPeer where we have a use after free; re-enable once that is fixed
fprintf(stderr, "***WARNING*** %s:%d: BRWalletManager threading test is disabled for SYNC_MODE_P2P_ONLY\n", testName, __LINE__);

} else {
{
// Test setup
BRRunTestWalletManagerSyncState state = {0};
BRRunTestWalletManagerSyncTestSetup (&state, blockHeight, 1);
Expand All @@ -956,22 +966,24 @@ BRRunTestWalletManagerSyncForMode (const char *testName,
BRWalletManagerStart (manager);

BRRunTestWalletManagerSyncThreadState threadState = {0, manager};
pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL;
pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL, networkThread = (pthread_t) NULL;

success = (0 == pthread_create (&connectThread, NULL, _testBRWalletManagerConnectThread, (void*) &threadState) &&
0 == pthread_create (&disconnectThread, NULL, _testBRWalletManagerDisconnectThread, (void*) &threadState) &&
0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState));
0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState) &&
0 == pthread_create (&networkThread, NULL, _testBRWalletManagerNetworkThread, (void*) &threadState));
if (!success) {
fprintf(stderr, "***FAILED*** %s:%d: pthread_creates failed\n", testName, __LINE__);
return success;
}

sleep (5);
sleep (10);

threadState.kill = 1;
success = (0 == pthread_join (connectThread, NULL) &&
0 == pthread_join (disconnectThread, NULL) &&
0 == pthread_join (scanThread, NULL));
0 == pthread_join (scanThread, NULL) &&
0 == pthread_join (networkThread, NULL));
if (!success) {
fprintf(stderr, "***FAILED*** %s:%d: pthread_joins failed\n", testName, __LINE__);
return success;
Expand Down Expand Up @@ -1139,11 +1151,7 @@ BRRunTestWalletManagerSyncAllModes (const char *testName,
}

printf("Testing BRWalletManager mode swap threading...\n");
if (primaryMode == SYNC_MODE_P2P_ONLY || secondaryMode == SYNC_MODE_P2P_ONLY) {
// TODO(fix): There is a thread-related issue in BRPeerManager/BRPeer where we have a use after free; re-enable once that is fixed
fprintf(stderr, "***WARNING*** %s:%d: BRWalletManager mode swap threading test is disabled\n", testName, __LINE__);

} else {
{
// Test setup
BRRunTestWalletManagerSyncState state = {0};
BRRunTestWalletManagerSyncTestSetup (&state, blockHeight, 1);
Expand All @@ -1152,23 +1160,26 @@ BRRunTestWalletManagerSyncAllModes (const char *testName,
BRWalletManagerStart (manager);

BRRunTestWalletManagerSyncThreadState threadState = {0, manager};
pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL, swapThread = (pthread_t) NULL;
pthread_t connectThread = (pthread_t) NULL, disconnectThread = (pthread_t) NULL, scanThread = (pthread_t) NULL;
pthread_t networkThread = (pthread_t) NULL, swapThread = (pthread_t) NULL;

success = (0 == pthread_create (&connectThread, NULL, _testBRWalletManagerConnectThread, (void*) &threadState) &&
0 == pthread_create (&disconnectThread, NULL, _testBRWalletManagerDisconnectThread, (void*) &threadState) &&
0 == pthread_create (&scanThread, NULL, _testBRWalletManagerScanThread, (void*) &threadState) &&
0 == pthread_create (&networkThread, NULL, _testBRWalletManagerNetworkThread, (void*) &threadState) &&
0 == pthread_create (&swapThread, NULL, _testBRWalletManagerSwapThread, (void*) &threadState));
if (!success) {
fprintf(stderr, "***FAILED*** %s:%d: pthread_creates failed\n", testName, __LINE__);
return success;
}

sleep (5);
sleep (10);

threadState.kill = 1;
success = (0 == pthread_join (connectThread, NULL) &&
0 == pthread_join (disconnectThread, NULL) &&
0 == pthread_join (scanThread, NULL) &&
0 == pthread_join (networkThread, NULL) &&
0 == pthread_join (swapThread, NULL));
if (!success) {
fprintf(stderr, "***FAILED*** %s:%d: pthread_joins failed\n", testName, __LINE__);
Expand Down Expand Up @@ -1399,7 +1410,7 @@ BRRunTestWalletManagerFileService (const char *storagePath) {
BRTransactionFree(tx);
BRTransactionFree(tx2);
BRSetFree (transactionSet);

///
/// Peer
///
Expand All @@ -1425,7 +1436,7 @@ BRRunTestWalletManagerFileService (const char *storagePath) {

free(p2);
BRSetFree(peerSet);

fileServiceClose(fs);
fileServiceRelease(fs);

Expand Down