diff --git a/src/BuildInfo.ts b/src/BuildInfo.ts index e77e15c05..915c8f12b 100644 --- a/src/BuildInfo.ts +++ b/src/BuildInfo.ts @@ -14,6 +14,7 @@ * limitations under the License. */ /** @ignore *//** */ +import {MemberVersion} from './core/MemberVersion'; const clientVersion = require('../package.json').version; @@ -52,4 +53,7 @@ export class BuildInfo { return clientVersion; } + public static calculateMemberVersion(m: MemberVersion) : number { + return BuildInfo.calculateServerVersion(m.major, m.minor, m.patch); + } } diff --git a/src/HazelcastClient.ts b/src/HazelcastClient.ts index 04a98a097..29860a787 100644 --- a/src/HazelcastClient.ts +++ b/src/HazelcastClient.ts @@ -161,7 +161,8 @@ export class HazelcastClient { this.clusterFailoverService ); this.connectionRegistry = new ConnectionRegistryImpl( - this.config.connectionStrategy, + this.config.connectionStrategy.asyncStart, + this.config.connectionStrategy.reconnectMode, this.config.network.smartRouting, this.loadBalancer, this.clusterService diff --git a/src/core/LoadBalancer.ts b/src/core/LoadBalancer.ts index 8ce5b51a8..e557a6ef3 100644 --- a/src/core/LoadBalancer.ts +++ b/src/core/LoadBalancer.ts @@ -53,6 +53,7 @@ export interface LoadBalancer { * Returns the next data member to route to. * * @return Returns the next data member or `null` if no data member is available + * @deprecated Since 5.0, the method is unused */ nextDataMember(): Member | null; @@ -60,6 +61,7 @@ export interface LoadBalancer { * Returns whether this instance supports getting data members through a call to {@link nextDataMember()}. * * @return Returns `true` if this load balancer can get a data member. + * @deprecated Since 5.0, the method is unused */ canGetNextDataMember(): boolean; } diff --git a/src/core/MemberVersion.ts b/src/core/MemberVersion.ts index d7f2d9e55..7706a20d4 100644 --- a/src/core/MemberVersion.ts +++ b/src/core/MemberVersion.ts @@ -26,4 +26,29 @@ export class MemberVersion { this.minor = minor; this.patch = patch; } + + /** + * @param other other version to compare to + * @param ignorePatchVersion whether patch in the version should be ignored + * @return true if this version equals `other` + */ + equals(other: MemberVersion, ignorePatchVersion = false): boolean { + if (ignorePatchVersion) { + return this.major === other.major && this.minor === other.minor; + } else { + return this.major === other.major && this.minor === other.minor && this.patch === other.patch; + } + } + + /** + * @param ignorePatchVersion whether patch in the version should be ignored + * @return string format of this `MemberVersion` + */ + toString(ignorePatchVersion = false): string { + if (ignorePatchVersion) { + return `${this.major}.${this.minor}`; + } else { + return `${this.major}.${this.minor}.${this.patch}`; + } + } } diff --git a/src/invocation/ClusterService.ts b/src/invocation/ClusterService.ts index 4abcaa3d5..bac001627 100644 --- a/src/invocation/ClusterService.ts +++ b/src/invocation/ClusterService.ts @@ -77,7 +77,7 @@ export class ClusterService implements Cluster { * @param uuid The UUID of the member as a string. * @return The member that was found, or undefined if not found. */ - getMember(uuid: string): MemberImpl { + getMember(uuid: string): MemberImpl | undefined { assertNotNull(uuid); return this.memberListSnapshot.members.get(uuid); } diff --git a/src/network/ConnectionManager.ts b/src/network/ConnectionManager.ts index 83c906848..8ef62f08d 100644 --- a/src/network/ConnectionManager.ts +++ b/src/network/ConnectionManager.ts @@ -47,14 +47,15 @@ import { scheduleWithRepetition, shuffleArray, Task, - timedPromise + timedPromise, + memberOfLargerSameVersionGroup } from '../util/Util'; import {BasicSSLOptionsFactory} from '../connection/BasicSSLOptionsFactory'; import {ILogger} from '../logging/ILogger'; import {HeartbeatManager} from './HeartbeatManager'; import {UuidUtil} from '../util/UuidUtil'; import {WaitStrategy} from './WaitStrategy'; -import {ConnectionStrategyConfig, ReconnectMode} from '../config/ConnectionStrategyConfig'; +import {ReconnectMode} from '../config/ConnectionStrategyConfig'; import {ClientConfig, ClientConfigImpl} from '../config/Config'; import {LifecycleState, LifecycleServiceImpl, LifecycleService} from '../LifecycleService'; import {ClientMessage} from '../protocol/ClientMessage'; @@ -81,6 +82,7 @@ export const CLIENT_TYPE = 'NJS'; const SERIALIZATION_VERSION = 1; const SET_TIMEOUT_MAX_DELAY = 2147483647; const BINARY_PROTOCOL_VERSION = Buffer.from('CP2'); +const SQL_CONNECTION_RANDOM_ATTEMPTS = 10; enum ConnectionState { /** @@ -131,10 +133,21 @@ export interface ConnectionRegistry { /** * Returns a random connection from active connections - * @param dataMember true if only data members should be considered * @return Connection if there is at least one connection, otherwise null */ - getRandomConnection(dataMember?: boolean): Connection | null; + getRandomConnection(): Connection | null; + + /** + * Returns a connection for executing SQL. + * + * @throws IllegalStateError If there are more than 2 distinct member versions found + * @return + * * A random connection to a data member from the larger same-version group + * * If there's no such connection, return connection to a random data member + * * If there's no such connection, return any random connection + * * If there are no connections, null is returned + */ + getConnectionForSql(): Connection | null; /** * Returns if invocation allowed. Invocation is allowed only if connection state is {@link INITIALIZED_ON_CLUSTER} @@ -148,24 +161,15 @@ export class ConnectionRegistryImpl implements ConnectionRegistry { private active = false; private readonly activeConnections = new Map(); - private readonly loadBalancer: LoadBalancer; private connectionState = ConnectionState.INITIAL; - private readonly smartRoutingEnabled: boolean; - private readonly asyncStart: boolean; - private readonly reconnectMode: ReconnectMode; - private readonly clusterService: ClusterService; constructor( - connectionStrategy: ConnectionStrategyConfig, - smartRoutingEnabled: boolean, - loadBalancer: LoadBalancer, - clusterService: ClusterService + private readonly asyncStart: boolean, + private readonly reconnectMode: ReconnectMode, + private readonly smartRoutingEnabled: boolean, + private readonly loadBalancer: LoadBalancer, + private readonly clusterService: ClusterService ) { - this.smartRoutingEnabled = smartRoutingEnabled; - this.asyncStart = connectionStrategy.asyncStart; - this.reconnectMode = connectionStrategy.reconnectMode; - this.loadBalancer = loadBalancer; - this.clusterService = clusterService; } isActive(): boolean { @@ -188,41 +192,62 @@ export class ConnectionRegistryImpl implements ConnectionRegistry { return this.activeConnections.get(uuid.toString()); } - getRandomConnection(dataMember = false): Connection | null { + getRandomConnection(): Connection | null { if (this.smartRoutingEnabled) { - let member; - if (dataMember) { - if (this.loadBalancer.canGetNextDataMember()) { - member = this.loadBalancer.nextDataMember(); - } else { - member = null; + const member = this.loadBalancer.next(); + if (member != null) { + const connection = this.getConnection(member.uuid); + if (connection != null) { + return connection; } - } else { - member = this.loadBalancer.next(); } + } + + const iterator = this.activeConnections.values(); + const next = iterator.next(); + if (!next.done) { + return next.value; + } else { + return null; + } + } - if (member !== null) { + getConnectionForSql(): Connection | null { + if (this.smartRoutingEnabled) { + // There might be a race - the chosen member might be just connected or disconnected - try a + // couple of times, the memberOfLargerSameVersionGroup returns a random connection, + // we might be lucky... + for (let i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) { + const member = memberOfLargerSameVersionGroup(this.clusterService.getMembers()); + + if (member === null) { + break; + } const connection = this.getConnection(member.uuid); - if (connection != null) { + if (connection !== undefined) { return connection; } } } - - for (const entry of this.activeConnections.entries()) { - const uuid = entry[0]; + // Otherwise iterate over connections and return the first one that's not to a lite member + let firstConnection: Connection | null = null; + for (const entry of this.activeConnections) { + const memberId = entry[0]; const connection = entry[1]; - if (dataMember) { - const member = this.clusterService.getMember(uuid); - if (!member || member.liteMember) { - continue; - } + + if (firstConnection === null) { + firstConnection = connection; + } + const member = this.clusterService.getMember(memberId); + if (member === undefined || member.liteMember) { + continue; } return connection; } - return null; + // Failed to get a connection to a data member + return firstConnection; } forEachConnection(fn: (conn: Connection) => void): void { diff --git a/src/sql/SqlResult.ts b/src/sql/SqlResult.ts index 65eea7ef4..c9ea554b4 100644 --- a/src/sql/SqlResult.ts +++ b/src/sql/SqlResult.ts @@ -249,7 +249,7 @@ export class SqlResultImpl implements SqlResult { this.sqlService.close(this.connection, this.queryId).then(() => { this.closeDeferred.resolve(); }).catch(err => { - this.closeDeferred.reject(this.sqlService.toHazelcastSqlException(err, this.connection)); + this.closeDeferred.reject(this.sqlService.rethrow(err, this.connection)); }); this.closed = true; @@ -354,7 +354,7 @@ export class SqlResultImpl implements SqlResult { this.fetchDeferred.resolve(sqlPage); this.fetchDeferred = undefined; // Set fetchDeferred to undefined to be able to fetch again }).catch(err => { - this.fetchDeferred.reject(this.sqlService.toHazelcastSqlException(err, this.connection)); + this.fetchDeferred.reject(this.sqlService.rethrow(err, this.connection)); }); return this.fetchDeferred.promise; diff --git a/src/sql/SqlService.ts b/src/sql/SqlService.ts index f8dc85b1f..ec76edd07 100644 --- a/src/sql/SqlService.ts +++ b/src/sql/SqlService.ts @@ -255,7 +255,7 @@ export class SqlServiceImpl implements SqlService { * @param connection * @returns {@link HazelcastSqlException} */ - toHazelcastSqlException(err: any, connection: Connection) : HazelcastSqlException { + rethrow(err: any, connection: Connection): HazelcastSqlException { if (!connection.isAlive()) { return new HazelcastSqlException( this.connectionManager.getClientUuid(), @@ -265,19 +265,23 @@ export class SqlServiceImpl implements SqlService { err ) } else { - if (err instanceof HazelcastSqlException) { - return err; - } - let originatingMemberId; - if (err.hasOwnProperty('originatingMemberId')) { - originatingMemberId = err.originatingMemberId; - } else { - originatingMemberId = this.connectionManager.getClientUuid(); - } - return new HazelcastSqlException( - originatingMemberId, SqlErrorCode.GENERIC, err.message, err - ); + return this.toHazelcastSqlException(err); + } + } + + toHazelcastSqlException(err: any): HazelcastSqlException { + if (err instanceof HazelcastSqlException) { + return err; + } + let originatingMemberId; + if (err.hasOwnProperty('originatingMemberId')) { + originatingMemberId = err.originatingMemberId; + } else { + originatingMemberId = this.connectionManager.getClientUuid(); } + return new HazelcastSqlException( + originatingMemberId, SqlErrorCode.GENERIC, err.message, err + ); } executeStatement(sqlStatement: SqlStatement): SqlResult { @@ -287,9 +291,16 @@ export class SqlServiceImpl implements SqlService { throw new IllegalArgumentError(`Invalid argument given to execute(): ${error.message}`, error) } - const connection = this.connectionRegistry.getRandomConnection(true); + let connection: Connection | null; + + try { + connection = this.connectionRegistry.getConnectionForSql(); + } catch (e) { + throw this.toHazelcastSqlException(e); + } + if (connection === null) { - // Either the client is not connected to the cluster, or there are no data members in the cluster. + // The client is not connected to the cluster. throw new HazelcastSqlException( this.connectionManager.getClientUuid(), SqlErrorCode.CONNECTION_PROBLEM, @@ -352,13 +363,13 @@ export class SqlServiceImpl implements SqlService { SqlServiceImpl.handleExecuteResponse(clientMessage, res); }).catch(err => { res.onExecuteError( - this.toHazelcastSqlException(err, connection) + this.rethrow(err, connection) ); }); return res; } catch (error) { - throw this.toHazelcastSqlException(error, connection); + throw this.rethrow(error, connection); } } diff --git a/src/util/Util.ts b/src/util/Util.ts index 11df91fa9..07d5e5f7d 100644 --- a/src/util/Util.ts +++ b/src/util/Util.ts @@ -18,7 +18,9 @@ import * as assert from 'assert'; import * as Long from 'long'; import * as Path from 'path'; -import {BigDecimal, LocalDate, LocalDateTime, LocalTime, OffsetDateTime, UUID} from '../core'; +import {BigDecimal, IllegalStateError, LocalDate, LocalDateTime, LocalTime, MemberImpl, OffsetDateTime, UUID} from '../core'; +import {MemberVersion} from '../core/MemberVersion'; +import {BuildInfo} from '../BuildInfo'; /** @internal */ export function assertNotNull(v: any): void { @@ -286,3 +288,75 @@ export function timedPromise(wrapped: Promise, timeout: number, err?: Erro return deferred.promise; } + +/** + * Finds a larger same-version group of data members from a collection of + * members. + * Otherwise return a random member from the group. If the same-version + * groups have the same size, return a member from the newer group. + * + * Used for getting an SQL connection for executing SQL. + * + * @param members list of all members + * @throws IllegalStateError If there are more than 2 distinct member versions found + * @return the chosen member or null, if no data member is found + */ +export function memberOfLargerSameVersionGroup(members: MemberImpl[]): MemberImpl | null { + // The members should have at most 2 different version (ignoring the patch version). + // Find a random member from the larger same-version group. + + let version0: MemberVersion | null = null; + let version1: MemberVersion | null = null; + let count0 = 0; + let count1 = 0; + + for (const m of members) { + if (m.liteMember) { + continue; + } + + const version = m.version; + + if (version0 === null || version0.equals(version, true)) { + version0 = version; + count0++; + } else if (version1 === null || version1.equals(version, true)) { + version1 = version; + count1++ + } else { + const strVer0 = version0.toString(true); + const strVer1 = version1.toString(true); + const strVer = version.toString(true); + + throw new IllegalStateError(`More than 2 distinct member versions found: ${strVer0}, ${strVer1}, ${strVer}`); + } + } + + // no data members + if (count0 === 0) { + return null; + } + + let count: number; + let version: MemberVersion; + + if (count0 > count1 || count0 === count1 + && BuildInfo.calculateMemberVersion(version0) > BuildInfo.calculateMemberVersion(version1)) { + count = count0; + version = version0; + } else { + count = count1; + version = version1; + } + + // otherwise return a random member from the larger group + let randomMemberIndex = randomInt(count); + for (const m of members) { + if (!m.liteMember && m.version.equals(version, true)) { + randomMemberIndex--; + if (randomMemberIndex < 0) { + return m; + } + } + } +} diff --git a/test/integration/backward_compatible/sql/ExecuteTest.js b/test/integration/backward_compatible/sql/ExecuteTest.js index 789f92194..3bcc8923e 100644 --- a/test/integration/backward_compatible/sql/ExecuteTest.js +++ b/test/integration/backward_compatible/sql/ExecuteTest.js @@ -25,6 +25,7 @@ chai.should(); const { Client } = require('../../../../lib'); const TestUtil = require('../../../TestUtil'); const RC = require('../../RC'); +const {Lang} = require('../../remote_controller/remote-controller_types'); const getHazelcastSqlException = () => { const { HazelcastSqlException } = require('../../../../lib/core/HazelcastError'); @@ -51,6 +52,15 @@ const getSqlErrorCode = () => { return SqlErrorCode; }; +const LITE_MEMBER_CONFIG = ` + + + +`; + /** * Sql tests */ @@ -60,6 +70,38 @@ describe('SqlExecuteTest', function () { let someMap; let mapName; + const runSQLQueryWithParams = async () => { + for (const _mapName of [mapName, 'partitioned.' + mapName]) { + const entryCount = 10; + const limit = 6; + + await populateMap(entryCount); + // At this point the map includes [0, 1], [1, 2].. [9, 10] + + // There should be "limit" results + const result1 = await TestUtil.getSql(client).execute(`SELECT * FROM ${_mapName} WHERE this <= ?`, [limit]); + const result2 = await TestUtil.getSql(client).executeStatement({ + sql: `SELECT * FROM ${_mapName} WHERE this <= ?`, + params: [limit] + }); + + for (const result of [result1, result2]) { + const rows = []; + for await (const row of result) { + rows.push(row); + } + + sortByKey(rows); + + for (let i = 0; i < limit; i++) { + rows[i]['__key'].should.be.eq(i); + rows[i]['this'].should.be.eq(i + 1); + } + rows.should.have.lengthOf(limit); + } + } + }; + // Sorts sql result rows by __key, first the smallest __key const sortByKey = (array) => { array.sort((a, b) => { @@ -222,35 +264,33 @@ describe('SqlExecuteTest', function () { }); it('should execute with params', async function () { - for (const _mapName of [mapName, 'partitioned.' + mapName]) { - const entryCount = 10; - const limit = 6; - - await populateMap(entryCount); - // At this point the map includes [0, 1], [1, 2].. [9, 10] - - // There should be "limit" results - const result1 = await TestUtil.getSql(client).execute(`SELECT * FROM ${_mapName} WHERE this <= ?`, [limit]); - const result2 = await TestUtil.getSql(client).executeStatement({ - sql: `SELECT * FROM ${_mapName} WHERE this <= ?`, - params: [limit] - }); + await runSQLQueryWithParams(); + }); + }); + describe('mixed cluster of lite and data members', function () { - for (const result of [result1, result2]) { - const rows = []; - for await (const row of result) { - rows.push(row); - } + before(async function () { + cluster = await RC.createCluster(null, LITE_MEMBER_CONFIG); + await RC.startMember(cluster.id); + await RC.startMember(cluster.id); + await RC.executeOnController(cluster.id, ` + instance_0.getCluster().promoteLocalLiteMember(); + `, Lang.JAVASCRIPT); + client = await Client.newHazelcastClient({ + clusterName: cluster.id + }); + TestUtil.markServerVersionAtLeast(this, client, '4.2'); + mapName = TestUtil.randomString(10); + someMap = await client.getMap(mapName); + }); - sortByKey(rows); + after(async function () { + await RC.terminateCluster(cluster.id); + await client.shutdown(); + }); - for (let i = 0; i < limit; i++) { - rows[i]['__key'].should.be.eq(i); - rows[i]['this'].should.be.eq(i + 1); - } - rows.should.have.lengthOf(limit); - } - } + it('should be able to execute sql query', async function () { + await runSQLQueryWithParams(); }); }); describe('options', function () { @@ -450,15 +490,6 @@ describe('SqlExecuteTest', function () { }); }); describe('errors/invalid usage', function () { - const LITE_MEMBER_CONFIG = ` - - - - `; - afterEach(async function () { await RC.terminateCluster(cluster.id); await client.shutdown(); @@ -474,23 +505,21 @@ describe('SqlExecuteTest', function () { mapName = TestUtil.randomString(10); someMap = await client.getMap(mapName); - const error1 = TestUtil.getThrownErrorOrThrow(() => { - TestUtil.getSql(client).execute(`SELECT * FROM ${mapName}`); + const error1 = await TestUtil.getRejectionReasonOrThrow(async () => { + const result = TestUtil.getSql(client).execute(`SELECT * FROM ${mapName}`); + await result.getRowMetadata(); }); error1.should.be.instanceof(getHazelcastSqlException()); - error1.code.should.be.eq(getSqlErrorCode().CONNECTION_PROBLEM); - error1.originatingMemberId.should.be.eq(client.connectionManager.getClientUuid()); - const error2 = TestUtil.getThrownErrorOrThrow(() => { - TestUtil.getSql(client).executeStatement({ + const error2 = await TestUtil.getRejectionReasonOrThrow(async () => { + const result = TestUtil.getSql(client).executeStatement({ sql: `SELECT * FROM ${mapName}`, params: [], options: {} }); + await result.getRowMetadata(); }); error2.should.be.instanceof(getHazelcastSqlException()); - error2.code.should.be.eq(getSqlErrorCode().CONNECTION_PROBLEM); - error2.originatingMemberId.should.be.eq(client.connectionManager.getClientUuid()); }); it('should return an error if connection lost', async function () { diff --git a/test/unit/network/ConnectionRegistryTest.js b/test/unit/network/ConnectionRegistryTest.js index 15b06ccd9..c66aa259c 100644 --- a/test/unit/network/ConnectionRegistryTest.js +++ b/test/unit/network/ConnectionRegistryTest.js @@ -21,12 +21,14 @@ const sinon = require('sinon'); const sandbox = sinon.createSandbox(); const expect = chai.expect; +const should = chai.should(); chai.use(sinonChai); const { ConnectionRegistryImpl } = require('../../../lib/network/ConnectionManager'); -const { ConnectionStrategyConfigImpl, ReconnectMode } = require('../../../lib/config/ConnectionStrategyConfig'); +const { ReconnectMode } = require('../../../lib/config/ConnectionStrategyConfig'); const { RoundRobinLB } = require('../../../lib/util/RoundRobinLB'); const { UuidUtil } = require('../../../lib/util/UuidUtil'); +const Util = require('../../../lib/util/Util'); const { ClientOfflineError, IOError } = require('../../../lib/core/HazelcastError'); describe('ConnectionRegistryTest', function () { @@ -43,33 +45,14 @@ describe('ConnectionRegistryTest', function () { describe('getRandomConnection', function () { - it('should not call nextDataMember() or next() on load balancer ' + - 'when load balancer does not support data members and data member is requested ', function () { - const loadBalancerStub = {}; - loadBalancerStub.canGetNextDataMember = sandbox.fake.returns(false); - loadBalancerStub.next = sandbox.spy(); - loadBalancerStub.nextDataMember = sandbox.spy(); - - const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), - true, - loadBalancerStub, - {} - ); - - connectionRegistry.getRandomConnection(true); - - expect(loadBalancerStub.next.called).to.be.false; - expect(loadBalancerStub.nextDataMember.called).to.be.false; - }); - it('should call load balancer\'s next() when in smart mode', function () { const loadBalancerStub = {}; loadBalancerStub.next = sandbox.fake.returns(null); loadBalancerStub.nextDataMember = sandbox.spy(); const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), + false, + ReconnectMode.ON, true, loadBalancerStub, {} @@ -87,7 +70,8 @@ describe('ConnectionRegistryTest', function () { loadBalancerStub.nextDataMember = sandbox.spy(); const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), + false, + ReconnectMode.ON, true, loadBalancerStub, {} @@ -99,25 +83,6 @@ describe('ConnectionRegistryTest', function () { expect(loadBalancerStub.next.called).to.be.true; }); - it('should call load balancer\'s nextDataMember() when in smart mode and dataMember is needed', function () { - const loadBalancerStub = {}; - loadBalancerStub.next = sandbox.spy(); - loadBalancerStub.nextDataMember = sandbox.fake.returns(null); - loadBalancerStub.canGetNextDataMember = sandbox.fake.returns(true); - - const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), - true, - loadBalancerStub, - {} - ); - - connectionRegistry.getRandomConnection(true); - - expect(loadBalancerStub.nextDataMember.called).to.be.true; - expect(loadBalancerStub.next.called).to.be.false; - }); - it('should use member uuid returned by load balancer to get connection in smart mode', function () { const member = { uuid: UuidUtil.generate() @@ -127,7 +92,8 @@ describe('ConnectionRegistryTest', function () { loadBalancerStub.next = sandbox.fake.returns(member); const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), + false, + ReconnectMode.ON, true, loadBalancerStub, {} @@ -144,7 +110,8 @@ describe('ConnectionRegistryTest', function () { function () { const loadBalancerStub = {next: sandbox.spy(), nextDataMember: sandbox.spy()}; const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), + false, + ReconnectMode.ON, false, loadBalancerStub, {} @@ -167,102 +134,17 @@ describe('ConnectionRegistryTest', function () { expect(loadBalancerStub.nextDataMember.called).to.be.false; } ); - - it('should return data member connection when one exists and when data member is requested, [dummy mode]', - function () { - const firstUUID = UuidUtil.generate(); - const secondUUID = UuidUtil.generate(); - const thirdUUID = UuidUtil.generate(); - - const loadBalancerStub = {next: sandbox.spy(), nextDataMember: sandbox.spy()}; - const clusterServiceStub = {}; - clusterServiceStub.getMember = sandbox.stub(); - - clusterServiceStub.getMember.withArgs(firstUUID.toString()).returns({ - liteMember: true - }); - clusterServiceStub.getMember.withArgs(secondUUID.toString()).returns({ - liteMember: false - }); - clusterServiceStub.getMember.withArgs(thirdUUID.toString()).returns({ - liteMember: true - }); - - const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), - false, - loadBalancerStub, - clusterServiceStub - ); - - const secondConnection = {}; - const firstConnection = {}; - connectionRegistry.setConnection(firstUUID, firstConnection); - connectionRegistry.setConnection(secondUUID, secondConnection); - connectionRegistry.setConnection(thirdUUID, {}); - - const connection = connectionRegistry.getRandomConnection(); - const otherConnection = connectionRegistry.getRandomConnection(); - const dataMemberConnection = connectionRegistry.getRandomConnection(true); - - expect(connection).to.be.equal(firstConnection); - expect(otherConnection).to.be.equal(firstConnection); - expect(dataMemberConnection).to.be.equal(secondConnection); - - expect(loadBalancerStub.next.called).to.be.false; - expect(loadBalancerStub.nextDataMember.called).to.be.false; - } - ); - - it('should return null if there is no data member connection and data member is requested, [dummy mode]', - function () { - const firstUUID = UuidUtil.generate(); - const secondUUID = UuidUtil.generate(); - - const loadBalancerStub = {next: sandbox.spy(), nextDataMember: sandbox.spy()}; - const clusterServiceStub = {}; - clusterServiceStub.getMember = sandbox.stub(); - - clusterServiceStub.getMember.withArgs(firstUUID.toString()).returns({ - liteMember: true - }); - clusterServiceStub.getMember.withArgs(secondUUID.toString()).returns({ - liteMember: true - }); - - const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), - false, - loadBalancerStub, - clusterServiceStub - ); - - const secondConnection = {}; - const firstConnection = {}; - connectionRegistry.setConnection(firstUUID, firstConnection); - connectionRegistry.setConnection(secondUUID, secondConnection); - - const connection = connectionRegistry.getRandomConnection(); - const otherConnection = connectionRegistry.getRandomConnection(); - const dataMemberConnection = connectionRegistry.getRandomConnection(true); - - expect(connection).to.be.equal(firstConnection); - expect(otherConnection).to.be.equal(firstConnection); - expect(dataMemberConnection).to.be.equal(null); - - expect(loadBalancerStub.next.called).to.be.false; - expect(loadBalancerStub.nextDataMember.called).to.be.false; - } - ); }); describe('checkIfInvocationAllowed', function () { it('should return null when connection state is INITIALIZED_ON_CLUSTER and there are some active connections', function () { const connectionRegistry = new ConnectionRegistryImpl( - new ConnectionStrategyConfigImpl(), false, - new RoundRobinLB() + ReconnectMode.ON, + false, + new RoundRobinLB(), + {} ); connectionRegistry.setConnectionState(connectionState.INITIALIZED_ON_CLUSTER); @@ -273,13 +155,12 @@ describe('ConnectionRegistryTest', function () { ); it('should return ClientOfflineError when connection state is INITIAL and with async start', function () { - const connectionStrategyConfig = new ConnectionStrategyConfigImpl(); - connectionStrategyConfig.asyncStart = true; - const connectionRegistry = new ConnectionRegistryImpl( - connectionStrategyConfig, + true, + ReconnectMode.ON, false, - new RoundRobinLB() + new RoundRobinLB(), + {} ); connectionRegistry.setConnectionState(connectionState.INITIAL); @@ -288,13 +169,12 @@ describe('ConnectionRegistryTest', function () { }); it('should return IOError when connection state is INITIAL and without async start', function () { - const connectionStrategyConfig = new ConnectionStrategyConfigImpl(); - connectionStrategyConfig.asyncStart = false; - const connectionRegistry = new ConnectionRegistryImpl( - connectionStrategyConfig, false, - new RoundRobinLB() + ReconnectMode.ON, + false, + new RoundRobinLB(), + {} ); connectionRegistry.setConnectionState(connectionState.INITIAL); @@ -304,13 +184,12 @@ describe('ConnectionRegistryTest', function () { it('should return ClientOfflineError when reconnect mode is async, connection state is INITIALIZED_ON_CLUSTER ' + 'and there are no connections', function () { - const connectionStrategyConfig = new ConnectionStrategyConfigImpl(); - connectionStrategyConfig.reconnectMode = ReconnectMode.ASYNC; - const connectionRegistry = new ConnectionRegistryImpl( - connectionStrategyConfig, false, - new RoundRobinLB() + ReconnectMode.ASYNC, + false, + new RoundRobinLB(), + {} ); connectionRegistry.setConnectionState(connectionState.INITIALIZED_ON_CLUSTER); @@ -318,4 +197,94 @@ describe('ConnectionRegistryTest', function () { expect(connectionRegistry.checkIfInvocationAllowed()).to.be.instanceof(ClientOfflineError); }); }); + + describe('getConnectionForSql', function () { + afterEach(function () { + sandbox.restore(); + }); + + it('should return the connection to the member returned from memberOfLargerSameVersionGroup in smart mode', function () { + const fakeClusterService = { + getMembers: () => {} + }; + const connectionRegistry = new ConnectionRegistryImpl(false, ReconnectMode.ON, true, {}, fakeClusterService); + const fakeMember = {uuid: UuidUtil.generate()}; + const memberConnection = {}; + + sandbox.replace(Util, 'memberOfLargerSameVersionGroup', sandbox.fake.returns(fakeMember)); + + // add connection to the member + connectionRegistry.setConnection(fakeMember.uuid, memberConnection); + const connection = connectionRegistry.getConnectionForSql(); + connection.should.be.eq(memberConnection); + }); + + it('should return the first connection to a data member in dummy mode', function () { + const fakeLiteMember = {uuid: UuidUtil.generate(), liteMember: true}; + const fakeDataMember = {uuid: UuidUtil.generate(), liteMember: false}; + const fakeDataMember2 = {uuid: UuidUtil.generate(), liteMember: false}; + + const fakeClusterService = { + members: { + [fakeLiteMember.uuid.toString()]: fakeLiteMember, + [fakeDataMember.uuid.toString()]: fakeDataMember, + [fakeDataMember2.uuid.toString()]: fakeDataMember2, + }, + getMember: function (memberId) { // arrow function won't work here + return this.members[memberId]; + } + }; + + const connectionRegistry = new ConnectionRegistryImpl(false, ReconnectMode.ON, false, {}, fakeClusterService); + + // add connections + const firstDataMemberConnection = {}; + connectionRegistry.setConnection(fakeDataMember.uuid, firstDataMemberConnection); + connectionRegistry.setConnection(fakeDataMember2.uuid, {}); + connectionRegistry.setConnection(fakeLiteMember.uuid, {}); + + const connection = connectionRegistry.getConnectionForSql(); + connection.should.be.eq(firstDataMemberConnection); + }); + + it('should return the first connection if no data members found in dummy mode', function () { + const fakeLiteMember = {uuid: UuidUtil.generate(), liteMember: true}; + const fakeLiteMember2 = {uuid: UuidUtil.generate(), liteMember: true}; + + const fakeClusterService = { + members: { + [fakeLiteMember.uuid.toString()]: fakeLiteMember, + [fakeLiteMember2.uuid.toString()]: fakeLiteMember2, + }, + getMember: function (memberId) { // arrow function won't work here + return this.members[memberId]; + } + }; + + const connectionRegistry = new ConnectionRegistryImpl(false, ReconnectMode.ON, false, {}, fakeClusterService); + + // add connections + const firstConnection = {}; + connectionRegistry.setConnection(fakeLiteMember.uuid, firstConnection); + connectionRegistry.setConnection(fakeLiteMember2.uuid, {}); + + const connection = connectionRegistry.getConnectionForSql(); + connection.should.be.eq(firstConnection); + }); + + it('should return null if no connection exists', function () { + const connectionRegistry = new ConnectionRegistryImpl(false, ReconnectMode.ON, false, {}, { + getMembers: () => [] + }); + + const connection = connectionRegistry.getConnectionForSql(); + should.equal(connection, null); + + const connectionRegistry2 = new ConnectionRegistryImpl(false, ReconnectMode.ON, true, {}, { + getMembers: () => [] + }); + const connection2 = connectionRegistry2.getConnectionForSql(); + should.equal(connection2, null); + }); + }); }); diff --git a/test/unit/sql/SqlResult.js b/test/unit/sql/SqlResult.js index 8bcf2de28..1297a67ca 100644 --- a/test/unit/sql/SqlResult.js +++ b/test/unit/sql/SqlResult.js @@ -231,6 +231,7 @@ describe('SqlResultTest', function () { beforeEach(function () { fakeSqlService = { toHazelcastSqlException: sandbox.fake((err) => new HazelcastSqlException(null, 1, '', err)), + rethrow: sandbox.fake((err) => new HazelcastSqlException(null, 1, '', err)), close: sandbox.fake.resolves(undefined), fetch: sandbox.fake(() => { return delayedPromise(500); @@ -402,6 +403,7 @@ describe('SqlResultTest', function () { beforeEach(function () { fakeSqlService = { toHazelcastSqlException: sandbox.fake((err) => new HazelcastSqlException(null, 1, '', err)), + rethrow: sandbox.fake((err) => new HazelcastSqlException(null, 1, '', err)), fetch: sandbox.fake.resolves(fakeSqlPage), close: sandbox.fake.resolves() }; diff --git a/test/unit/sql/SqlService.js b/test/unit/sql/SqlService.js index 02f2716ce..f20432fc0 100644 --- a/test/unit/sql/SqlService.js +++ b/test/unit/sql/SqlService.js @@ -64,7 +64,7 @@ describe('SqlServiceTest', function () { isAlive: sandbox.fake.returns(true) }; fakeConnectionRegistry = { - getRandomConnection: sandbox.fake.returns(fakeConnection) + getConnectionForSql: sandbox.fake.returns(fakeConnection) }; fakeSerializationService = { toData: sandbox.fake(v => v) }; fakeInvocationService = { invokeOnConnection: sandbox.fake.resolves(fakeClientResponseMessage) }; @@ -87,9 +87,9 @@ describe('SqlServiceTest', function () { sqlService.execute('s', [], {}).should.be.instanceof(SqlResultImpl); }); - it('should call getRandomConnection once with data member argument being true', function () { + it('should call getConnectionForSql', function () { sqlService.execute('s', [], {}); - fakeConnectionRegistry.getRandomConnection.calledOnceWithExactly(true).should.be.true; + fakeConnectionRegistry.getConnectionForSql.calledOnce.should.be.true; }); it('should call toData on params', function () { @@ -131,9 +131,9 @@ describe('SqlServiceTest', function () { ).should.be.true; }); - it('should throw HazelcastSqlException if no connection to a data member is available', function () { + it('should throw HazelcastSqlException if no connection is available', function () { fakeConnectionRegistry = { - getRandomConnection: sandbox.fake.returns(null) + getConnectionForSql: sandbox.fake.returns(null) }; sqlService = new SqlServiceImpl( fakeConnectionRegistry, @@ -189,7 +189,7 @@ describe('SqlServiceTest', function () { ).should.be.true; }); - it('should invoke on connection returned from getRandomConnection', function () { + it('should invoke on connection returned from getConnectionForSql', function () { sqlService.execute('s', [], {}); fakeInvocationService.invokeOnConnection.calledOnceWithExactly(fakeConnection, fakeClientMessage).should.be.true; });