package com.august.luna.network.dataStream;

import android.annotation.SuppressLint;
import com.august.luna.network.dataStream.mqtt.ArrayMapMemoryPersistence;
import com.august.luna.network.dataStream.mqtt.MqttDriver;
import com.august.luna.network.dataStream.mqtt.MqttKeepAlive;
import com.august.luna.network.dataStream.mqtt.MqttMessage;
import com.august.luna.utils.rx.NetworkConnectivityObserver;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import g.b.c.h.a.f;
import g.b.c.h.a.g;
import g.b.c.h.a.h;
import g.b.c.h.a.i;
import g.b.c.h.a.k;
import g.b.c.h.a.l;
import g.b.c.h.a.m;
import g.b.c.h.a.n;
import io.fabric.sdk.android.services.settings.SettingsJsonConstants;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.Functions;
import io.reactivex.processors.PublishProcessor;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kotlin.Metadata;
import kotlin.jvm.JvmField;
import kotlin.jvm.internal.Intrinsics;
import l.a.r;
import l.e.a.j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: RxMqtt.kt */
@Metadata(bv = {1, 0, 3}, d1 = {"\u0000n\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000e\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018\u0000 )2\u00020\u0001:\u0002)*B-\u0012\b\u0010\u0002\u001a\u0004\u0018\u00010\u0003\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\u0006\u0010\t\u001a\u00020\n¢\u0006\u0002\u0010\u000bJ\b\u0010\u0018\u001a\u00020\u0019H\u0016J\b\u0010\u001a\u001a\u00020\u0019H\u0016J\u0016\u0010\u001b\u001a\b\u0012\u0004\u0012\u00020\u001d0\u001c2\u0006\u0010\u001e\u001a\u00020\u0006H\u0016J\u001e\u0010\u001b\u001a\b\u0012\u0004\u0012\u00020\u001d0\u001c2\u0006\u0010\u001e\u001a\u00020\u00062\u0006\u0010\u001f\u001a\u00020\u0017H\u0016J\u0010\u0010 \u001a\u00020\u000e2\u0006\u0010\u001e\u001a\u00020\u0003H\u0002J\u0016\u0010!\u001a\b\u0012\u0004\u0012\u00020\u00170\"2\u0006\u0010\u001e\u001a\u00020\u0006H\u0016J\b\u0010#\u001a\u00020\u0019H\u0016J\b\u0010$\u001a\u00020\u0019H\u0016J\u0018\u0010%\u001a\u00020\u00192\u0006\u0010\u001e\u001a\u00020\u00062\u0006\u0010&\u001a\u00020\u001dH\u0017J\u0018\u0010'\u001a\u00020(2\u0006\u0010\u001e\u001a\u00020\u00062\u0006\u0010&\u001a\u00020\u001dH\u0016R*\u0010\f\u001a\u001e\u0012\u0004\u0012\u00020\u0003\u0012\u0004\u0012\u00020\u000e0\rj\u000e\u0012\u0004\u0012\u00020\u0003\u0012\u0004\u0012\u00020\u000e`\u000fX\u0082\u0004¢\u0006\u0002\n\u0000R\u0011\u0010\t\u001a\u00020\n¢\u0006\b\n\u0000\u001a\u0004\b\u0010\u0010\u0011R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082.¢\u0006\u0002\n\u0000R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082.¢\u0006\u0002\n\u0000R\u000e\u0010\u0016\u001a\u00020\u0017X\u0082\u000e¢\u0006\u0002\n\u0000R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n\u0000¨\u0006+"}, d2 = {"Lcom/august/luna/network/dataStream/RxMqtt;", "Lcom/august/luna/network/dataStream/RxDataStream;", "brokerURI", "", "channels", "", "Lcom/august/luna/network/dataStream/DataStreamChannel;", "gson", "Lcom/google/gson/Gson;", "connectivityObserver", "Lcom/august/luna/utils/rx/NetworkConnectivityObserver;", "(Ljava/lang/String;Ljava/util/List;Lcom/google/gson/Gson;Lcom/august/luna/utils/rx/NetworkConnectivityObserver;)V", "channelMap", "Ljava/util/HashMap;", "Lcom/august/luna/network/dataStream/RxMqtt$StreamHolder;", "Lkotlin/collections/HashMap;", "getConnectivityObserver", "()Lcom/august/luna/utils/rx/NetworkConnectivityObserver;", "disposables", "Lio/reactivex/disposables/CompositeDisposable;", "driver", "Lcom/august/luna/network/dataStream/mqtt/MqttDriver;", "forceDisabled", "", "disable", "", "enable", "getChannel", "Lio/reactivex/Flowable;", "Lcom/google/gson/JsonObject;", "channel", "forceReconnect", "getOrCreate", "isChannelOnline", "Lio/reactivex/Single;", "onBackground", "onForeground", "publish", "data", "publishRx", "Lio/reactivex/Completable;", "Companion", "StreamHolder", "pubsub_release"}, k = 1, mv = {1, 1, 15})
/* loaded from: classes.dex */
public final class RxMqtt implements RxDataStream {

