Python을 활용한 마스크 자동 구매 프로그램
코로나 19 로 인해 마스크 품귀 현상이 지속되면서 마스크 구매 경쟁이 치열해졌습니다. 공적 마스크를 배분하는 마스크 5부제까지 시행됐지만, 여전히 마스크 구하기는 어려워서 마스크 구매가 가능한 시점에 알림을 주거나 자동으로 구매하는 프로그램을 만들었습니다.
추진배경
공급의 부족으로 마스크 가격은 계속 치솟고 기존 가격을 유지하는 착한 마스크라 불리는 마스크는 오전 9시에 쉴 새 없이 새로고침을 누르며 클릭을 하여도 활성화된 [구매하기] 버튼조차 구경하기 어렵습니다. 공대생들은 실시간으로 주문한 마스크를 할머니는 5시간씩 줄을 서도 하루 5장을 구매하지 못했다는 기사가 나오기도 했습니다.
추진내용
처음에는 마스크를 게릴라로 판매하는 사이트를 크롤링하여 재고 존재 시 문자로 알람을 보내주는 방식으로 개발하였습니다. 문제는 받은 문자를 클릭하고 들어가더라도 이미 구매는 불가합니다. 문자 송/수신 시간도 존재하고 사이트는 이미 폭주 상태이기 때문입니다.

이후 수정하여 구매가 가능한 시점에 구매하기 버튼까지 클릭하고 결제 대기 상태에 대기하도록 했습니다. 역시 문제는 이 시점에 결제하기를 눌렀는데도 품절로 구매가 안 되었죠. 사실상 판매 오픈 후 대략 40초 ~ 1분 사이에 품절이 되기 때문입니다.

네이버스토어 결제 시 비빌번호 입력창(네이버페이)의 경우 입력창 이미지는 랜덤하게 변경되기 때문에 pyautogui를 이용하여 자동으로 인식하여 클릭하도록 반영했습니다.

네이버페이 결제를 위해 창을 전환하는 시간과 자동으로 인식하는 약간의 시간이 존재하여 조금 더 빠른 결제를 위해 결제수단을 “나중에 결제”로 변경하였습니다.

