Using RxJava, Retrofit in Android - Kotlin

Introduction:

Reactive Programming is a programming paradigm that’s concerned with data streams and propagation of change. The reactive model listens to changes in the event and runs the relevant code accordingly.

RxJava is a Java based implementation of Reactive Programming.

Whereas, RxAndroid is specific to Android platform which utilises some classes on top of the RxJava library.

Classes and Concepts:

There are 3 main components in RxJava.

(i) Observable - It emits a stream of data or events. i.e., a class that can be used to perform some action, and publish the result.

(ii) Observer - It receivers the events or data and acts upon it. i.e. a class that waits and watches the Observable, and reacts whenever the Observable publishes results.

It have 4 interface methods to know the different states of Observable, as follows:

        onSubscribe() - invoked when the Observer is subscribed.
        onNext() - invoked when the new item is emitted from Observer.
        onComplete() - when all the items emitted completedly from Observer.
        onError() - invoked when error occured and emission of data is not successful.

(iii) Operator -  It allows us to manipulate the data that was emitted. Some notable operators are create, just, filter, map.

               (a) just operator - takes a list of arguments and converts the items into Observable items.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

We can also pass array in just as follows:

Integer[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};

Observable.just(numbers) 
                   ...
                   ... 


                  (b) filter operator - allows the Observable to emit the only values those passes a test

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
         .filter(new Predicate<Integer>() {
             @Override
             public boolean test(Integer integer) throws Exception {
                 return integer % 2 == 0;
             }
         })
         .subscribe(new Observer<Integer>() {
             @Override
             public void onNext(Integer integer) {
                 Log.e(TAG, "EvenNumber: " + integer);
             }

             @Override
             public void onError(Throwable e) {

             }

             @Override
             public void onComplete() {

             }
         });

                  (c) map operator -  transforms each item emitted by an Observable and emits the modified item.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer,Integer>() {
                    @Override
                    public Integer apply(Integer value) throws Exception {
                        // modifying object here and return
                        value = value*10;
                        return value;
                    }
                })
                .subscribe(new Observer<[Param]>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(User user) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

Also, the following methods are to give instruction to Observable.
  • subscribeOn() - is to instruct the Observable, in which thread the task is to be running.
  • observeOn() - is to instruct the Observable, in which thread the output data should be received.
Threading in RxJava is done with the help of Schedulers. Scheduler can be considered as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it will take a thread from its pool and run the task in that thread.

Available Schedulers: 

Warning: Lots of definition coming through....  

(a) Schedulers.io() - backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.

(b) Schedulers.computation() - backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.

(c) Schedulers.newThread() - creates a new thread for each unit of work scheduled. This scheduler is expensive as new thread is spawned every time and no reuse happens.

(d) Schedulers.from(Executor executor) - creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Scheduler.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shutdown.

(e) AndroidSchedulers.mainThread() - provided by the RxAndroid extension library to RxJava. Main thread (also known as UI thread) is where user interaction happens. Care should be taken not to overload this thread to prevent janky non-responsive UI or, worse, Application Not Responding” (ANR) dialog.

(f) Schedulers.single() - It is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.

(g) Schedulers.trampoline() - executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.

Implementation:

Lets see a simple example that fetch data from server and display on App.

You can find the project in Github.

build.gradle 

  implementation 'com.squareup.retrofit2:retrofit:2.5.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    implementation 'com.android.support:recyclerview-v7:28.0.0'
    implementation 'com.android.support:cardview-v7:28.0.0'

MainActivity.kt

Lets fetch data from Server using Retrofit. For that, we have to create Retrofit instance and an interface for request and receive response using Callback.

To know more about Retrofit Integration, you can check, this post.

 val requestInterface = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) 
            .addConverterFactory(GsonConverterFactory.create())
            .build().create(APIInterface::class.java)

Here, addCallAdapterFactory is to add RxJava call factory, for supporting service method return types other than Call

Then let's create an Observable to listen and update.

val observable = requestInterface.getUsers()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(this::handleResponse, this::handleError)

Here,
        subscribeOn - is to carry out Network requests in I/O thread
        observeOn - is to receive API response in mainThread

        this::handleResponse, this::handleError - Java8 method reference


Whole class:


class MainActivity : AppCompatActivity() {

    lateinit var recyclerAdapter: RecyclerAdapter
    var list: ArrayList<UserModel> = ArrayList()
    val BASE_URL = "https://jsonplaceholder.typicode.com/"