    @NotNull
    public static final String DEFAULT_BROKER = "tcp://dev-mqtt.august.com:1883";

    /* renamed from: a, reason: collision with root package name */
    public MqttDriver f8967a;

    /* renamed from: b, reason: collision with root package name */
    public final HashMap<String, StreamHolder> f8968b;

    /* renamed from: c, reason: collision with root package name */
    public CompositeDisposable f8969c;

    /* renamed from: d, reason: collision with root package name */
    public boolean f8970d;

    /* renamed from: e, reason: collision with root package name */
    public final Gson f8971e;

    /* renamed from: f, reason: collision with root package name */
    @NotNull
    public final NetworkConnectivityObserver f8972f;

    /* renamed from: Companion, reason: from kotlin metadata */
    @Deprecated
    public static final Companion INSTANCE = new Companion(null);

    @JvmField
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) RxMqtt.class);

    /* compiled from: RxMqtt.kt */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000:\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\b\u0000\u0018\u00002\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u000e\u0010\u0012\u001a\u00020\u00132\u0006\u0010\u0014\u001a\u00020\u0015R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n\u0000R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n\u0000\u001a\u0004\b\u0007\u0010\bR\u0011\u0010\t\u001a\u00020\n8F¢\u0006\u0006\u001a\u0004\b\u000b\u0010\fR\u0017\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000e¢\u0006\b\n\u0000\u001a\u0004\b\u0010\u0010\u0011¨\u0006\u0016"}, d2 = {"Lcom/august/luna/network/dataStream/RxMqtt$StreamHolder;", "", "gson", "Lcom/google/gson/Gson;", "(Lcom/google/gson/Gson;)V", "_sequence", "Ljava/util/concurrent/atomic/AtomicLong;", "getGson", "()Lcom/google/gson/Gson;", "sequence", "", "getSequence", "()J", "stream", "Lio/reactivex/processors/PublishProcessor;", "Lcom/google/gson/JsonObject;", "getStream", "()Lio/reactivex/processors/PublishProcessor;", "processMessage", "", SettingsJsonConstants.PROMPT_MESSAGE_KEY, "Lcom/august/luna/network/dataStream/mqtt/MqttMessage;", "pubsub_release"}, k = 1, mv = {1, 1, 15})
    /* loaded from: classes.dex */
    public static final class StreamHolder {

        /* renamed from: a, reason: collision with root package name */
        @NotNull
        public final PublishProcessor<JsonObject> f8973a;

        /* renamed from: b, reason: collision with root package name */
        public final AtomicLong f8974b;

        /* renamed from: c, reason: collision with root package name */
        @NotNull
        public final Gson f8975c;

        public StreamHolder(@NotNull Gson gson) {
            Intrinsics.checkParameterIsNotNull(gson, "gson");
            this.f8975c = gson;
            PublishProcessor<JsonObject> create = PublishProcessor.create();
            Intrinsics.checkExpressionValueIsNotNull(create, "PublishProcessor.create()");
            this.f8973a = create;
            this.f8974b = new AtomicLong(0L);
        }

        @NotNull
        /* renamed from: getGson, reason: from getter */
        public final Gson getF8975c() {
            return this.f8975c;
        }

        public final long getSequence() {
            return this.f8974b.getAndIncrement();
        }

        @NotNull
        public final PublishProcessor<JsonObject> getStream() {
            return this.f8973a;
        }

        public final void processMessage(@NotNull MqttMessage message) {
            Intrinsics.checkParameterIsNotNull(message, "message");
            JsonObject jsonObject = (JsonObject) this.f8975c.fromJson(message.getF8999a(), JsonObject.class);
            if (jsonObject.has("origin")) {
                JsonElement jsonElement = jsonObject.get("origin");
                Intrinsics.checkExpressionValueIsNotNull(jsonElement, "json[\"origin\"]");
                if (Intrinsics.areEqual(jsonElement.getAsString(), "luna")) {
                    Intrinsics.checkExpressionValueIsNotNull(jsonObject.get("status"), "json[\"status\"]");
                    if (!Intrinsics.areEqual(r3.getAsString(), "self-five")) {
                        Companion unused = RxMqtt.INSTANCE;
                        RxMqtt.LOG.debug("filtered message with origin:{} status:{}", jsonObject.get("origin"), jsonObject.get("status"));
                        return;
                    }
                }
            }
            Companion unused2 = RxMqtt.INSTANCE;
            RxMqtt.LOG.debug("channel: {} received message: {}", message.getChannel(), jsonObject);
            this.f8973a.onNext(jsonObject);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: RxMqtt.kt */
    /* renamed from: com.august.luna.network.dataStream.RxMqtt$a, reason: from kotlin metadata */
    /* loaded from: classes.dex */
    public static final class Companion {
        public Companion() {
        }

        public /* synthetic */ Companion(j jVar) {
            this();
        }
    }

    public RxMqtt(@Nullable String str, @NotNull List<? extends DataStreamChannel> channels, @NotNull Gson gson, @NotNull NetworkConnectivityObserver connectivityObserver) {
        Intrinsics.checkParameterIsNotNull(channels, "channels");
        Intrinsics.checkParameterIsNotNull(gson, "gson");
        Intrinsics.checkParameterIsNotNull(connectivityObserver, "connectivityObserver");
        this.f8971e = gson;
        this.f8972f = connectivityObserver;
        this.f8968b = new HashMap<>(channels.size());
        MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(str == null ? DEFAULT_BROKER : str, MqttAsyncClient.generateClientId(), new ArrayMapMemoryPersistence(), new MqttKeepAlive());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setAutomaticReconnect(false);
        mqttConnectOptions.setKeepAliveInterval(30);
        this.f8967a = new MqttDriver(mqttAsyncClient, mqttConnectOptions, new f(this));
    }

    public static final /* synthetic */ Companion access$Companion() {
        return INSTANCE;
    }

    public static final /* synthetic */ MqttDriver access$getDriver$p(RxMqtt rxMqtt) {
        MqttDriver mqttDriver = rxMqtt.f8967a;
        if (mqttDriver != null) {
            return mqttDriver;
        }
        Intrinsics.throwUninitializedPropertyAccessException("driver");
        throw null;
    }

    public final StreamHolder a(String str) {
        HashMap<String, StreamHolder> hashMap = this.f8968b;
        StreamHolder streamHolder = hashMap.get(str);
        if (streamHolder == null) {
            streamHolder = new StreamHolder(this.f8971e);
            this.f8968b.put(str, streamHolder);
            MqttDriver mqttDriver = this.f8967a;
            if (mqttDriver == null) {
                Intrinsics.throwUninitializedPropertyAccessException("driver");
                throw null;
            }
            mqttDriver.addChannel(r.listOf(str));
            hashMap.put(str, streamHolder);
        }
        return streamHolder;
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    public void disable() {
        this.f8970d = true;
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver != null) {
            mqttDriver.disconnect();
        } else {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    public void enable() {
        this.f8970d = false;
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver != null) {
            mqttDriver.reconnect().subscribe(Functions.EMPTY_ACTION, Functions.emptyConsumer());
        } else {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    @NotNull
    public Flowable<JsonObject> getChannel(@NotNull DataStreamChannel channel) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        return getChannel(channel, false);
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    @NotNull
    public Flowable<JsonObject> getChannel(@NotNull DataStreamChannel channel, boolean forceReconnect) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        Flowable<JsonObject> flatMapPublisher = Single.just(Boolean.valueOf(forceReconnect)).flatMapCompletable(new g(this)).andThen(Single.fromCallable(new h(this, channel))).flatMapPublisher(i.f21534a);
        Intrinsics.checkExpressionValueIsNotNull(flatMapPublisher, "Single.just(forceReconne…apPublisher { it.stream }");
        return flatMapPublisher;
    }

    @NotNull
    /* renamed from: getConnectivityObserver, reason: from getter */
    public final NetworkConnectivityObserver getF8972f() {
        return this.f8972f;
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    @NotNull
    public Single<Boolean> isChannelOnline(@NotNull DataStreamChannel channel) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver == null) {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
        Single<Boolean> onErrorReturn = mqttDriver.getStatus().timeout(10L, TimeUnit.SECONDS).map(g.b.c.h.a.j.f21535a).onErrorReturn(k.f21536a);
        Intrinsics.checkExpressionValueIsNotNull(onErrorReturn, "driver.getStatus()\n     … .onErrorReturn { false }");
        return onErrorReturn;
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    public void onBackground() {
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver == null) {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
        mqttDriver.disconnect();
        CompositeDisposable compositeDisposable = this.f8969c;
        if (compositeDisposable != null) {
            compositeDisposable.dispose();
        } else {
            Intrinsics.throwUninitializedPropertyAccessException("disposables");
            throw null;
        }
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    public void onForeground() {
        this.f8969c = new CompositeDisposable();
        CompositeDisposable compositeDisposable = this.f8969c;
        if (compositeDisposable == null) {
            Intrinsics.throwUninitializedPropertyAccessException("disposables");
            throw null;
        }
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver == null) {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
        Disposable subscribe = mqttDriver.connect().andThen(Flowable.defer(new l(this))).subscribe(new m(this), n.f21539a);
        Intrinsics.checkExpressionValueIsNotNull(subscribe, "driver.connect()\n       …on a channel!\", error) })");
        RxDataStreamKt.plusAssign(compositeDisposable, subscribe);
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    @SuppressLint({"CheckResult"})
    public void publish(@NotNull DataStreamChannel channel, @NotNull JsonObject data) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        Intrinsics.checkParameterIsNotNull(data, "data");
        publishRx(channel, data).subscribe(Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    @Override // com.august.luna.network.dataStream.RxDataStream
    @NotNull
    public Completable publishRx(@NotNull DataStreamChannel channel, @NotNull JsonObject data) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        Intrinsics.checkParameterIsNotNull(data, "data");
        if (this.f8970d) {
            LOG.warn("Warning - client is force-disabled. Call enable()");
            Completable complete = Completable.complete();
            Intrinsics.checkExpressionValueIsNotNull(complete, "Completable.complete()");
            return complete;
        }
        String channel2 = channel.getChannel();
        MqttDriver mqttDriver = this.f8967a;
        if (mqttDriver == null) {
            Intrinsics.throwUninitializedPropertyAccessException("driver");
            throw null;
        }
        data.addProperty("origin", "luna");
        StreamHolder streamHolder = this.f8968b.get(channel2);
        data.addProperty("sequence_number", streamHolder != null ? Long.valueOf(streamHolder.getSequence()) : null);
        String jsonElement = data.toString();
        Intrinsics.checkExpressionValueIsNotNull(jsonElement, "data.let {\n             ….toString()\n            }");
        return mqttDriver.publish(channel2, jsonElement);
    }
}