개발
기본적으로 selenium을 이용하며 시스템을 확인하여 retina 디스플레이인 경우, pyautogui 이미지 클릭에 대한 좌표에 대한 후처리를 위해 별도로 체크해줍니다.
import os import threading import platform import subprocess import time import pyautogui from pynput.mouse import Button, Controller import cv2 import datetime from datetime import timedelta from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ElementClickInterceptedException, imeoutException, NoSuchElementException, NoSuchWindowException, WebDriverException if os.name == 'nt': from ctypes import windll is_retina = False if platform.system() == "Darwin": is_retina = subprocess.call("system_profiler SPDisplaysDataType | grep 'retina'", shell=True) else: is_retina = True mouse = Controller()
N개로 수행 할 수 있도록 Browser 라는 클래스로 만듭니다. Input으로 User ID, Password, Pay Number를 받습니다.
class Browser: def __init__(self, user_id, password, pay_number): self.timeout = 2 self.user_id = user_id self.password = password self.pay_number = pay_number self.base_width = 1920 self.base_height = 1080 # 모니터 해상도 가져오기 self.scr_width, self.scr_height = pyautogui.size() print('width={0}, height={1}'.format(self.scr_width, self.scr_height)) self.w_rat = self.base_width / self.scr_width self.h_rat = self.base_height / self.scr_height if os.name == 'nt': user32 = windll.user32 user32.SetProcessDPIAware()
크롬을 사용합니다. 창을 활성화하지 않을 경우 option으로 headless를 추가합니다.
def popup(self): chrome_options = Options() #chrome_options.add_argument('--headless') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('window-size=1920x1080') # 가져올 크기를 결정 self.driver = webdriver.Chrome('chromedriver', chrome_options=chrome_options)
구매상품에 따라 옵션이 존재 할 수 있습니다. (Ex - 사이즈, 색상 등) 구매하고자 하는 옵션을 입력받고 존재하지 않은 경우 (품절 포함) 다른 상품을 선택하여 구매할 지 입력합니다.
def setOption(self, url, combo=True, option='', another=True): self.url = url self.combo = combo self.option = option self.another = another
로그인을 수행합니다. 자동 로그인 방지를 우회하기 위해 스크립트로 id와 password를 변경합니다.
def login(self): # Naver Login self.driver.get('https://nid.naver.com/nidlogin.login') self.driver.execute_script("document.getElementsByName('id')[0].value=\'" + self.user_id + "\'") self.driver.execute_script("document.getElementsByName('pw')[0].value=\'" + self.password + "\'") WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="frmNIDLogin"]/fieldset/input'))).click() # self.driver.find_element_by_xpath('//*[@id="frmNIDLogin"]/fieldset/input').click() print('{} 로그인'.format(self.user_id)) time.sleep(1)
상품의 재고 여부를 조회합니다. 재고가 존재할 때까지 반복적으로 수행합니다.
def checkStock(self): print('{} 상품 재고 조회'.format(self.user_id)) fiedset = WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.CLASS_NAME, 'prd_num'))) while True: self.driver.refresh() try: not_good = self.driver.find_element_by_class_name('not_goods') except NoSuchElementException: not_good = '' break if self.combo: try: self.option_combo = WebDriverWait(self.driver, 2).until(EC.element_to_be_clickable((By.CLASS_NAME, '_combination_option'))) all_options = self.option_combo.find_elements_by_tag_name("option") try: product_list = WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.CLASS_NAME, 'opt_price'))) index = next(idx for idx, option in enumerate(all_options) if option.get_attribute("text") not in ["사이즈", "필수옵션"] and option.get_attribute( "title") == self.option) # 입력한 옵션값이 있을 경우 품절여부 판단 if index > 0: sold_out = all_options[1].get_attribute("text").find("품절") > -1 # 입력한 옵션은 품절이지만 다른 상품구매도 원한다면 변경하여 진행 if sold_out and self.another: try: index = next(idx for idx, option in enumerate(all_options) if option.get_attribute("text") not in ["사이즈", "필수옵션"] and option.get_attribute( "text").find("품절") == -1) except StopIteration: print('전상품 품절') time.sleep(600) elif sold_out: print('{} 품절 - 재조회'.format(self.option)) self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() except StopIteration: print('입력값에 해당하는 옵션 없음'.format(self.option)) if self.another: try: index = next(idx for idx, option in enumerate(all_options) if option.get_attribute("text") not in ["사이즈", "필수옵션"] and option.get_attribute( "text").find("품절") == -1) except StopIteration: print('전상품 품절') time.sleep(600) else: print('더이상 진행하지 않음') time.sleep(600) except TimeoutException: print('서버폭주로 비활성화') self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() if self.another: input_option = all_options[index].get_attribute("title") self.option_combo.send_keys(input_option) else: try: self.option_combo.send_keys(self.option) except AttributeError: print('Combobox Error') self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() self.checkout()
구매를 진행합니다. 위에 설명해 드린 것처럼 자동결제까지 구현했다가 좀 더 빠른 구매를 위해 “나중에 결제”로 변경하였습니다.
def checkout(self): try: self.selectItem(self.option_combo) try: WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.CLASS_NAME, 'cart'))).click() except TimeoutException: print('페이지오류 - 재조회') self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() try: cart_popup = WebDriverWait(self.driver, 1).until(EC.element_to_be_clickable((By.CLASS_NAME, '_layer_cart_add'))) WebDriverWait(cart_popup, 1).until(EC.element_to_be_clickable((By.CLASS_NAME, 'button_close'))).click() print('{} 장바구니'.format(self.user_id)) except TimeoutException: pass try: WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.PARTIAL_LINK_TEXT, '구매하기'))).click() except TimeoutException: print('페이지오류 - 재조회') self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() print('{} 구매하기'.format(self.user_id)) try: msg_popup = WebDriverWait(self.driver, 1).until(EC.element_to_be_clickable((By.CLASS_NAME, '_layer_order_check'))) WebDriverWait(msg_popup, 1).until(EC.element_to_be_clickable((By.CLASS_NAME, 'button_close'))).click() self.selectItem(self.option_combo) WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.PARTIAL_LINK_TEXT, '구매하기'))).click() except TimeoutException: pass payment = WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.CLASS_NAME, 'payment'))) paymethod_list = WebDriverWait(payment, self.timeout).until(EC.element_to_be_clickable((By.CLASS_NAME, 'paymethod_list'))) paymethod = WebDriverWait(paymethod_list, self.timeout).until(EC.element_to_be_clickable((By.XPATH, '//label[@for="generalPayments"]'))) paymethod.click() #payment = WebDriverWait(self.driver, 1).until(EC.element_to_be_clickable((By.CLASS_NAME, 'payment_list'))) #btn_payNext = WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.XPATH,'//li[4]/span/span'))) #btn_payNext.click() print('URL : {}'.format(self.driver.current_url)) try: msg_popup = WebDriverWait(self.driver, 1).until(EC.element_to_be_clickable((By.XPATH, "//form[@id='orderForm']/div/div[5]/div/div[2]/ul/li[3]/div/label"))) WebDriverWait(msg_popup, 0).until(EC.element_to_be_clickable((By.XPATH, "(//button[@type='button'])[14]"))).click() except TimeoutException: pass payment_item = paymethod.find_element_by_xpath('//label[@for="pay18"]') payment_item.click() btn_allAgree = WebDriverWait(self.driver, self.timeout).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="allAgree"]'))) btn_allAgree.click() self.order() except ElementClickInterceptedException: print('버튼 비활성화') thread = threading.Thread(target=self.checkStock) thread.start() # 10분간 대기 이후 Thread는 종료 #time.sleep(600) except TimeoutException: print('서버폭주로 비활성화') self.driver.get(self.url) thread = threading.Thread(target=self.checkStock) thread.start() except NoSuchWindowException as e: print('NoSuchWindowException') print('Browser종료') # 10분간 대기 이후 Thread는 종료 time.sleep(600) except WebDriverException as e: print('WebDriverException') print('Browser종료') # 10분간 대기 이후 Thread는 종료 time.sleep(600)
모든 수행은 Thread로 수행됩니다.
def selectItem(self, option_combo): if self.combo: try: purchase_list = self.driver.find_elements_by_class_name('_purchase_unit') if purchase_list: for item in purchase_list: item_text = item.find_elements_by_tag_name('em')[0].text if self.option != item_text: btn_del = item.find_element_by_partial_link_text("삭제") btn_del.click() purchase_list = self.driver.find_elements_by_class_name('_purchase_unit') if purchase_list == []: option_combo.send_keys(self.option) else: option_combo.send_keys(self.option) except NoSuchElementException: option_combo.send_keys(self.option) def order(self): btn_pay = WebDriverWait(self.driver, self.timeout).until( EC.element_to_be_clickable((By.CLASS_NAME, 'btn_payment'))) # storing the current window handle to get back to dashbord main_page = self.driver.current_window_handle btn_pay.click() print('{} 구매완료'.format(self.user_id)) # 10분간 대기 이후 Thread는 종료 time.sleep(600) def startBrowser(self): self.popup() self.login() # Url로 이동 self.driver.get(self.url) print('{} Url로 이동'.format(self.user_id)) self.checkStock()
browser1 = Browser(아이디, 패스워드, 페이번호) browser1.setOption('https://smartstore.naver.com/mfbshop/products/4819214174', True, 'Small (소)', False) browser_thread1 = threading.Thread(target=browser1.startBrowser) browser_thread1.start()
Summary
위에 소스에는 생략되었지만 pyautogui 를 이용하여 네이버페이를 클릭하여 자동결제도 가능하게 구현했습니다. 단점은 듀얼모니터의 경우는 주모니터만 인식되고 약간의 Delay가 발생합니다. 하지만 간단한 방법으로 화면 캡처한 결과를 좌표로 활용하여 마우스와 키보드를 제어하도록 구현하는데 좋은 것 같습니다.
distinct_num = [char for char in self.pay_number] num_position = {} for num in list(set(distinct_num)): img_num = cv2.imread('img//{num}.png'.format(num=num)) img_height, img_width, img_channel = img_num.shape resize_img = cv2.resize(img_num,(int(img_width * self.w_rat), int(img_height * self.h_rat))) num_position[num] = pyautogui.locateCenterOnScreen(resize_img, confidence=0.9) for num in distinct_num: print(num_position[num]) if num_position[num] is None: break else: if is_retina: pyautogui.moveTo(num_position[num][0] / 2, num_position[num][1] / 2) mouse.click(Button.left) else: pyautogui.click(num_position[num])
다만 retina 디스플레이를 대비하여 아래와 같이 이미지를 두벌로 준비해야 합니다.