    val disposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        setContentView(R.layout.activity_main)


        setAdapter()

        // Build Retrofit Request
        val requestInterface = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build().create(APIInterface::class.java)


        // Call API using RxJava & RxAndroid
        val observable = requestInterface.getUsers()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(this::handleResponse, this::handleError)

        // Add API Call/Observable to CompositeDisposable
        disposable.add(observable)

    }

    /**
     * Setting Adapter to RecyclerView
     */
    private fun setAdapter() {
        recyclerAdapter = RecyclerAdapter(this@MainActivity, list)
        recyclerView.apply {
            layoutManager = LinearLayoutManager(this@MainActivity)
            adapter = recyclerAdapter
        }
    }

    /**
     * To handle Response received by RxJava's Observable
     */
    private fun handleResponse(userList: ArrayList<UserModel>) {
        Log.i("userList", "userList==" + userList)
        list.addAll(userList)
        recyclerAdapter.notifyDataSetChanged()
    }

    /**
     * To handle Error (if any), received by RxJava's Observable
     */
    private fun handleError(error: Throwable) {
        Toast.makeText(this, "Error " + error.localizedMessage, Toast.LENGTH_SHORT).show()
    }

    override fun onDestroy() {
        super.onDestroy()
        disposable.dispose()
    }
}

activity_main.xml

<?xml version="1.0" encoding="utf-8"?>
<android.support.v7.widget.RecyclerView
        xmlns:android="http://schemas.android.com/apk/res/android"
        xmlns:tools="http://schemas.android.com/tools"
        android:id="@+id/recyclerView"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        tools:context=".MainActivity"/>



RecyclerAdapter.kt

class RecyclerAdapter(val context: Context, var data : ArrayList<UserModel>?) : RecyclerView.Adapter<RecyclerAdapter.Holder>() {
    override fun onBindViewHolder(holder: Holder, position: Int) {
        holder.bindItems(data?.get(position))
    }

    override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): Holder {
        val v = LayoutInflater.from(context).inflate(R.layout.recycler_item, parent, false)
        return Holder(v)
    }

    override fun getItemCount(): Int = data?.size?:0

    class Holder(itemView: View) : RecyclerView.ViewHolder(itemView){
        fun bindItems(user: UserModel?){
            itemView.name.text =  user?.name
            itemView.email.text = user?.email
        }
    }

}

recycler_item.xml

<?xml version="1.0" encoding="utf-8"?>
<android.support.v7.widget.CardView xmlns:android="http://schemas.android.com/apk/res/android"
                                    xmlns:app="http://schemas.android.com/apk/res-auto"
                                    android:layout_width="match_parent"
                                    android:layout_height="wrap_content"
                                    android:layout_margin="10dp"
                                    app:cardElevation="3dp">

    <LinearLayout
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:orientation="vertical">

        <TextView
                android:id="@+id/name"
                android:layout_width="match_parent"
                android:layout_height="wrap_content"
                android:layout_weight="1"
                android:textAppearance="@android:style/TextAppearance.Medium"
                android:padding="10dp"/>

        <TextView
                android:id="@+id/email"
                android:layout_width="match_parent"
                android:layout_height="wrap_content"
                android:layout_weight="1"
                android:textAppearance="@android:style/TextAppearance.Medium"
                android:padding="10dp"/>

    </LinearLayout>

</android.support.v7.widget.CardView>

APIInterface.kt:

interface APIInterface {
    @GET("users")
    fun getUsers():  Observable<ArrayList<UserModel>>
}

UserModel.kt
data class UserModel(val name: String, val email: String)

Run Application:





You can find the project in Github.


That's it.. Bubyeeeee!!!

Comments

  1. This blog post covers using RxJava and Retrofit in Android app development with Kotlin. It's a guide to implementing reactive programming and efficient network requests for responsive Android apps. If you are looking forward to Hire Kotlin Developers, we will gladly help you.

    ReplyDelete

Post a Comment

Popular posts from this blog

SOAP Client using ksoap2 in Android - Kotlin

Retrofit Integration in Android - Kotlin

Using Camera in Android - Kotlin

Room with LiveData, ViewModel - Android Architecture Components - Kotlin

Map, Location update and AutoComplete Places - Kotlin

RecyclerView with different number of columns using SpanSizeLookup

Bottom Navigation using design Library in Android - Kotlin

Exploring Android Navigation Architecture Component - MVVM - Kotlin

Exploring Android Slices - JetPack

FCM Integration in Android - Kotlin