마지막으로 UI를 포함하여 개발하였습니다. UI를 개발할 때에는 다른 사용자가 사용한다는 생각으로 개발하다보니 고민해야 할 게 항상 많은 것 같습니다.

프로그램을 이용하여 대량으로 구매하진 않았습니다. 마스크 품귀 현상이 지속되고 이미 많은 사람이 기술(UiPath, Selenium, Macro 등)을 이용하여 구매하고 있었죠. 기술과 정보의 격차로 인한 마스크 수급의 차이가 있었는데 마스크 페이지가 열리면 “새로고침”을 하고 있는 저를 보면서 나는 과연 지금까지 나의 편의를 위해 개발을 한 적이 있었는지 의문에서 개발했었고 단순하지만 다양한 방법을 고민하면서 매우 재밌었던 것 같습니다.
'Project' 카테고리의 다른 글
Labeling Tool (UTTU) (2) | 2023.01.03 |
---|---|
문서이해 Solution 개발 (VisionOCR) (0) | 2023.01.02 |
Covid-19 사회적 거리두기 측정 (0) | 2022.02.24 |
Paperless Hospital - 서류관리 Mobile App 개발 (Android & Python) (0) | 2022.02.18 |
EDS(Easy Data Sync) - Database Migration & Synchronization (0) | 2021.03.02 |
댓글
이 글 공유하기
다른 글
-
문서이해 Solution 개발 (VisionOCR)
문서이해 Solution 개발 (VisionOCR)
2023.01.02OCR (Optical Character Recognition) 이란 기술은 이미지 속에 글자가 어디에 위치해 있는지 찾고 어떤 글자인지를 판별하는 기술입니다. 가족관계증명서, 등본, 사업자등록증, 보험 처리를 위한 의료진단서, 영수증, 주민등록증 촬영을 할 때 우리 주변에서 이 기술이 적용된 사례들을 어렵지 않게 찾아 볼 수 있습니다. 그만큼 실용적이고 중요한 기술입니다. 기업들은 오래전부터 OCR을 주목해왔고 다양한 방식으로 연구개발하며 적용해 왔습니다. 하지만 여전히 OCR이 기업이나 다양한 프로젝트에서 오르내리는 것은 아직 제대로 정착시키지 못했다는 점을 방증하는 것이기도 합니다. 실제 OCR 도입을 원하는 고객을 만나보면 소비자의 기대는 저만치 앞서 나가 있는데 기술이 따라오지 못하는 형국이라고… -
Covid-19 사회적 거리두기 측정
Covid-19 사회적 거리두기 측정
2022.02.24코로나19 이후, 즉 포스트 코로나(Post COVID-19) 시대의 세상은 지금과 완전히 다를 것입니다. 이미 각 국가의 고강도 사회적 거리두기 정책으로 인해 보금자리였던 집이 또 하나의 업무 공간으로 탈바꿈하였으며, 오프라인 사회에서 온라인 사회로, 나아가 디지털 사회로 전환하는 발판을 만들어 주었습니다. COVID-19 소위 뉴-노멀이라는 새로운 세상에서 IT는 지금보다 더 큰 역할을 할 것으로 예상됩니다. 언택트(Untact) 문화의 확산, 온라인 소비의 증가, 온라인 교육 등의 생활이 일상화되는 움직임은 국가 간 경계를 뒤로 한 채 전 세계적인 디지털 전환을 가속화할 것이기 때문입니다. 언제 어느 순간 나타날지도 모르는, 어쩌면 이미 다가와 있는 상상 가능한 미래에 사전적으로 대비하기 위해서는 … -
Paperless Hospital - 서류관리 Mobile App 개발 (Android & Python)
Paperless Hospital - 서류관리 Mobile App 개발 (Android & Python)
2022.02.18골칫거리 “종이서류” 동네 병원을 가보면 데스크의 서랍과 가려진 벽면 뒤에는 진료 종이서류로 빼곡하게 쌓여 있습니다. 병원의 종이차트 의무보관 기간은 5년으로, 늘어나는 환자 수 만큼 종이차트 역시 늘어나기 있고 병원은 보관 방법과 폐기 문제를 놓고 골머리를 앓고 있습니다. 그나마 재정에 여유가 있는 대형 병원들은 수십억원의 예산을 투자하여 디지털 사업을 추진하고 있지만 중/소규모의 병의원에서는 단독으로 프로젝트를 외주에 줄만큼 양이 되지 않기 때문에 외부 사업자가 일을 맡지도 않을뿐더러 전문성이 없거나 비싼 비용을 지불해야 하는 경우가 대부분입니다. 특정 의료장비의 경우는 직접 전자 문서화된 데이타로 보관 할 수 있지만, 노후화된 의료장비나 장비의 특성상 출력을 필요로 하는 경우는 종이서류로 존재하게 … -
EDS(Easy Data Sync) - Database Migration & Synchronization
EDS(Easy Data Sync) - Database Migration & Synchronization
2021.03.02시스템을 운영하다보면 테스트 DB에 반영하고 단위테스트와 통합테스트를 진행하고 변경한 데이타를 운영기에 반영해는 경우가 많습니다. 동일한 DBMS이거나 한 두개 정도의 테이블만 복제/동기화 해야한다면 시중에 잘 알려진 복제 툴을 사용해도 문제가 아닐 것입니다. 허나 시스템이 방대해지면서 여러 종류의 DB와 Table을 사용해야하고 특정 데이타만 동기화 하고 싶다면 단순한 복제 툴로 해결하기 어려워 집니다. 실제로 하나의 기준을 설정하기 위해 84개의 테이블에 접근하고 800Row에 가까운 Insert가 이루어집니다. 화면을 통해 기준을 넣다보면 4~6시간이 소요됩니다. EDS (Easy Data Sync) - Database Migration & Synchronization EDS 솔루션은 Databas…
댓글을 사용할 수 없습니다.